From 3b5b2bccca32c7ded6f91f880812372254658dda Mon Sep 17 00:00:00 2001 From: Aaron Vaage Date: Thu, 15 Mar 2018 16:04:01 -0700 Subject: [PATCH] Implements SyncPointQueue and CueAlignmentHandler SyncPointQueue manages all cue points and returns aligned cue points to the callers (CueAlignmentHandlers). CueAlignmentHandler is responsible for aligning cues from different streams. It uses SyncPointQueue internally to align / synchronize the cue points. Issue: #355 Change-Id: I281fecb46a3ca7172d71e7495bdd07b8efdeb283 --- packager/media/base/media_handler_test_base.h | 4 +- packager/media/chunking/chunking.gyp | 5 + .../media/chunking/cue_alignment_handler.cc | 272 ++++++++ .../media/chunking/cue_alignment_handler.h | 83 +++ .../cue_alignment_handler_unittest.cc | 618 ++++++++++++++++++ packager/media/chunking/sync_point_queue.cc | 118 ++++ packager/media/chunking/sync_point_queue.h | 65 ++ 7 files changed, 1163 insertions(+), 2 deletions(-) create mode 100644 packager/media/chunking/cue_alignment_handler.cc create mode 100644 packager/media/chunking/cue_alignment_handler.h create mode 100644 packager/media/chunking/cue_alignment_handler_unittest.cc create mode 100644 packager/media/chunking/sync_point_queue.cc create mode 100644 packager/media/chunking/sync_point_queue.h diff --git a/packager/media/base/media_handler_test_base.h b/packager/media/base/media_handler_test_base.h index db89cb9378..cdbebe6ceb 100644 --- a/packager/media/base/media_handler_test_base.h +++ b/packager/media/base/media_handler_test_base.h @@ -116,8 +116,8 @@ MATCHER_P4(IsMediaSample, stream_index, timestamp, duration, encrypted, "") { << arg->media_sample->duration() << "," << BoolToString(arg->media_sample->is_encrypted()) << ")"; return arg->stream_index == stream_index && - arg->media_sample->dts() == timestamp && - arg->media_sample->duration() == duration && + arg->media_sample->dts() == static_cast(timestamp) && + arg->media_sample->duration() == static_cast(duration) && arg->media_sample->is_encrypted() == encrypted; } diff --git a/packager/media/chunking/chunking.gyp b/packager/media/chunking/chunking.gyp index 57c00880df..eff100192a 100644 --- a/packager/media/chunking/chunking.gyp +++ b/packager/media/chunking/chunking.gyp @@ -15,6 +15,10 @@ 'sources': [ 'chunking_handler.cc', 'chunking_handler.h', + 'cue_alignment_handler.cc', + 'cue_alignment_handler.h', + 'sync_point_queue.cc', + 'sync_point_queue.h', ], 'dependencies': [ '../base/media_base.gyp:media_base', @@ -25,6 +29,7 @@ 'type': '<(gtest_target_type)', 'sources': [ 'chunking_handler_unittest.cc', + 'cue_alignment_handler_unittest.cc', ], 'dependencies': [ '../../testing/gtest.gyp:gtest', diff --git a/packager/media/chunking/cue_alignment_handler.cc b/packager/media/chunking/cue_alignment_handler.cc new file mode 100644 index 0000000000..fb53c0da69 --- /dev/null +++ b/packager/media/chunking/cue_alignment_handler.cc @@ -0,0 +1,272 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/chunking/cue_alignment_handler.h" + +namespace shaka { +namespace media { +namespace { +// The max number of samples that are allowed to be buffered before we shutdown +// because there is likely a problem with the content or how the pipeline was +// configured. This is about 20 seconds of buffer for audio with 48kHz. +const size_t kMaxBufferSize = 1000; + +double TimeInSeconds(const StreamInfo& info, const StreamData& data) { + int64_t time_scale; + int64_t scaled_time; + switch (data.stream_data_type) { + case StreamDataType::kMediaSample: + time_scale = info.time_scale(); + scaled_time = data.media_sample->pts(); + break; + case StreamDataType::kTextSample: + // Text is always in MS but the stream info time scale is 0. + time_scale = 1000; + scaled_time = data.text_sample->start_time(); + break; + default: + time_scale = 0; + scaled_time = 0; + NOTREACHED() << "TimeInSeconds should only be called on media samples " + "and text samples."; + break; + } + + return static_cast(scaled_time) / time_scale; +} +} // namespace + +CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points) + : sync_points_(sync_points) {} + +Status CueAlignmentHandler::InitializeInternal() { + sync_points_->AddThread(); + stream_states_.resize(num_input_streams()); + return Status::OK; +} + +Status CueAlignmentHandler::Process(std::unique_ptr data) { + switch (data->stream_data_type) { + case StreamDataType::kStreamInfo: + return OnStreamInfo(std::move(data)); + case StreamDataType::kTextSample: + case StreamDataType::kMediaSample: + return OnSample(std::move(data)); + default: + VLOG(3) << "Dropping unsupported data type " + << static_cast(data->stream_data_type); + return Status::OK; + } +} + +Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) { + // We need to wait for all stream to flush before we can flush each stream. + // This allows cached buffers to be cleared and cues to be properly + // synchronized and set on all streams.. + + stream_states_[stream_index].to_be_flushed = true; + for (const StreamState& stream_state : stream_states_) { + if (!stream_state.to_be_flushed) + return Status::OK; + } + + // |to_be_flushed| is set on all streams. We don't expect to see data in any + // buffers. + for (size_t i = 0; i < stream_states_.size(); ++i) { + StreamState& stream_state = stream_states_[i]; + if (!stream_state.samples.empty()) { + LOG(WARNING) << "Unexpected data seen on stream " << i; + while (!stream_state.samples.empty()) { + Status status(Dispatch(std::move(stream_state.samples.front()))); + if (!status.ok()) + return status; + stream_state.samples.pop_front(); + } + } + } + return FlushAllDownstreams(); +} + +Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr data) { + StreamState& stream_state = stream_states_[data->stream_index]; + // Keep a copy of the stream info so that we can check type and check + // timescale. + stream_state.info = data->stream_info; + // Get the first hint for the stream. Use a negative hint so that if there is + // suppose to be a sync point at zero, we will still respect it. + stream_state.next_cue_hint = sync_points_->GetHint(-1); + + return Dispatch(std::move(data)); +} + +Status CueAlignmentHandler::OnSample(std::unique_ptr sample) { + // There are two modes: + // 1. There is a video input. + // 2. There are no video inputs. + // + // When there is a video input, we rely on the video input get the next sync + // point and release all the samples. + // + // When there are no video inputs, we rely on the sync point queue to block + // us until there is a sync point. + + const uint64_t stream_index = sample->stream_index; + StreamState& stream_state = stream_states_[stream_index]; + + if (stream_state.info->stream_type() == kStreamVideo) { + const double sample_time = TimeInSeconds(*stream_state.info, *sample); + if (sample->media_sample->is_key_frame() && + sample_time >= stream_state.next_cue_hint) { + std::shared_ptr next_sync = + sync_points_->PromoteAt(sample_time); + if (!next_sync) { + LOG(ERROR) + << "Failed to promote sync point at " << sample_time + << ". This happens only if video streams are not GOP-aligned."; + return Status(error::INVALID_ARGUMENT, + "Streams are not properly GOP-aligned."); + } + + Status status(DispatchCueEvent(stream_index, next_sync)); + stream_state.cue.reset(); + status.Update(UseNewSyncPoint(next_sync)); + if (!status.ok()) + return status; + } + return Dispatch(std::move(sample)); + } + + // Accept the sample. This will output it if it comes before the hint point or + // will cache it if it comes after the hint point. + Status status(AcceptSample(std::move(sample), &stream_state)); + if (!status.ok()) { + return status; + } + + // If all the streams are waiting on a hint, it means that none has next sync + // point determined. It also means that there are no video streams and we need + // to wait for all streams to converge on a hint so that we can get the next + // sync point. + if (EveryoneWaitingAtHint()) { + // All streams should have the same hint right now. + const double next_cue_hint = stream_state.next_cue_hint; + for (const StreamState& stream_state : stream_states_) { + DCHECK_EQ(next_cue_hint, stream_state.next_cue_hint); + } + + std::shared_ptr next_sync = + sync_points_->GetNext(next_cue_hint); + DCHECK(next_sync); + + Status status = UseNewSyncPoint(next_sync); + if (!status.ok()) { + return status; + } + } + + return Status::OK; +} + +Status CueAlignmentHandler::UseNewSyncPoint( + std::shared_ptr new_sync) { + const double new_hint = sync_points_->GetHint(new_sync->time_in_seconds); + DCHECK_GT(new_hint, new_sync->time_in_seconds); + + Status status; + for (StreamState& stream_state : stream_states_) { + // No stream should be so out of sync with the others that they would + // still be working on an old cue. + if (stream_state.cue) { + LOG(ERROR) << "Found two cue events that are too close together. One at " + << stream_state.cue->time_in_seconds << " and the other at " + << new_sync->time_in_seconds; + return Status(error::INVALID_ARGUMENT, "Cue events too close together"); + } + + // Add the cue and update the hint. The cue will always be used over the + // hint, so hint should always be greater than the latest cue. + stream_state.cue = new_sync; + stream_state.next_cue_hint = new_hint; + + while (status.ok() && !stream_state.samples.empty()) { + std::unique_ptr& sample = stream_state.samples.front(); + const double sample_time_in_seconds = + TimeInSeconds(*stream_state.info, *sample); + if (sample_time_in_seconds >= stream_state.next_cue_hint) { + DCHECK(!stream_state.cue); + break; + } + + const size_t stream_index = sample->stream_index; + if (stream_state.cue) { + status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds, + &stream_state)); + } + status.Update(Dispatch(std::move(sample))); + stream_state.samples.pop_front(); + } + } + return status; +} + +bool CueAlignmentHandler::EveryoneWaitingAtHint() const { + for (const StreamState& stream_state : stream_states_) { + if (stream_state.samples.empty()) { + return false; + } + } + return true; +} + +// Accept Sample will either: +// 1. Send the sample downstream, as it comes before the next sync point and +// therefore can skip the bufferring. +// 2. Save the sample in the buffer as it comes after the next sync point. +Status CueAlignmentHandler::AcceptSample(std::unique_ptr sample, + StreamState* stream_state) { + DCHECK(stream_state); + + const size_t stream_index = sample->stream_index; + if (stream_state->samples.empty()) { + const double sample_time_in_seconds = + TimeInSeconds(*stream_state->info, *sample); + if (sample_time_in_seconds < stream_state->next_cue_hint) { + Status status; + if (stream_state->cue) { + status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds, + stream_state)); + } + status.Update(Dispatch(std::move(sample))); + return status; + } + DCHECK(!stream_state->cue); + } + + stream_state->samples.push_back(std::move(sample)); + if (stream_state->samples.size() > kMaxBufferSize) { + LOG(ERROR) << "Stream " << stream_index << " has buffered " + << stream_state->samples.size() << " when the max is " + << kMaxBufferSize; + return Status(error::INVALID_ARGUMENT, + "Streams are not properly multiplexed."); + } + + return Status::OK; +} + +Status CueAlignmentHandler::DispatchCueIfNeeded( + size_t stream_index, + double next_sample_time_in_seconds, + StreamState* stream_state) { + DCHECK(stream_state->cue); + if (next_sample_time_in_seconds < stream_state->cue->time_in_seconds) + return Status::OK; + DCHECK_LT(stream_state->cue->time_in_seconds, stream_state->next_cue_hint); + return DispatchCueEvent(stream_index, std::move(stream_state->cue)); +} + +} // namespace media +} // namespace shaka diff --git a/packager/media/chunking/cue_alignment_handler.h b/packager/media/chunking/cue_alignment_handler.h new file mode 100644 index 0000000000..317d2c98a9 --- /dev/null +++ b/packager/media/chunking/cue_alignment_handler.h @@ -0,0 +1,83 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include + +#include "packager/media/base/media_handler.h" +#include "packager/media/chunking/sync_point_queue.h" + +namespace shaka { +namespace media { + +/// The cue alignment handler is a N-to-N handler that will inject CueEvents +/// into all streams. It will align the cues across streams (and handlers) +/// using a shared SyncPointQueue. +/// +/// There should be a cue alignment handler per demuxer/thread and not per +/// stream. A cue alignment handler must be one per thread in order to properly +/// manage blocking. +class CueAlignmentHandler : public MediaHandler { + public: + explicit CueAlignmentHandler(SyncPointQueue* sync_points); + ~CueAlignmentHandler() = default; + + private: + CueAlignmentHandler(const CueAlignmentHandler&) = delete; + CueAlignmentHandler& operator=(const CueAlignmentHandler&) = delete; + + struct StreamState { + // Information for the stream. + std::shared_ptr info; + // Cached samples that cannot be dispatched. All the samples should be at or + // after |hint|. + std::list> samples; + // If set, the stream is pending to be flushed. + bool to_be_flushed = false; + + // If set, it points to the next cue it has to send downstream. Note that if + // it is not set, the next cue is not determined. + // This is set but not really used by video stream. + std::shared_ptr cue; + // If |cue| is set, this is the hint for the cue after |cue|, i.e. next next + // cue; otherwise this is the hint for the next cue. This holds the time in + // seconds of the scheduled (unpromoted) next or next next cue's time. This + // is essentially the barrier for the sample stream. Non video samples after + // this barrier must wait until the |cue| is determined and thus the next + // hint to be determined; video samples will promote the hint to cue when + // seeing the first key frame after |next_cue_hint|. + double next_cue_hint = 0; + }; + + // MediaHandler overrides. + Status InitializeInternal() override; + Status Process(std::unique_ptr data) override; + Status OnFlushRequest(size_t stream_index) override; + + // Internal handling functions for different stream data. + Status OnStreamInfo(std::unique_ptr data); + Status OnSample(std::unique_ptr sample); + + // Update stream states with new sync point. + Status UseNewSyncPoint(std::shared_ptr new_sync); + + // Check if everyone is waiting for new hint points. + bool EveryoneWaitingAtHint() const; + + // Dispatch or save incoming sample. + Status AcceptSample(std::unique_ptr sample, + StreamState* stream_state); + + // Dispatch the cue if the new sample comes at or after it. + Status DispatchCueIfNeeded(size_t stream_index, + double next_sample_time_in_seconds, + StreamState* stream_state); + + SyncPointQueue* const sync_points_ = nullptr; + std::vector stream_states_; +}; + +} // namespace media +} // namespace shaka diff --git a/packager/media/chunking/cue_alignment_handler_unittest.cc b/packager/media/chunking/cue_alignment_handler_unittest.cc new file mode 100644 index 0000000000..1dcece2da1 --- /dev/null +++ b/packager/media/chunking/cue_alignment_handler_unittest.cc @@ -0,0 +1,618 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/chunking/chunking_handler.h" + +#include +#include + +#include "packager/media/base/media_handler_test_base.h" +#include "packager/media/chunking/cue_alignment_handler.h" +#include "packager/media/public/ad_cue_generator_params.h" +#include "packager/status_test_util.h" + +using ::testing::ElementsAre; +using ::testing::IsEmpty; + +namespace shaka { +namespace media { +namespace { +const size_t kOneInput = 1; +const size_t kOneOutput = 1; + +const size_t kThreeInput = 3; +const size_t kThreeOutput = 3; + +const bool kEncrypted = true; +const bool kKeyFrame = true; + +const uint64_t kNoTimeScale = 0; +const uint64_t kMsTimeScale = 1000; + +const char* kNoId = ""; +const char* kNoSettings = ""; +const char* kNoPayload = ""; + +const size_t kChild = 0; +const size_t kParent = 0; +} // namespace + +class CueAlignmentHandlerTest : public MediaHandlerTestBase {}; + +TEST_F(CueAlignmentHandlerTest, VideoInputWithNoCues) { + const size_t kVideoStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, AudioInputWithNoCues) { + const size_t kAudioStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextInputWithNoCues) { + const size_t kTextStream = 0; + + const int64_t kSampleDuration = 1000; + + const uint64_t kSample0Start = 0; + const uint64_t kSample0End = kSample0Start + kSampleDuration; + const uint64_t kSample1Start = kSample0End; + const uint64_t kSample1End = kSample1Start + kSampleDuration; + const uint64_t kSample2Start = kSample1End; + const uint64_t kSample2End = kSample2Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextAudioVideoInputWithNoCues) { + const size_t kTextStream = 0; + const size_t kAudioStream = 1; + const size_t kVideoStream = 2; + + const uint64_t kSampleDuration = 1000; + + const uint64_t kSample0Start = 0; + const uint64_t kSample0End = kSample0Start + kSampleDuration; + const uint64_t kSample1Start = kSample0Start + kSampleDuration; + const uint64_t kSample1End = kSample1Start + kSampleDuration; + const uint64_t kSample2Start = kSample1Start + kSampleDuration; + const uint64_t kSample2End = kSample2Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kThreeInput, kThreeOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, VideoInputWithCues) { + const size_t kVideoStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + const double kSample2StartInSeconds = + static_cast(kSample2Start) / kMsTimeScale; + + // Put the cue between two key frames. + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, AudioInputWithCues) { + const size_t kAudioStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + const double kSample1StartInSeconds = + static_cast(kSample1Start) / kMsTimeScale; + + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsCueEvent(kParent, kSample1StartInSeconds))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextInputWithCues) { + const size_t kTextStream = 0; + + const int64_t kSampleDuration = 1000; + + const uint64_t kSample0Start = 0; + const uint64_t kSample0End = kSample0Start + kSampleDuration; + const uint64_t kSample1Start = kSample0End; + const uint64_t kSample1End = kSample1Start + kSampleDuration; + const uint64_t kSample2Start = kSample1End; + const uint64_t kSample2End = kSample2Start + kSampleDuration; + + const double kSample1StartInSeconds = + static_cast(kSample1Start) / kMsTimeScale; + + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsCueEvent(kParent, kSample1StartInSeconds))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextAudioVideoInputWithCues) { + const size_t kTextStream = 0; + const size_t kAudioStream = 1; + const size_t kVideoStream = 2; + + const int64_t kSampleDuration = 1000; + + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + const uint64_t kSample0StartU = 0; + const uint64_t kSample0EndU = kSample0StartU + kSampleDuration; + const uint64_t kSample1StartU = kSample0EndU; + const uint64_t kSample1EndU = kSample1StartU + kSampleDuration; + const uint64_t kSample2StartU = kSample1EndU; + const uint64_t kSample2EndU = kSample2StartU + kSampleDuration; + + const double kSample2StartInSeconds = + static_cast(kSample2Start) / kMsTimeScale; + + // Put the cue between two key frames. + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kThreeInput, kThreeOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0StartU, kSample0EndU, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1StartU, kSample1EndU, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2StartU, kSample2EndU, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0StartU, kSample0EndU, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1StartU, kSample1EndU, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2StartU, kSample2EndU, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +// TODO(kqyang): Add more tests, in particular, multi-thread tests. + +} // namespace media +} // namespace shaka diff --git a/packager/media/chunking/sync_point_queue.cc b/packager/media/chunking/sync_point_queue.cc new file mode 100644 index 0000000000..f8c4ce820f --- /dev/null +++ b/packager/media/chunking/sync_point_queue.cc @@ -0,0 +1,118 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/chunking/sync_point_queue.h" +#include "packager/media/base/media_handler.h" + +#include +#include + +namespace shaka { +namespace media { + +SyncPointQueue::SyncPointQueue(const AdCueGeneratorParams& params) + : sync_condition_(&lock_) { + for (const Cuepoint& point : params.cue_points) { + std::shared_ptr event = std::make_shared(); + event->time_in_seconds = point.start_time_in_seconds; + unpromoted_[point.start_time_in_seconds] = std::move(event); + } +} + +void SyncPointQueue::AddThread() { + base::AutoLock auto_lock(lock_); + thread_count_++; +} + +double SyncPointQueue::GetHint(double time_in_seconds) { + base::AutoLock auto_lock(lock_); + + auto iter = promoted_.upper_bound(time_in_seconds); + if (iter != promoted_.end()) + return iter->first; + + iter = unpromoted_.upper_bound(time_in_seconds); + if (iter != unpromoted_.end()) + return iter->first; + + // Use MAX DOUBLE as the fall back so that we can force all streams to run + // out all their samples even when there are no cues. + return std::numeric_limits::max(); +} + +std::shared_ptr SyncPointQueue::GetNext( + double hint_in_seconds) { + base::AutoLock auto_lock(lock_); + while (true) { + // Find the promoted cue that would line up with our hint, which is the + // first cue that is not less than |hint_in_seconds|. + auto iter = promoted_.lower_bound(hint_in_seconds); + if (iter != promoted_.end()) { + return iter->second; + } + + // Promote |hint_in_seconds| if everyone is waiting. + if (waiting_thread_count_ + 1 == thread_count_) { + std::shared_ptr cue = PromoteAtNoLocking(hint_in_seconds); + CHECK(cue); + return cue; + } + + waiting_thread_count_++; + // This blocks until either a cue is promoted or all threads are blocked + // (in which case, the unpromoted cue at the hint will be self-promoted + // and returned - see section above). Spurious signal events are possible + // with most condition variable implementations, so if it returns, we go + // back and check if a cue is actually promoted or not. + sync_condition_.Wait(); + waiting_thread_count_--; + } +} + +std::shared_ptr SyncPointQueue::PromoteAt( + double time_in_seconds) { + base::AutoLock auto_lock(lock_); + return PromoteAtNoLocking(time_in_seconds); +} + +std::shared_ptr SyncPointQueue::PromoteAtNoLocking( + double time_in_seconds) { + lock_.AssertAcquired(); + + // It is possible that |time_in_seconds| has been promoted. + auto iter = promoted_.find(time_in_seconds); + if (iter != promoted_.end()) + return iter->second; + + // Find the unpromoted cue that would work for the given time, which is the + // first cue that is not greater than |time_in_seconds|. + // So find the the first cue that is greater than |time_in_seconds| first and + // then get the previous one. + iter = unpromoted_.upper_bound(time_in_seconds); + // The first cue in |unpromoted_| should not be greater than + // |time_in_seconds|. It could happen only if it has been promoted at a + // different timestamp, which can only be the result of unaligned GOPs. + if (iter == unpromoted_.begin()) + return nullptr; + auto prev_iter = std::prev(iter); + DCHECK(prev_iter != unpromoted_.end()); + + std::shared_ptr cue = prev_iter->second; + cue->time_in_seconds = time_in_seconds; + + promoted_[time_in_seconds] = cue; + // Remove all unpromoted cues up to the cue that was just promoted. + // User may provide multiple cue points at the same or similar timestamps. The + // extra unused cues are simply ignored. + unpromoted_.erase(unpromoted_.begin(), iter); + + // Wake up other threads that may be waiting. + sync_condition_.Broadcast(); + return std::move(cue); +} + +} // namespace media +} // namespace shaka diff --git a/packager/media/chunking/sync_point_queue.h b/packager/media/chunking/sync_point_queue.h new file mode 100644 index 0000000000..30c0ae7d7e --- /dev/null +++ b/packager/media/chunking/sync_point_queue.h @@ -0,0 +1,65 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include +#include + +#include "packager/base/synchronization/condition_variable.h" +#include "packager/base/synchronization/lock.h" +#include "packager/media/public/ad_cue_generator_params.h" + +namespace shaka { +namespace media { + +struct CueEvent; + +/// A synchronized queue for cue points. +class SyncPointQueue { + public: + explicit SyncPointQueue(const AdCueGeneratorParams& params); + ~SyncPointQueue() = default; + + /// Add a new thread. Each thread using this instance must call this method in + /// order to keep track of its clients. + void AddThread(); + + /// @return A hint for when the next cue event would be. The returned hint is + /// not less than @a time_in_seconds. The actual time for the next cue + /// event will not be less than the returned hint, with the exact + /// value depends on promotion. + double GetHint(double time_in_seconds); + + /// @return The next cue based on a previous hint. If a cue has been promoted + /// that comes after @a hint_in_seconds it is returned. If no cue + /// after @a hint_in_seconds has been promoted, this will block until + /// either a cue is promoted or all threads are blocked (in which + /// case, the unpromoted cue at @a hint_in_seconds will be + /// self-promoted and returned). + std::shared_ptr GetNext(double hint_in_seconds); + + /// Promote the first cue that is not greater than @a time_in_seconds. All + /// unpromoted cues before the cue will be discarded. + std::shared_ptr PromoteAt(double time_in_seconds); + + private: + SyncPointQueue(const SyncPointQueue&) = delete; + SyncPointQueue& operator=(const SyncPointQueue&) = delete; + + // PromoteAt() without locking. It is called by PromoteAt() and other + // functions that have locks. + std::shared_ptr PromoteAtNoLocking(double time_in_seconds); + + base::Lock lock_; + base::ConditionVariable sync_condition_; + size_t thread_count_ = 0; + size_t waiting_thread_count_ = 0; + + std::map> unpromoted_; + std::map> promoted_; +}; + +} // namespace media +} // namespace shaka