diff --git a/packager/media/base/media_handler_test_base.h b/packager/media/base/media_handler_test_base.h
index db89cb9378..cdbebe6ceb 100644
--- a/packager/media/base/media_handler_test_base.h
+++ b/packager/media/base/media_handler_test_base.h
@@ -116,8 +116,8 @@ MATCHER_P4(IsMediaSample, stream_index, timestamp, duration, encrypted, "") {
          << arg->media_sample->duration() << ","
          << BoolToString(arg->media_sample->is_encrypted()) << ")";
   return arg->stream_index == stream_index &&
-         arg->media_sample->dts() == timestamp &&
-         arg->media_sample->duration() == duration &&
+         arg->media_sample->dts() == static_cast<int64_t>(timestamp) &&
+         arg->media_sample->duration() == static_cast<int64_t>(duration) &&
          arg->media_sample->is_encrypted() == encrypted;
 }
diff --git a/packager/media/chunking/chunking.gyp b/packager/media/chunking/chunking.gyp
index 57c00880df..eff100192a 100644
--- a/packager/media/chunking/chunking.gyp
+++ b/packager/media/chunking/chunking.gyp
@@ -15,6 +15,10 @@
       'sources': [
         'chunking_handler.cc',
         'chunking_handler.h',
+        'cue_alignment_handler.cc',
+        'cue_alignment_handler.h',
+        'sync_point_queue.cc',
+        'sync_point_queue.h',
       ],
       'dependencies': [
         '../base/media_base.gyp:media_base',
@@ -25,6 +29,7 @@
       'type': '<(gtest_target_type)',
       'sources': [
         'chunking_handler_unittest.cc',
+        'cue_alignment_handler_unittest.cc',
       ],
       'dependencies': [
         '../../testing/gtest.gyp:gtest',
diff --git a/packager/media/chunking/cue_alignment_handler.cc b/packager/media/chunking/cue_alignment_handler.cc
new file mode 100644
index 0000000000..fb53c0da69
--- /dev/null
+++ b/packager/media/chunking/cue_alignment_handler.cc
@@ -0,0 +1,272 @@
+// Copyright 2018 Google Inc. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file or at
+// https://developers.google.com/open-source/licenses/bsd
+
+#include "packager/media/chunking/cue_alignment_handler.h"
+
+namespace shaka {
+namespace media {
+namespace {
+// The max number of samples that are allowed to be buffered before we shutdown
+// because there is likely a problem with the content or how the pipeline was
+// configured. This is about 20 seconds of buffer for audio with 48kHz.
+const size_t kMaxBufferSize = 1000;
+
+double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
+  int64_t time_scale;
+  int64_t scaled_time;
+  switch (data.stream_data_type) {
+    case StreamDataType::kMediaSample:
+      time_scale = info.time_scale();
+      scaled_time = data.media_sample->pts();
+      break;
+    case StreamDataType::kTextSample:
+      // Text is always in MS but the stream info time scale is 0.
+      time_scale = 1000;
+      scaled_time = data.text_sample->start_time();
+      break;
+    default:
+      time_scale = 0;
+      scaled_time = 0;
+      NOTREACHED() << "TimeInSeconds should only be called on media samples "
+                      "and text samples.";
+      break;
+  }
+
+  return static_cast<double>(scaled_time) / time_scale;
+}
+}  // namespace
+
+CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
+    : sync_points_(sync_points) {}
+
+Status CueAlignmentHandler::InitializeInternal() {
+  sync_points_->AddThread();
+  stream_states_.resize(num_input_streams());
+  return Status::OK;
+}
+
+Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
+  switch (data->stream_data_type) {
+    case StreamDataType::kStreamInfo:
+      return OnStreamInfo(std::move(data));
+    case StreamDataType::kTextSample:
+    case StreamDataType::kMediaSample:
+      return OnSample(std::move(data));
+    default:
+      VLOG(3) << "Dropping unsupported data type "
+              << static_cast<int>(data->stream_data_type);
+      return Status::OK;
+  }
+}
+
+Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
+  // We need to wait for all streams to flush before we can flush each stream.
+  // This allows cached buffers to be cleared and cues to be properly
+  // synchronized and set on all streams.
+  stream_states_[stream_index].to_be_flushed = true;
+  for (const StreamState& stream_state : stream_states_) {
+    if (!stream_state.to_be_flushed)
+      return Status::OK;
+  }
+
+  // |to_be_flushed| is set on all streams. We don't expect to see data in any
+  // buffers.
+  for (size_t i = 0; i < stream_states_.size(); ++i) {
+    StreamState& stream_state = stream_states_[i];
+    if (!stream_state.samples.empty()) {
+      LOG(WARNING) << "Unexpected data seen on stream " << i;
+      while (!stream_state.samples.empty()) {
+        Status status(Dispatch(std::move(stream_state.samples.front())));
+        if (!status.ok())
+          return status;
+        stream_state.samples.pop_front();
+      }
+    }
+  }
+  return FlushAllDownstreams();
+}
+
+Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
+  StreamState& stream_state = stream_states_[data->stream_index];
+  // Keep a copy of the stream info so that we can check type and check
+  // timescale.
+  stream_state.info = data->stream_info;
+  // Get the first hint for the stream. Use a negative hint so that if there is
+  // supposed to be a sync point at zero, we will still respect it.
+  stream_state.next_cue_hint = sync_points_->GetHint(-1);
+
+  return Dispatch(std::move(data));
+}
+
+Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
+  // There are two modes:
+  //  1. There is a video input.
+  //  2. There are no video inputs.
+  //
+  // When there is a video input, we rely on the video input to get the next
+  // sync point and release all the samples.
+  //
+  // When there are no video inputs, we rely on the sync point queue to block
+  // us until there is a sync point.
+
+  const uint64_t stream_index = sample->stream_index;
+  StreamState& stream_state = stream_states_[stream_index];
+
+  if (stream_state.info->stream_type() == kStreamVideo) {
+    const double sample_time = TimeInSeconds(*stream_state.info, *sample);
+    if (sample->media_sample->is_key_frame() &&
+        sample_time >= stream_state.next_cue_hint) {
+      std::shared_ptr<const CueEvent> next_sync =
+          sync_points_->PromoteAt(sample_time);
+      if (!next_sync) {
+        LOG(ERROR)
+            << "Failed to promote sync point at " << sample_time
+            << ". This happens only if video streams are not GOP-aligned.";
+        return Status(error::INVALID_ARGUMENT,
+                      "Streams are not properly GOP-aligned.");
+      }
+
+      Status status(DispatchCueEvent(stream_index, next_sync));
+      stream_state.cue.reset();
+      status.Update(UseNewSyncPoint(next_sync));
+      if (!status.ok())
+        return status;
+    }
+    return Dispatch(std::move(sample));
+  }
+
+  // Accept the sample. This will output it if it comes before the hint point or
+  // will cache it if it comes after the hint point.
+  Status status(AcceptSample(std::move(sample), &stream_state));
+  if (!status.ok()) {
+    return status;
+  }
+
+  // If all the streams are waiting on a hint, it means that none has next sync
+  // point determined. It also means that there are no video streams and we need
+  // to wait for all streams to converge on a hint so that we can get the next
+  // sync point.
+  if (EveryoneWaitingAtHint()) {
+    // All streams should have the same hint right now.
+    const double next_cue_hint = stream_state.next_cue_hint;
+    for (const StreamState& stream_state : stream_states_) {
+      DCHECK_EQ(next_cue_hint, stream_state.next_cue_hint);
+    }
+
+    std::shared_ptr<const CueEvent> next_sync =
+        sync_points_->GetNext(next_cue_hint);
+    DCHECK(next_sync);
+
+    Status status = UseNewSyncPoint(next_sync);
+    if (!status.ok()) {
+      return status;
+    }
+  }
+
+  return Status::OK;
+}
+
+Status CueAlignmentHandler::UseNewSyncPoint(
+    std::shared_ptr<const CueEvent> new_sync) {
+  const double new_hint = sync_points_->GetHint(new_sync->time_in_seconds);
+  DCHECK_GT(new_hint, new_sync->time_in_seconds);
+
+  Status status;
+  for (StreamState& stream_state : stream_states_) {
+    // No stream should be so out of sync with the others that they would
+    // still be working on an old cue.
+    if (stream_state.cue) {
+      LOG(ERROR) << "Found two cue events that are too close together. One at "
+                 << stream_state.cue->time_in_seconds << " and the other at "
+                 << new_sync->time_in_seconds;
+      return Status(error::INVALID_ARGUMENT, "Cue events too close together");
+    }
+
+    // Add the cue and update the hint. The cue will always be used over the
+    // hint, so hint should always be greater than the latest cue.
+    stream_state.cue = new_sync;
+    stream_state.next_cue_hint = new_hint;
+
+    while (status.ok() && !stream_state.samples.empty()) {
+      std::unique_ptr<StreamData>& sample = stream_state.samples.front();
+      const double sample_time_in_seconds =
+          TimeInSeconds(*stream_state.info, *sample);
+      if (sample_time_in_seconds >= stream_state.next_cue_hint) {
+        DCHECK(!stream_state.cue);
+        break;
+      }
+
+      const size_t stream_index = sample->stream_index;
+      if (stream_state.cue) {
+        status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
+                                          &stream_state));
+      }
+      status.Update(Dispatch(std::move(sample)));
+      stream_state.samples.pop_front();
+    }
+  }
+  return status;
+}
+
+bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
+  for (const StreamState& stream_state : stream_states_) {
+    if (stream_state.samples.empty()) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Accept Sample will either:
+//  1. Send the sample downstream, as it comes before the next sync point and
+//     therefore can skip the buffering.
+//  2. Save the sample in the buffer as it comes after the next sync point.
+Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
+                                         StreamState* stream_state) {
+  DCHECK(stream_state);
+
+  const size_t stream_index = sample->stream_index;
+  if (stream_state->samples.empty()) {
+    const double sample_time_in_seconds =
+        TimeInSeconds(*stream_state->info, *sample);
+    if (sample_time_in_seconds < stream_state->next_cue_hint) {
+      Status status;
+      if (stream_state->cue) {
+        status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
+                                          stream_state));
+      }
+      status.Update(Dispatch(std::move(sample)));
+      return status;
+    }
+    DCHECK(!stream_state->cue);
+  }
+
+  stream_state->samples.push_back(std::move(sample));
+  if (stream_state->samples.size() > kMaxBufferSize) {
+    LOG(ERROR) << "Stream " << stream_index << " has buffered "
+               << stream_state->samples.size() << " when the max is "
+               << kMaxBufferSize;
+    return Status(error::INVALID_ARGUMENT,
+                  "Streams are not properly multiplexed.");
+  }
+
+  return Status::OK;
+}
+
+Status CueAlignmentHandler::DispatchCueIfNeeded(
+    size_t stream_index,
+    double next_sample_time_in_seconds,
+    StreamState* stream_state) {
+  DCHECK(stream_state->cue);
+  if (next_sample_time_in_seconds < stream_state->cue->time_in_seconds)
+    return Status::OK;
+  DCHECK_LT(stream_state->cue->time_in_seconds, stream_state->next_cue_hint);
+  return DispatchCueEvent(stream_index, std::move(stream_state->cue));
+}
+
+}  // namespace media
+}  // namespace shaka
diff --git a/packager/media/chunking/cue_alignment_handler.h b/packager/media/chunking/cue_alignment_handler.h
new file mode 100644
index 0000000000..317d2c98a9
--- /dev/null
+++ b/packager/media/chunking/cue_alignment_handler.h
@@ -0,0 +1,83 @@
+// Copyright 2018 Google Inc. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file or at
+// https://developers.google.com/open-source/licenses/bsd
+
+#include <list>
+
+#include "packager/media/base/media_handler.h"
+#include "packager/media/chunking/sync_point_queue.h"
+
+namespace shaka {
+namespace media {
+
+/// The cue alignment handler is a N-to-N handler that will inject CueEvents
+/// into all streams. It will align the cues across streams (and handlers)
+/// using a shared SyncPointQueue.
+///
+/// There should be a cue alignment handler per demuxer/thread and not per
+/// stream. A cue alignment handler must be one per thread in order to properly
+/// manage blocking.
+class CueAlignmentHandler : public MediaHandler {
+ public:
+  explicit CueAlignmentHandler(SyncPointQueue* sync_points);
+  ~CueAlignmentHandler() = default;
+
+ private:
+  CueAlignmentHandler(const CueAlignmentHandler&) = delete;
+  CueAlignmentHandler& operator=(const CueAlignmentHandler&) = delete;
+
+  struct StreamState {
+    // Information for the stream.
+    std::shared_ptr<const StreamInfo> info;
+    // Cached samples that cannot be dispatched. All the samples should be at or
+    // after |hint|.
+    std::list<std::unique_ptr<StreamData>> samples;
+    // If set, the stream is pending to be flushed.
+    bool to_be_flushed = false;
+
+    // If set, it points to the next cue it has to send downstream. Note that if
+    // it is not set, the next cue is not determined.
+    // This is set but not really used by video stream.
+    std::shared_ptr<const CueEvent> cue;
+    // If |cue| is set, this is the hint for the cue after |cue|, i.e. next next
+    // cue; otherwise this is the hint for the next cue. This holds the time in
+    // seconds of the scheduled (unpromoted) next or next next cue's time. This
+    // is essentially the barrier for the sample stream. Non video samples after
+    // this barrier must wait until the |cue| is determined and thus the next
+    // hint to be determined; video samples will promote the hint to cue when
+    // seeing the first key frame after |next_cue_hint|.
+    double next_cue_hint = 0;
+  };
+
+  // MediaHandler overrides.
+  Status InitializeInternal() override;
+  Status Process(std::unique_ptr<StreamData> data) override;
+  Status OnFlushRequest(size_t stream_index) override;
+
+  // Internal handling functions for different stream data.
+  Status OnStreamInfo(std::unique_ptr<StreamData> data);
+  Status OnSample(std::unique_ptr<StreamData> sample);
+
+  // Update stream states with new sync point.
+  Status UseNewSyncPoint(std::shared_ptr<const CueEvent> new_sync);
+
+  // Check if everyone is waiting for new hint points.
+  bool EveryoneWaitingAtHint() const;
+
+  // Dispatch or save incoming sample.
+  Status AcceptSample(std::unique_ptr<StreamData> sample,
+                      StreamState* stream_state);
+
+  // Dispatch the cue if the new sample comes at or after it.
+  Status DispatchCueIfNeeded(size_t stream_index,
+                             double next_sample_time_in_seconds,
+                             StreamState* stream_state);
+
+  SyncPointQueue* const sync_points_ = nullptr;
+  std::vector<StreamState> stream_states_;
+};
+
+}  // namespace media
+}  // namespace shaka
diff --git a/packager/media/chunking/cue_alignment_handler_unittest.cc b/packager/media/chunking/cue_alignment_handler_unittest.cc
new file mode 100644
index 0000000000..1dcece2da1
--- /dev/null
+++ b/packager/media/chunking/cue_alignment_handler_unittest.cc
@@ -0,0 +1,618 @@
+// Copyright 2018 Google Inc. All rights reserved.
+// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/chunking/chunking_handler.h" + +#include +#include + +#include "packager/media/base/media_handler_test_base.h" +#include "packager/media/chunking/cue_alignment_handler.h" +#include "packager/media/public/ad_cue_generator_params.h" +#include "packager/status_test_util.h" + +using ::testing::ElementsAre; +using ::testing::IsEmpty; + +namespace shaka { +namespace media { +namespace { +const size_t kOneInput = 1; +const size_t kOneOutput = 1; + +const size_t kThreeInput = 3; +const size_t kThreeOutput = 3; + +const bool kEncrypted = true; +const bool kKeyFrame = true; + +const uint64_t kNoTimeScale = 0; +const uint64_t kMsTimeScale = 1000; + +const char* kNoId = ""; +const char* kNoSettings = ""; +const char* kNoPayload = ""; + +const size_t kChild = 0; +const size_t kParent = 0; +} // namespace + +class CueAlignmentHandlerTest : public MediaHandlerTestBase {}; + +TEST_F(CueAlignmentHandlerTest, VideoInputWithNoCues) { + const size_t kVideoStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + 
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, AudioInputWithNoCues) { + const size_t kAudioStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, 
kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextInputWithNoCues) { + const size_t kTextStream = 0; + + const int64_t kSampleDuration = 1000; + + const uint64_t kSample0Start = 0; + const uint64_t kSample0End = kSample0Start + kSampleDuration; + const uint64_t kSample1Start = kSample0End; + const uint64_t kSample1End = kSample1Start + kSampleDuration; + const uint64_t kSample2Start = kSample1End; + const uint64_t kSample2End = kSample2Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + 
GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextAudioVideoInputWithNoCues) { + const size_t kTextStream = 0; + const size_t kAudioStream = 1; + const size_t kVideoStream = 2; + + const uint64_t kSampleDuration = 1000; + + const uint64_t kSample0Start = 0; + const uint64_t kSample0End = kSample0Start + kSampleDuration; + const uint64_t kSample1Start = kSample0Start + kSampleDuration; + const uint64_t kSample1End = kSample1Start + kSampleDuration; + const uint64_t kSample2Start = kSample1Start + kSampleDuration; + const uint64_t kSample2End = kSample2Start + kSampleDuration; + + AdCueGeneratorParams params; + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kThreeInput, kThreeOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), 
OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + 
->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, VideoInputWithCues) { + const size_t kVideoStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + const double kSample2StartInSeconds = + static_cast(kSample2Start) / kMsTimeScale; + + // Put the cue between two key frames. + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + 
->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, AudioInputWithCues) { + const size_t kAudioStream = 0; + + const int64_t kSampleDuration = 1000; + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + const double kSample1StartInSeconds = + static_cast(kSample1Start) / kMsTimeScale; + + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsCueEvent(kParent, kSample1StartInSeconds))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, 
GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextInputWithCues) { + const size_t kTextStream = 0; + + const int64_t kSampleDuration = 1000; + + const uint64_t kSample0Start = 0; + const uint64_t kSample0End = kSample0Start + kSampleDuration; + const uint64_t kSample1Start = kSample0End; + const uint64_t kSample1End = kSample1Start + kSampleDuration; + const uint64_t kSample2Start = kSample1End; + const uint64_t kSample2End = kSample2Start + kSampleDuration; + + const double kSample1StartInSeconds = + static_cast(kSample1Start) / kMsTimeScale; + + Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kOneInput, kOneOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsCueEvent(kParent, kSample1StartInSeconds))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0Start, kSample0End, 
kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); +} + +TEST_F(CueAlignmentHandlerTest, TextAudioVideoInputWithCues) { + const size_t kTextStream = 0; + const size_t kAudioStream = 1; + const size_t kVideoStream = 2; + + const int64_t kSampleDuration = 1000; + + const int64_t kSample0Start = 0; + const int64_t kSample1Start = kSample0Start + kSampleDuration; + const int64_t kSample2Start = kSample1Start + kSampleDuration; + + const uint64_t kSample0StartU = 0; + const uint64_t kSample0EndU = kSample0StartU + kSampleDuration; + const uint64_t kSample1StartU = kSample0EndU; + const uint64_t kSample1EndU = kSample1StartU + kSampleDuration; + const uint64_t kSample2StartU = kSample1EndU; + const uint64_t kSample2EndU = kSample2StartU + kSampleDuration; + + const double kSample2StartInSeconds = + static_cast(kSample2Start) / kMsTimeScale; + + // Put the cue between two key frames. 
+ Cuepoint cue; + cue.start_time_in_seconds = static_cast(kSample1Start) / kMsTimeScale; + + AdCueGeneratorParams params; + params.cue_points.push_back(cue); + + SyncPointQueue sync_points(params); + std::shared_ptr handler = + std::make_shared(&sync_points); + SetUpAndInitializeGraph(handler, kThreeInput, kThreeOutput); + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample0StartU, kSample0EndU, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample1StartU, kSample1EndU, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kTextStream), + OnProcess(IsTextSample(kNoId, kSample2StartU, kSample2EndU, + kNoSettings, kNoPayload))); + EXPECT_CALL(*Output(kTextStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kAudioStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent)); + } + + { + testing::InSequence s; + + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample1Start, 
kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsCueEvent(kParent, kSample2StartInSeconds))); + EXPECT_CALL(*Output(kVideoStream), + OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration, + !kEncrypted))); + EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent)); + } + + Input(kTextStream) + ->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo())); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample0StartU, kSample0EndU, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample1StartU, kSample1EndU, kNoPayload))); + Input(kTextStream) + ->Dispatch(StreamData::FromTextSample( + kChild, + GetTextSample(kNoId, kSample2StartU, kSample2EndU, kNoPayload))); + Input(kTextStream)->FlushAllDownstreams(); + + Input(kAudioStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kAudioStream)->FlushAllDownstreams(); + + Input(kVideoStream) + ->Dispatch( + StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame))); + Input(kVideoStream) + ->Dispatch(StreamData::FromMediaSample( + kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame))); + Input(kVideoStream)->FlushAllDownstreams(); +} 
+ +// TODO(kqyang): Add more tests, in particular, multi-thread tests. + +} // namespace media +} // namespace shaka diff --git a/packager/media/chunking/sync_point_queue.cc b/packager/media/chunking/sync_point_queue.cc new file mode 100644 index 0000000000..f8c4ce820f --- /dev/null +++ b/packager/media/chunking/sync_point_queue.cc @@ -0,0 +1,118 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/chunking/sync_point_queue.h" +#include "packager/media/base/media_handler.h" + +#include +#include + +namespace shaka { +namespace media { + +SyncPointQueue::SyncPointQueue(const AdCueGeneratorParams& params) + : sync_condition_(&lock_) { + for (const Cuepoint& point : params.cue_points) { + std::shared_ptr event = std::make_shared(); + event->time_in_seconds = point.start_time_in_seconds; + unpromoted_[point.start_time_in_seconds] = std::move(event); + } +} + +void SyncPointQueue::AddThread() { + base::AutoLock auto_lock(lock_); + thread_count_++; +} + +double SyncPointQueue::GetHint(double time_in_seconds) { + base::AutoLock auto_lock(lock_); + + auto iter = promoted_.upper_bound(time_in_seconds); + if (iter != promoted_.end()) + return iter->first; + + iter = unpromoted_.upper_bound(time_in_seconds); + if (iter != unpromoted_.end()) + return iter->first; + + // Use MAX DOUBLE as the fall back so that we can force all streams to run + // out all their samples even when there are no cues. + return std::numeric_limits::max(); +} + +std::shared_ptr SyncPointQueue::GetNext( + double hint_in_seconds) { + base::AutoLock auto_lock(lock_); + while (true) { + // Find the promoted cue that would line up with our hint, which is the + // first cue that is not less than |hint_in_seconds|. 
+ auto iter = promoted_.lower_bound(hint_in_seconds); + if (iter != promoted_.end()) { + return iter->second; + } + + // Promote |hint_in_seconds| if everyone is waiting. + if (waiting_thread_count_ + 1 == thread_count_) { + std::shared_ptr cue = PromoteAtNoLocking(hint_in_seconds); + CHECK(cue); + return cue; + } + + waiting_thread_count_++; + // This blocks until either a cue is promoted or all threads are blocked + // (in which case, the unpromoted cue at the hint will be self-promoted + // and returned - see section above). Spurious signal events are possible + // with most condition variable implementations, so if it returns, we go + // back and check if a cue is actually promoted or not. + sync_condition_.Wait(); + waiting_thread_count_--; + } +} + +std::shared_ptr SyncPointQueue::PromoteAt( + double time_in_seconds) { + base::AutoLock auto_lock(lock_); + return PromoteAtNoLocking(time_in_seconds); +} + +std::shared_ptr SyncPointQueue::PromoteAtNoLocking( + double time_in_seconds) { + lock_.AssertAcquired(); + + // It is possible that |time_in_seconds| has been promoted. + auto iter = promoted_.find(time_in_seconds); + if (iter != promoted_.end()) + return iter->second; + + // Find the unpromoted cue that would work for the given time, which is the + // first cue that is not greater than |time_in_seconds|. + // So find the the first cue that is greater than |time_in_seconds| first and + // then get the previous one. + iter = unpromoted_.upper_bound(time_in_seconds); + // The first cue in |unpromoted_| should not be greater than + // |time_in_seconds|. It could happen only if it has been promoted at a + // different timestamp, which can only be the result of unaligned GOPs. 
+ if (iter == unpromoted_.begin()) + return nullptr; + auto prev_iter = std::prev(iter); + DCHECK(prev_iter != unpromoted_.end()); + + std::shared_ptr cue = prev_iter->second; + cue->time_in_seconds = time_in_seconds; + + promoted_[time_in_seconds] = cue; + // Remove all unpromoted cues up to the cue that was just promoted. + // User may provide multiple cue points at the same or similar timestamps. The + // extra unused cues are simply ignored. + unpromoted_.erase(unpromoted_.begin(), iter); + + // Wake up other threads that may be waiting. + sync_condition_.Broadcast(); + return std::move(cue); +} + +} // namespace media +} // namespace shaka diff --git a/packager/media/chunking/sync_point_queue.h b/packager/media/chunking/sync_point_queue.h new file mode 100644 index 0000000000..30c0ae7d7e --- /dev/null +++ b/packager/media/chunking/sync_point_queue.h @@ -0,0 +1,65 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include +#include + +#include "packager/base/synchronization/condition_variable.h" +#include "packager/base/synchronization/lock.h" +#include "packager/media/public/ad_cue_generator_params.h" + +namespace shaka { +namespace media { + +struct CueEvent; + +/// A synchronized queue for cue points. +class SyncPointQueue { + public: + explicit SyncPointQueue(const AdCueGeneratorParams& params); + ~SyncPointQueue() = default; + + /// Add a new thread. Each thread using this instance must call this method in + /// order to keep track of its clients. + void AddThread(); + + /// @return A hint for when the next cue event would be. The returned hint is + /// not less than @a time_in_seconds. The actual time for the next cue + /// event will not be less than the returned hint, with the exact + /// value depends on promotion. 
+ double GetHint(double time_in_seconds); + + /// @return The next cue based on a previous hint. If a cue has been promoted + /// that comes after @a hint_in_seconds it is returned. If no cue + /// after @a hint_in_seconds has been promoted, this will block until + /// either a cue is promoted or all threads are blocked (in which + /// case, the unpromoted cue at @a hint_in_seconds will be + /// self-promoted and returned). + std::shared_ptr GetNext(double hint_in_seconds); + + /// Promote the first cue that is not greater than @a time_in_seconds. All + /// unpromoted cues before the cue will be discarded. + std::shared_ptr PromoteAt(double time_in_seconds); + + private: + SyncPointQueue(const SyncPointQueue&) = delete; + SyncPointQueue& operator=(const SyncPointQueue&) = delete; + + // PromoteAt() without locking. It is called by PromoteAt() and other + // functions that have locks. + std::shared_ptr PromoteAtNoLocking(double time_in_seconds); + + base::Lock lock_; + base::ConditionVariable sync_condition_; + size_t thread_count_ = 0; + size_t waiting_thread_count_ = 0; + + std::map> unpromoted_; + std::map> promoted_; +}; + +} // namespace media +} // namespace shaka