From feabf770bf7beee7c82bae4a008bc50b238eafa8 Mon Sep 17 00:00:00 2001 From: KongQun Yang Date: Mon, 8 Jan 2018 17:18:15 -0800 Subject: [PATCH] Sync streams coming to ChunkingHandler Change-Id: I50d1a64f2ec3c50dfb1b7282241b47f3a55e26ce --- packager/media/base/media_handler_test_base.h | 2 +- packager/media/chunking/chunking_handler.cc | 213 ++++++++++-------- packager/media/chunking/chunking_handler.h | 55 +++-- .../chunking/chunking_handler_unittest.cc | 29 ++- 4 files changed, 173 insertions(+), 126 deletions(-) diff --git a/packager/media/base/media_handler_test_base.h b/packager/media/base/media_handler_test_base.h index 4b4d56a95b..986df5cf6d 100644 --- a/packager/media/base/media_handler_test_base.h +++ b/packager/media/base/media_handler_test_base.h @@ -94,7 +94,7 @@ MATCHER_P4(IsMediaSample, stream_index, timestamp, duration, encrypted, "") { << StreamDataTypeToString(arg->stream_data_type); return false; } - *result_listener << "which is (" << stream_index << "," + *result_listener << "which is (" << arg->stream_index << "," << arg->media_sample->dts() << "," << arg->media_sample->duration() << "," << BoolToString(arg->media_sample->is_encrypted()) << ")"; diff --git a/packager/media/chunking/chunking_handler.cc b/packager/media/chunking/chunking_handler.cc index 31e07d4647..12639d48ab 100644 --- a/packager/media/chunking/chunking_handler.cc +++ b/packager/media/chunking/chunking_handler.cc @@ -6,20 +6,24 @@ #include "packager/media/chunking/chunking_handler.h" +#include + #include "packager/base/logging.h" #include "packager/base/threading/platform_thread.h" #include "packager/media/base/media_sample.h" namespace { int64_t kThreadIdUnset = -1; -int64_t kTimeStampToDispatchAllSamples = -1; } // namespace namespace shaka { namespace media { ChunkingHandler::ChunkingHandler(const ChunkingParams& chunking_params) - : chunking_params_(chunking_params), thread_id_(kThreadIdUnset) { + : chunking_params_(chunking_params), + thread_id_(kThreadIdUnset), + media_sample_comparator_(this), + cached_media_sample_stream_data_(media_sample_comparator_) { CHECK_NE(chunking_params.segment_duration_in_seconds, 0u); } @@ -30,6 +34,7 @@ Status ChunkingHandler::InitializeInternal() { subsegment_info_.resize(num_input_streams()); time_scales_.resize(num_input_streams()); last_sample_end_timestamps_.resize(num_input_streams()); + num_cached_samples_.resize(num_input_streams()); return Status::OK; } @@ -84,35 +89,45 @@ Status ChunkingHandler::Process(std::unique_ptr stream_data) { const size_t stream_index = stream_data->stream_index; DCHECK_NE(time_scales_[stream_index], 0u) << "kStreamInfo should arrive before kMediaSample"; - if (stream_index != main_stream_index_) { - if (!stream_data->media_sample->is_key_frame()) { - return Status(error::CHUNKING_ERROR, - "All non video samples should be key frames."); - } - // Cache non main stream samples, since we don't know yet whether these - // samples belong to the current or next segment. - non_main_samples_.push_back(std::move(stream_data)); - // The streams are expected to be synchronized, so we don't expect to - // see a lot of samples before seeing video samples. - const size_t kMaxSamplesPerStreamBeforeVideoSample = 5u; - if (non_main_samples_.size() > - num_input_streams() * kMaxSamplesPerStreamBeforeVideoSample) { - return Status(error::CHUNKING_ERROR, - "Too many non video samples before video sample."); - } - return Status::OK; + + if (stream_index != main_stream_index_ && + !stream_data->media_sample->is_key_frame()) { + return Status(error::CHUNKING_ERROR, + "All non video samples should be key frames."); + } + // The streams are expected to be roughly synchronized, so we don't expect + // to see a lot of samples from one stream but no samples from another + // stream. + // The value is kind of arbitrary here. For a 24fps video, it is ~40s. + const size_t kMaxCachedSamplesPerStream = 1000u; + if (num_cached_samples_[stream_index] >= kMaxCachedSamplesPerStream) { + LOG(ERROR) << "Streams are not synchronized:"; + for (size_t i = 0; i < num_cached_samples_.size(); ++i) + LOG(ERROR) << " [Stream " << i << "] " << num_cached_samples_[i]; + return Status(error::CHUNKING_ERROR, "Streams are not synchronized."); } - const MediaSample* sample = stream_data->media_sample.get(); - Status status = ProcessMediaSample(sample); - if (!status.ok()) - return status; - // Discard samples before segment start. - if (!segment_info_[stream_index]) - return Status::OK; - last_sample_end_timestamps_[stream_index] = - sample->dts() + sample->duration(); - break; + cached_media_sample_stream_data_.push(std::move(stream_data)); + ++num_cached_samples_[stream_index]; + + // If we have cached samples from every stream, the first sample in + // |cached_media_samples_stream_data_| is guaranteed to be the earliest + // sample. Extract and process that sample. + if (std::all_of(num_cached_samples_.begin(), num_cached_samples_.end(), + [](size_t num_samples) { return num_samples > 0; })) { + while (true) { + const size_t top_stream_index = + cached_media_sample_stream_data_.top()->stream_index; + Status status = ProcessMediaSampleStreamData( + *cached_media_sample_stream_data_.top()); + if (!status.ok()) + return status; + cached_media_sample_stream_data_.pop(); + if (--num_cached_samples_[top_stream_index] == 0) + break; + } + } + return Status::OK; } default: VLOG(3) << "Stream data type " @@ -123,18 +138,22 @@ Status ChunkingHandler::Process(std::unique_ptr stream_data) { } Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) { + // Process all cached samples. + while (!cached_media_sample_stream_data_.empty()) { + Status status = + ProcessMediaSampleStreamData(*cached_media_sample_stream_data_.top()); + if (!status.ok()) + return status; + --num_cached_samples_[cached_media_sample_stream_data_.top()->stream_index]; + cached_media_sample_stream_data_.pop(); + } if (segment_info_[input_stream_index]) { - Status status; - if (input_stream_index != main_stream_index_) { - status = DispatchNonMainSamples(kTimeStampToDispatchAllSamples); - if (!status.ok()) - return status; - } auto& segment_info = segment_info_[input_stream_index]; if (segment_info->start_timestamp != -1) { segment_info->duration = last_sample_end_timestamps_[input_stream_index] - segment_info->start_timestamp; - status = DispatchSegmentInfo(input_stream_index, std::move(segment_info)); + Status status = + DispatchSegmentInfo(input_stream_index, std::move(segment_info)); if (!status.ok()) return status; } @@ -143,7 +162,7 @@ Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) { return FlushDownstream(output_stream_index); } -Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) { +Status ChunkingHandler::ProcessMainMediaSample(const MediaSample* sample) { const bool is_key_frame = sample->is_key_frame(); const int64_t timestamp = sample->dts(); // Check if we need to terminate the current (sub)segment. @@ -187,12 +206,6 @@ Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) { } Status status; - if (new_segment || new_subsegment) { - // Dispatch the samples before |timestamp| - See the implemention on how we - // determine if a sample is before |timestamp|.. - status.Update(DispatchNonMainSamples(timestamp)); - } - if (new_segment) { status.Update(DispatchSegmentInfoForAllStreams()); segment_info_[main_stream_index_]->start_timestamp = timestamp; @@ -204,54 +217,39 @@ Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) { status.Update(DispatchSubsegmentInfoForAllStreams()); subsegment_info_[main_stream_index_]->start_timestamp = timestamp; } - if (!status.ok()) - return status; - - // Dispatch non-main samples for the next segment. - return DispatchNonMainSamples(kTimeStampToDispatchAllSamples); + return status; } -Status ChunkingHandler::DispatchNonMainSamples(int64_t timestamp_threshold) { - Status status; - while (status.ok() && !non_main_samples_.empty()) { - DCHECK_EQ(non_main_samples_.front()->stream_data_type, - StreamDataType::kMediaSample); - const size_t stream_index = non_main_samples_.front()->stream_index; - const MediaSample* sample = non_main_samples_.front()->media_sample.get(); - // If the portion of the sample before |timestamp_threshold| is bigger than - // the other portion, we consider it part of the current segment. - const int64_t timestamp = sample->dts() + sample->duration() / 2; - const bool stop = - (timestamp_threshold != kTimeStampToDispatchAllSamples && - (static_cast(timestamp) / time_scales_[stream_index]) > - (static_cast(timestamp_threshold) / - time_scales_[main_stream_index_])); - VLOG(3) << "Sample ts: " << sample->dts() << " " - << " duration: " << sample->duration() - << " scale: " << time_scales_[stream_index] << "\n" - << " threshold: " << timestamp_threshold - << " scale: " << time_scales_[main_stream_index_] - << (stop ? " stop " - : (segment_info_[stream_index] ? " dispatch " - : " discard ")); - if (stop) - break; - // Only dispatch samples if the segment has started, otherwise discard - // them. - if (segment_info_[stream_index]) { - if (segment_info_[stream_index]->start_timestamp == -1) - segment_info_[stream_index]->start_timestamp = sample->dts(); - if (subsegment_info_[stream_index] && - subsegment_info_[stream_index]->start_timestamp == -1) { - subsegment_info_[stream_index]->start_timestamp = sample->dts(); - } - last_sample_end_timestamps_[stream_index] = - sample->dts() + sample->duration(); - status.Update(Dispatch(std::move(non_main_samples_.front()))); - } - non_main_samples_.pop_front(); +Status ChunkingHandler::ProcessMediaSampleStreamData( + const StreamData& media_sample_stream_data) { + const size_t stream_index = media_sample_stream_data.stream_index; + const auto sample = std::move(media_sample_stream_data.media_sample); + + if (stream_index == main_stream_index_) { + Status status = ProcessMainMediaSample(sample.get()); + if (!status.ok()) + return status; } - return status; + + VLOG(3) << "Stream index: " << stream_index << " " + << "Sample ts: " << sample->dts() << " " + << " duration: " << sample->duration() + << " scale: " << time_scales_[stream_index] << "\n" + << " scale: " << time_scales_[main_stream_index_] + << (segment_info_[stream_index] ? " dispatch " : " discard "); + // Discard samples before segment start. If the segment has started, + // |segment_info_[stream_index]| won't be null. + if (!segment_info_[stream_index]) + return Status::OK; + if (segment_info_[stream_index]->start_timestamp == -1) + segment_info_[stream_index]->start_timestamp = sample->dts(); + if (subsegment_info_[stream_index] && + subsegment_info_[stream_index]->start_timestamp == -1) { + subsegment_info_[stream_index]->start_timestamp = sample->dts(); + } + last_sample_end_timestamps_[stream_index] = + sample->dts() + sample->duration(); + return DispatchMediaSample(stream_index, std::move(sample)); } Status ChunkingHandler::DispatchSegmentInfoForAllStreams() { @@ -294,5 +292,44 @@ Status ChunkingHandler::DispatchCueEventForAllStreams( return status; } +ChunkingHandler::MediaSampleTimestampGreater::MediaSampleTimestampGreater( + const ChunkingHandler* const chunking_handler) + : chunking_handler_(chunking_handler) {} + +bool ChunkingHandler::MediaSampleTimestampGreater::operator()( + const std::unique_ptr& lhs, + const std::unique_ptr& rhs) const { + DCHECK(lhs); + DCHECK(rhs); + return GetSampleTimeInSeconds(*lhs) > GetSampleTimeInSeconds(*rhs); +} + +double ChunkingHandler::MediaSampleTimestampGreater::GetSampleTimeInSeconds( + const StreamData& media_sample_stream_data) const { + const size_t stream_index = media_sample_stream_data.stream_index; + const auto& sample = media_sample_stream_data.media_sample; + DCHECK(sample); + // Order main samples by left boundary and non main samples by mid-point. This + // ensures non main samples are properly chunked, i.e. if the portion of the + // sample in the next chunk is bigger than the portion of the sample in the + // previous chunk, the sample is placed in the next chunk. + const uint64_t timestamp = + stream_index == chunking_handler_->main_stream_index_ + ? sample->dts() + : (sample->dts() + sample->duration() / 2); + return static_cast(timestamp) / + chunking_handler_->time_scales_[stream_index]; +} + +bool ChunkingHandler::Scte35EventTimestampGreater::operator()( + const std::unique_ptr& lhs, + const std::unique_ptr& rhs) const { + DCHECK(lhs); + DCHECK(rhs); + DCHECK(lhs->scte35_event); + DCHECK(rhs->scte35_event); + return lhs->scte35_event->start_time > rhs->scte35_event->start_time; +} + } // namespace media } // namespace shaka diff --git a/packager/media/chunking/chunking_handler.h b/packager/media/chunking/chunking_handler.h index 4940c6f1fb..2b135d785b 100644 --- a/packager/media/chunking/chunking_handler.h +++ b/packager/media/chunking/chunking_handler.h @@ -62,11 +62,11 @@ class ChunkingHandler : public MediaHandler { ChunkingHandler(const ChunkingHandler&) = delete; ChunkingHandler& operator=(const ChunkingHandler&) = delete; - // Processes media sample and apply chunking if needed. - Status ProcessMediaSample(const MediaSample* sample); + // Processes main media sample and apply chunking if needed. + Status ProcessMainMediaSample(const MediaSample* sample); - // Dispatch cached non main stream samples before |timestamp_threshold|. - Status DispatchNonMainSamples(int64_t timestamp_threshold); + // Processes and dispatches media sample. + Status ProcessMediaSampleStreamData(const StreamData& media_data); // The (sub)segments are aligned and dispatched together. Status DispatchSegmentInfoForAllStreams(); @@ -86,11 +86,30 @@ class ChunkingHandler : public MediaHandler { int64_t segment_duration_ = 0; int64_t subsegment_duration_ = 0; - // The streams are expected to be synchronized. Cache non main (video) stream - // samples so we can determine whether the next segment should include these - // samples. The samples will be dispatched after seeing the next main stream - // sample. - std::deque> non_main_samples_; + class MediaSampleTimestampGreater { + public: + explicit MediaSampleTimestampGreater( + const ChunkingHandler* const chunking_handler); + + // Comparison operator. Used by |media_samples_| priority queue below to + // sort the media samples. + bool operator()(const std::unique_ptr& lhs, + const std::unique_ptr& rhs) const; + + private: + double GetSampleTimeInSeconds( + const StreamData& media_sample_stream_data) const; + + const ChunkingHandler* const chunking_handler_ = nullptr; + }; + MediaSampleTimestampGreater media_sample_comparator_; + // Caches media samples and sort the samples. + std::priority_queue, + std::vector>, + MediaSampleTimestampGreater> + cached_media_sample_stream_data_; + // Tracks number of cached samples in input streams. + std::vector num_cached_samples_; // Current segment index, useful to determine where to do chunking. int64_t current_segment_index_ = -1; @@ -103,22 +122,16 @@ class ChunkingHandler : public MediaHandler { // The end timestamp of the last dispatched sample. std::vector last_sample_end_timestamps_; - struct Scte35EventComparator { - bool operator()(const std::shared_ptr& lhs, - const std::shared_ptr& rhs) const { - DCHECK(lhs); - DCHECK(rhs); - DCHECK(lhs->scte35_event); - return lhs->scte35_event->start_time > rhs->scte35_event->start_time; - } + struct Scte35EventTimestampGreater { + bool operator()(const std::unique_ptr& lhs, + const std::unique_ptr& rhs) const; }; - // Captures all incoming SCTE35 events to identify chunking points. Events // will be removed from this queue one at a time as soon as the correct // chunking point is identified in the incoming samples. - std::priority_queue, - std::vector>, - Scte35EventComparator> + std::priority_queue, + std::vector>, + Scte35EventTimestampGreater> scte35_events_; }; diff --git a/packager/media/chunking/chunking_handler_unittest.cc b/packager/media/chunking/chunking_handler_unittest.cc index 4d940c859c..f1a2cc6750 100644 --- a/packager/media/chunking/chunking_handler_unittest.cc +++ b/packager/media/chunking/chunking_handler_unittest.cc @@ -180,35 +180,32 @@ TEST_F(ChunkingHandlerTest, AudioAndVideo) { // Equivalent to 12345 in video timescale. const int64_t kAudioStartTimestamp = 9876; const int64_t kVideoStartTimestamp = 12345; + // Burst of audio and video samples. They will be properly ordered. for (int i = 0; i < 5; ++i) { ASSERT_OK(Process(StreamData::FromMediaSample( - kStreamIndex0, - GetMediaSample( - kAudioStartTimestamp + kDuration0 * i, - kDuration0, - true)))); + kStreamIndex0, GetMediaSample(kAudioStartTimestamp + kDuration0 * i, + kDuration0, true)))); + } + for (int i = 0; i < 5; ++i) { // Alternate key frame. const bool is_key_frame = (i % 2) == 1; ASSERT_OK(Process(StreamData::FromMediaSample( - kStreamIndex1, - GetMediaSample( - kVideoStartTimestamp + kDuration1 * i, - kDuration1, - is_key_frame)))); + kStreamIndex1, GetMediaSample(kVideoStartTimestamp + kDuration1 * i, + kDuration1, is_key_frame)))); } EXPECT_THAT( GetOutputStreamDataVector(), ElementsAre( // The first samples @ kStartTimestamp is discarded - not key frame. - IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0, - kDuration0, !kEncrypted), IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1, kDuration1, !kEncrypted), - IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 2, + IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0, kDuration0, !kEncrypted), IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 2, kDuration1, !kEncrypted), + IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 2, + kDuration0, !kEncrypted), IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 3, kDuration0, !kEncrypted), // The audio segment is terminated together with video stream. @@ -220,9 +217,7 @@ TEST_F(ChunkingHandlerTest, AudioAndVideo) { IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 3, kDuration1, !kEncrypted), IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 4, - kDuration0, !kEncrypted), - IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 4, - kDuration1, !kEncrypted))); + kDuration0, !kEncrypted))); ClearOutputStreamDataVector(); // The side comments below show the equivalent timestamp in video timescale. @@ -250,6 +245,8 @@ TEST_F(ChunkingHandlerTest, AudioAndVideo) { EXPECT_THAT( GetOutputStreamDataVector(), ElementsAre( + IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 4, + kDuration1, !kEncrypted), IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 5, kDuration0, !kEncrypted), // Audio is terminated along with video below.