// Copyright 2017 Google Inc. All rights reserved. // // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file or at // https://developers.google.com/open-source/licenses/bsd #include "packager/media/chunking/chunking_handler.h" #include #include "packager/base/logging.h" #include "packager/base/threading/platform_thread.h" #include "packager/media/base/media_sample.h" namespace shaka { namespace media { namespace { int64_t kThreadIdUnset = -1; } // namespace ChunkingHandler::ChunkingHandler(const ChunkingParams& chunking_params) : chunking_params_(chunking_params), thread_id_(kThreadIdUnset), media_sample_comparator_(this), cached_media_sample_stream_data_(media_sample_comparator_) { CHECK_NE(chunking_params.segment_duration_in_seconds, 0u); } ChunkingHandler::~ChunkingHandler() {} Status ChunkingHandler::InitializeInternal() { segment_info_.resize(num_input_streams()); subsegment_info_.resize(num_input_streams()); time_scales_.resize(num_input_streams()); last_sample_end_timestamps_.resize(num_input_streams()); num_cached_samples_.resize(num_input_streams()); return Status::OK; } Status ChunkingHandler::Process(std::unique_ptr stream_data) { switch (stream_data->stream_data_type) { case StreamDataType::kStreamInfo: return OnStreamInfo(stream_data->stream_index, stream_data->stream_info); case StreamDataType::kScte35Event: return OnScte35Event(stream_data->stream_index, stream_data->scte35_event); case StreamDataType::kSegmentInfo: VLOG(3) << "Droppping existing segment info."; return Status::OK; case StreamDataType::kMediaSample: return OnMediaSample(std::move(stream_data)); default: VLOG(3) << "Stream data type " << static_cast(stream_data->stream_data_type) << " ignored."; return Dispatch(std::move(stream_data)); } } Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) { // Process all cached samples. while (!cached_media_sample_stream_data_.empty()) { Status status = ProcessMediaSampleStreamData(*cached_media_sample_stream_data_.top()); if (!status.ok()) return status; --num_cached_samples_[cached_media_sample_stream_data_.top()->stream_index]; cached_media_sample_stream_data_.pop(); } if (segment_info_[input_stream_index]) { auto& segment_info = segment_info_[input_stream_index]; if (segment_info->start_timestamp != -1) { segment_info->duration = last_sample_end_timestamps_[input_stream_index] - segment_info->start_timestamp; Status status = DispatchSegmentInfo(input_stream_index, std::move(segment_info)); if (!status.ok()) return status; } } const size_t output_stream_index = input_stream_index; return FlushDownstream(output_stream_index); } Status ChunkingHandler::OnStreamInfo(uint64_t stream_index, std::shared_ptr info) { // Make sure the inputs come from the same thread. const int64_t thread_id = static_cast(base::PlatformThread::CurrentId()); int64_t expected = kThreadIdUnset; if (!thread_id_.compare_exchange_strong(expected, thread_id) && expected != thread_id) { return Status(error::CHUNKING_ERROR, "Inputs should come from the same thread."); } const auto time_scale = info->time_scale(); time_scales_[stream_index] = time_scale; // The video stream is treated as the main stream. If there is only one // stream, it is the main stream. const bool is_main_stream = main_stream_index_ == kInvalidStreamIndex && (info->stream_type() == kStreamVideo || num_input_streams() == 1); if (is_main_stream) { main_stream_index_ = stream_index; segment_duration_ = chunking_params_.segment_duration_in_seconds * time_scale; subsegment_duration_ = chunking_params_.subsegment_duration_in_seconds * time_scale; } else if (info->stream_type() == kStreamVideo) { return Status(error::CHUNKING_ERROR, "Only one video stream is allowed per chunking handler."); } return DispatchStreamInfo(stream_index, std::move(info)); } Status ChunkingHandler::OnScte35Event( uint64_t stream_index, std::shared_ptr event) { if (stream_index == main_stream_index_) { scte35_events_.push(std::move(event)); } else { VLOG(3) << "Dropping scte35 event from non main stream."; } return Status::OK; } Status ChunkingHandler::OnMediaSample(std::unique_ptr stream_data) { DCHECK_EQ(StreamDataType::kMediaSample, stream_data->stream_data_type); const size_t stream_index = stream_data->stream_index; DCHECK_NE(time_scales_[stream_index], 0u) << "kStreamInfo should arrive before kMediaSample"; if (stream_index != main_stream_index_ && !stream_data->media_sample->is_key_frame()) { return Status(error::CHUNKING_ERROR, "All non video samples should be key frames."); } // The streams are expected to be roughly synchronized, so we don't expect // to see a lot of samples from one stream but no samples from another // stream. // The value is kind of arbitrary here. For a 24fps video, it is ~40s. const size_t kMaxCachedSamplesPerStream = 1000u; if (num_cached_samples_[stream_index] >= kMaxCachedSamplesPerStream) { LOG(ERROR) << "Streams are not synchronized:"; for (size_t i = 0; i < num_cached_samples_.size(); ++i) LOG(ERROR) << " [Stream " << i << "] " << num_cached_samples_[i]; return Status(error::CHUNKING_ERROR, "Streams are not synchronized."); } cached_media_sample_stream_data_.push(std::move(stream_data)); ++num_cached_samples_[stream_index]; // If we have cached samples from every stream, the first sample in // |cached_media_samples_stream_data_| is guaranteed to be the earliest // sample. Extract and process that sample. if (std::all_of(num_cached_samples_.begin(), num_cached_samples_.end(), [](size_t num_samples) { return num_samples > 0; })) { while (true) { const size_t top_stream_index = cached_media_sample_stream_data_.top()->stream_index; Status status = ProcessMediaSampleStreamData(*cached_media_sample_stream_data_.top()); if (!status.ok()) return status; cached_media_sample_stream_data_.pop(); if (--num_cached_samples_[top_stream_index] == 0) break; } } return Status::OK; } Status ChunkingHandler::ProcessMainMediaSample(const MediaSample* sample) { const bool is_key_frame = sample->is_key_frame(); const int64_t timestamp = sample->dts(); const int64_t time_scale = time_scales_[main_stream_index_]; const double dts_in_seconds = static_cast(sample->dts()) / time_scale; // Check if we need to terminate the current (sub)segment. bool new_segment = false; bool new_subsegment = false; std::shared_ptr cue_event; if (is_key_frame || !chunking_params_.segment_sap_aligned) { const int64_t segment_index = timestamp / segment_duration_; if (segment_index != current_segment_index_) { current_segment_index_ = segment_index; // Reset subsegment index. current_subsegment_index_ = 0; new_segment = true; } // We use 'while' instead of 'if' to make sure to pop off multiple SCTE35 // events that may be very close to each other. while (!scte35_events_.empty() && scte35_events_.top()->start_time_in_seconds <= dts_in_seconds) { // For simplicity, don't change |current_segment_index_|. current_subsegment_index_ = 0; new_segment = true; cue_event = std::make_shared(); cue_event->time_in_seconds = static_cast(sample->pts()) / time_scale; cue_event->cue_data = scte35_events_.top()->cue_data; LOG(INFO) << "Chunked at " << dts_in_seconds << " seconds for Ad Cue."; scte35_events_.pop(); } } if (!new_segment && subsegment_duration_ > 0 && (is_key_frame || !chunking_params_.subsegment_sap_aligned)) { const int64_t subsegment_index = (timestamp - segment_info_[main_stream_index_]->start_timestamp) / subsegment_duration_; if (subsegment_index != current_subsegment_index_) { current_subsegment_index_ = subsegment_index; new_subsegment = true; } } Status status; if (new_segment) { status.Update(DispatchSegmentInfoForAllStreams()); segment_info_[main_stream_index_]->start_timestamp = timestamp; if (cue_event) status.Update(DispatchCueEventForAllStreams(std::move(cue_event))); } if (subsegment_duration_ > 0 && (new_segment || new_subsegment)) { status.Update(DispatchSubsegmentInfoForAllStreams()); subsegment_info_[main_stream_index_]->start_timestamp = timestamp; } return status; } Status ChunkingHandler::ProcessMediaSampleStreamData( const StreamData& media_sample_stream_data) { const size_t stream_index = media_sample_stream_data.stream_index; const auto sample = std::move(media_sample_stream_data.media_sample); if (stream_index == main_stream_index_) { Status status = ProcessMainMediaSample(sample.get()); if (!status.ok()) return status; } VLOG(3) << "Stream index: " << stream_index << " " << "Sample ts: " << sample->dts() << " " << " duration: " << sample->duration() << " scale: " << time_scales_[stream_index] << "\n" << " scale: " << time_scales_[main_stream_index_] << (segment_info_[stream_index] ? " dispatch " : " discard "); // Discard samples before segment start. If the segment has started, // |segment_info_[stream_index]| won't be null. if (!segment_info_[stream_index]) return Status::OK; if (segment_info_[stream_index]->start_timestamp == -1) segment_info_[stream_index]->start_timestamp = sample->dts(); if (subsegment_info_[stream_index] && subsegment_info_[stream_index]->start_timestamp == -1) { subsegment_info_[stream_index]->start_timestamp = sample->dts(); } last_sample_end_timestamps_[stream_index] = sample->dts() + sample->duration(); return DispatchMediaSample(stream_index, std::move(sample)); } Status ChunkingHandler::DispatchSegmentInfoForAllStreams() { Status status; for (size_t i = 0; i < segment_info_.size() && status.ok(); ++i) { if (segment_info_[i] && segment_info_[i]->start_timestamp != -1) { segment_info_[i]->duration = last_sample_end_timestamps_[i] - segment_info_[i]->start_timestamp; status.Update(DispatchSegmentInfo(i, std::move(segment_info_[i]))); } segment_info_[i].reset(new SegmentInfo); subsegment_info_[i].reset(); } return status; } Status ChunkingHandler::DispatchSubsegmentInfoForAllStreams() { Status status; for (size_t i = 0; i < subsegment_info_.size() && status.ok(); ++i) { if (subsegment_info_[i] && subsegment_info_[i]->start_timestamp != -1) { subsegment_info_[i]->duration = last_sample_end_timestamps_[i] - subsegment_info_[i]->start_timestamp; status.Update(DispatchSegmentInfo(i, std::move(subsegment_info_[i]))); } subsegment_info_[i].reset(new SegmentInfo); subsegment_info_[i]->is_subsegment = true; } return status; } Status ChunkingHandler::DispatchCueEventForAllStreams( std::shared_ptr cue_event) { Status status; for (size_t i = 0; i < segment_info_.size() && status.ok(); ++i) { status.Update(DispatchCueEvent(i, cue_event)); } return status; } ChunkingHandler::MediaSampleTimestampGreater::MediaSampleTimestampGreater( const ChunkingHandler* const chunking_handler) : chunking_handler_(chunking_handler) {} bool ChunkingHandler::MediaSampleTimestampGreater::operator()( const std::unique_ptr& lhs, const std::unique_ptr& rhs) const { DCHECK(lhs); DCHECK(rhs); return GetSampleTimeInSeconds(*lhs) > GetSampleTimeInSeconds(*rhs); } double ChunkingHandler::MediaSampleTimestampGreater::GetSampleTimeInSeconds( const StreamData& media_sample_stream_data) const { const size_t stream_index = media_sample_stream_data.stream_index; const auto& sample = media_sample_stream_data.media_sample; DCHECK(sample); // Order main samples by left boundary and non main samples by mid-point. This // ensures non main samples are properly chunked, i.e. if the portion of the // sample in the next chunk is bigger than the portion of the sample in the // previous chunk, the sample is placed in the next chunk. const uint64_t timestamp = stream_index == chunking_handler_->main_stream_index_ ? sample->dts() : (sample->dts() + sample->duration() / 2); return static_cast(timestamp) / chunking_handler_->time_scales_[stream_index]; } bool ChunkingHandler::Scte35EventTimestampGreater::operator()( const std::shared_ptr& lhs, const std::shared_ptr& rhs) const { DCHECK(lhs); DCHECK(rhs); return lhs->start_time_in_seconds > rhs->start_time_in_seconds; } } // namespace media } // namespace shaka