Implements SyncPointQueue and CueAlignmentHandler

SyncPointQueue manages all cue points and returns aligned cue
points to the callers (CueAlignmentHandlers).

CueAlignmentHandler is responsible for aligning cues from different
streams. It uses SyncPointQueue internally to align / synchronize the
cue points.

Issue: #355

Change-Id: I281fecb46a3ca7172d71e7495bdd07b8efdeb283
Author: Aaron Vaage 2018-03-15 16:04:01 -07:00
Committed by: KongQun Yang
parent 587de5be30
commit 3b5b2bccca
7 changed files with 1163 additions and 2 deletions
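
The new pieces fit together as follows: one SyncPointQueue is shared by all CueAlignmentHandler instances, with one handler per demuxer/thread. A minimal wiring sketch, assuming illustrative pipeline-setup code around the two new classes (the surrounding setup is not part of this commit):

// Schedule a cue, e.g. an ad break at 30 seconds.
Cuepoint cue;
cue.start_time_in_seconds = 30.0;

AdCueGeneratorParams params;
params.cue_points.push_back(cue);

// One queue shared by every handler, so cue promotion is consistent
// across streams.
SyncPointQueue sync_points(params);

// One handler per demuxer/thread (not per stream), all sharing the queue.
auto audio_handler = std::make_shared<CueAlignmentHandler>(&sync_points);
auto video_handler = std::make_shared<CueAlignmentHandler>(&sync_points);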

packager/media/base/media_handler_test_base.h

@@ -116,8 +116,8 @@ MATCHER_P4(IsMediaSample, stream_index, timestamp, duration, encrypted, "") {
            << arg->media_sample->duration() << ","
            << BoolToString(arg->media_sample->is_encrypted()) << ")";
   return arg->stream_index == stream_index &&
-         arg->media_sample->dts() == timestamp &&
-         arg->media_sample->duration() == duration &&
+         arg->media_sample->dts() == static_cast<int64_t>(timestamp) &&
+         arg->media_sample->duration() == static_cast<int64_t>(duration) &&
          arg->media_sample->is_encrypted() == encrypted;
 }

packager/media/chunking/chunking.gyp

@ -15,6 +15,10 @@
'sources': [ 'sources': [
'chunking_handler.cc', 'chunking_handler.cc',
'chunking_handler.h', 'chunking_handler.h',
'cue_alignment_handler.cc',
'cue_alignment_handler.h',
'sync_point_queue.cc',
'sync_point_queue.h',
], ],
'dependencies': [ 'dependencies': [
'../base/media_base.gyp:media_base', '../base/media_base.gyp:media_base',
@ -25,6 +29,7 @@
'type': '<(gtest_target_type)', 'type': '<(gtest_target_type)',
'sources': [ 'sources': [
'chunking_handler_unittest.cc', 'chunking_handler_unittest.cc',
'cue_alignment_handler_unittest.cc',
], ],
'dependencies': [ 'dependencies': [
'../../testing/gtest.gyp:gtest', '../../testing/gtest.gyp:gtest',

packager/media/chunking/cue_alignment_handler.cc

@@ -0,0 +1,272 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/chunking/cue_alignment_handler.h"
namespace shaka {
namespace media {
namespace {
// The maximum number of samples that are allowed to be buffered before we
// shut down, because there is likely a problem with the content or how the
// pipeline was configured. This is roughly 20 seconds of buffered audio at
// 48 kHz.
const size_t kMaxBufferSize = 1000;
double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
int64_t time_scale;
int64_t scaled_time;
switch (data.stream_data_type) {
case StreamDataType::kMediaSample:
time_scale = info.time_scale();
scaled_time = data.media_sample->pts();
break;
case StreamDataType::kTextSample:
// Text is always in milliseconds, but the stream info time scale is 0.
time_scale = 1000;
scaled_time = data.text_sample->start_time();
break;
default:
time_scale = 0;
scaled_time = 0;
NOTREACHED() << "TimeInSeconds should only be called on media samples "
"and text samples.";
break;
}
return static_cast<double>(scaled_time) / time_scale;
}
} // namespace
CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
: sync_points_(sync_points) {}
Status CueAlignmentHandler::InitializeInternal() {
sync_points_->AddThread();
stream_states_.resize(num_input_streams());
return Status::OK;
}
Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
switch (data->stream_data_type) {
case StreamDataType::kStreamInfo:
return OnStreamInfo(std::move(data));
case StreamDataType::kTextSample:
case StreamDataType::kMediaSample:
return OnSample(std::move(data));
default:
VLOG(3) << "Dropping unsupported data type "
<< static_cast<int>(data->stream_data_type);
return Status::OK;
}
}
Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
// We need to wait for all streams to flush before we can flush each stream.
// This allows cached buffers to be cleared and cues to be properly
// synchronized and set on all streams.
stream_states_[stream_index].to_be_flushed = true;
for (const StreamState& stream_state : stream_states_) {
if (!stream_state.to_be_flushed)
return Status::OK;
}
// |to_be_flushed| is set on all streams. We don't expect to see data in any
// buffers.
for (size_t i = 0; i < stream_states_.size(); ++i) {
StreamState& stream_state = stream_states_[i];
if (!stream_state.samples.empty()) {
LOG(WARNING) << "Unexpected data seen on stream " << i;
while (!stream_state.samples.empty()) {
Status status(Dispatch(std::move(stream_state.samples.front())));
if (!status.ok())
return status;
stream_state.samples.pop_front();
}
}
}
return FlushAllDownstreams();
}
Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
StreamState& stream_state = stream_states_[data->stream_index];
// Keep a copy of the stream info so that we can check type and check
// timescale.
stream_state.info = data->stream_info;
// Get the first hint for the stream. Use a negative hint so that if there is
// supposed to be a sync point at zero, we will still respect it.
stream_state.next_cue_hint = sync_points_->GetHint(-1);
return Dispatch(std::move(data));
}
Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
// There are two modes:
// 1. There is a video input.
// 2. There are no video inputs.
//
// When there is a video input, we rely on the video input to get the next
// point and release all the samples.
//
// When there are no video inputs, we rely on the sync point queue to block
// us until there is a sync point.
const uint64_t stream_index = sample->stream_index;
StreamState& stream_state = stream_states_[stream_index];
if (stream_state.info->stream_type() == kStreamVideo) {
const double sample_time = TimeInSeconds(*stream_state.info, *sample);
if (sample->media_sample->is_key_frame() &&
sample_time >= stream_state.next_cue_hint) {
std::shared_ptr<const CueEvent> next_sync =
sync_points_->PromoteAt(sample_time);
if (!next_sync) {
LOG(ERROR)
<< "Failed to promote sync point at " << sample_time
<< ". This happens only if video streams are not GOP-aligned.";
return Status(error::INVALID_ARGUMENT,
"Streams are not properly GOP-aligned.");
}
Status status(DispatchCueEvent(stream_index, next_sync));
stream_state.cue.reset();
status.Update(UseNewSyncPoint(next_sync));
if (!status.ok())
return status;
}
return Dispatch(std::move(sample));
}
// Accept the sample. This will output it if it comes before the hint point,
// or cache it if it comes at or after the hint point.
Status status(AcceptSample(std::move(sample), &stream_state));
if (!status.ok()) {
return status;
}
// If all the streams are waiting on a hint, it means that none of them has
// its next sync point determined. It also means that there are no video
// streams, and we need to wait for all streams to converge on a hint so that
// we can get the next sync point.
if (EveryoneWaitingAtHint()) {
// All streams should have the same hint right now.
const double next_cue_hint = stream_state.next_cue_hint;
for (const StreamState& stream_state : stream_states_) {
DCHECK_EQ(next_cue_hint, stream_state.next_cue_hint);
}
std::shared_ptr<const CueEvent> next_sync =
sync_points_->GetNext(next_cue_hint);
DCHECK(next_sync);
Status status = UseNewSyncPoint(next_sync);
if (!status.ok()) {
return status;
}
}
return Status::OK;
}
Status CueAlignmentHandler::UseNewSyncPoint(
std::shared_ptr<const CueEvent> new_sync) {
const double new_hint = sync_points_->GetHint(new_sync->time_in_seconds);
DCHECK_GT(new_hint, new_sync->time_in_seconds);
Status status;
for (StreamState& stream_state : stream_states_) {
// No stream should be so out of sync with the others that it would still be
// working on an old cue.
if (stream_state.cue) {
LOG(ERROR) << "Found two cue events that are too close together. One at "
<< stream_state.cue->time_in_seconds << " and the other at "
<< new_sync->time_in_seconds;
return Status(error::INVALID_ARGUMENT, "Cue events too close together");
}
// Add the cue and update the hint. The cue will always be used over the
// hint, so hint should always be greater than the latest cue.
stream_state.cue = new_sync;
stream_state.next_cue_hint = new_hint;
while (status.ok() && !stream_state.samples.empty()) {
std::unique_ptr<StreamData>& sample = stream_state.samples.front();
const double sample_time_in_seconds =
TimeInSeconds(*stream_state.info, *sample);
if (sample_time_in_seconds >= stream_state.next_cue_hint) {
DCHECK(!stream_state.cue);
break;
}
const size_t stream_index = sample->stream_index;
if (stream_state.cue) {
status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
&stream_state));
}
status.Update(Dispatch(std::move(sample)));
stream_state.samples.pop_front();
}
}
return status;
}
bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
for (const StreamState& stream_state : stream_states_) {
if (stream_state.samples.empty()) {
return false;
}
}
return true;
}
// AcceptSample will either:
// 1. Send the sample downstream, as it comes before the hint and therefore
//    can skip the buffering.
// 2. Save the sample in the buffer, as it comes at or after the hint.
Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
StreamState* stream_state) {
DCHECK(stream_state);
const size_t stream_index = sample->stream_index;
if (stream_state->samples.empty()) {
const double sample_time_in_seconds =
TimeInSeconds(*stream_state->info, *sample);
if (sample_time_in_seconds < stream_state->next_cue_hint) {
Status status;
if (stream_state->cue) {
status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
stream_state));
}
status.Update(Dispatch(std::move(sample)));
return status;
}
DCHECK(!stream_state->cue);
}
stream_state->samples.push_back(std::move(sample));
if (stream_state->samples.size() > kMaxBufferSize) {
LOG(ERROR) << "Stream " << stream_index << " has buffered "
<< stream_state->samples.size() << " when the max is "
<< kMaxBufferSize;
return Status(error::INVALID_ARGUMENT,
"Streams are not properly multiplexed.");
}
return Status::OK;
}
Status CueAlignmentHandler::DispatchCueIfNeeded(
size_t stream_index,
double next_sample_time_in_seconds,
StreamState* stream_state) {
DCHECK(stream_state->cue);
if (next_sample_time_in_seconds < stream_state->cue->time_in_seconds)
return Status::OK;
DCHECK_LT(stream_state->cue->time_in_seconds, stream_state->next_cue_hint);
return DispatchCueEvent(stream_index, std::move(stream_state->cue));
}
} // namespace media
} // namespace shaka
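
To make the "no video inputs" mode above concrete, here is a minimal sketch of how the sync point queue resolves in that case (a standalone illustration with assumed values, not part of this commit). With a single registered thread, GetNext() never blocks: it finds every other registered thread already waiting and self-promotes the cue scheduled at the hint:

// Schedule a single cue at 5 seconds.
Cuepoint cue;
cue.start_time_in_seconds = 5.0;

AdCueGeneratorParams params;
params.cue_points.push_back(cue);

SyncPointQueue sync_points(params);
sync_points.AddThread();  // Only one client thread is registered.

// The hint is the scheduled (unpromoted) cue time.
const double hint = sync_points.GetHint(-1);  // hint == 5.0

// Since every other registered thread (there are none) is already waiting,
// GetNext() self-promotes the cue at the hint and returns immediately.
std::shared_ptr<const CueEvent> next = sync_points.GetNext(hint);
// next->time_in_seconds == 5.0; no video stream adjusted the cue time.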

packager/media/chunking/cue_alignment_handler.h

@@ -0,0 +1,83 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_MEDIA_CHUNKING_CUE_ALIGNMENT_HANDLER_H_
#define PACKAGER_MEDIA_CHUNKING_CUE_ALIGNMENT_HANDLER_H_

#include <list>
#include "packager/media/base/media_handler.h"
#include "packager/media/chunking/sync_point_queue.h"
namespace shaka {
namespace media {
/// The cue alignment handler is a N-to-N handler that will inject CueEvents
/// into all streams. It will align the cues across streams (and handlers)
/// using a shared SyncPointQueue.
///
/// There should be one cue alignment handler per demuxer/thread, not per
/// stream. Exactly one handler per thread is required to properly manage
/// blocking.
class CueAlignmentHandler : public MediaHandler {
public:
explicit CueAlignmentHandler(SyncPointQueue* sync_points);
~CueAlignmentHandler() = default;
private:
CueAlignmentHandler(const CueAlignmentHandler&) = delete;
CueAlignmentHandler& operator=(const CueAlignmentHandler&) = delete;
struct StreamState {
// Information for the stream.
std::shared_ptr<const StreamInfo> info;
// Cached samples that cannot be dispatched yet. All the samples should be
// at or after |next_cue_hint|.
std::list<std::unique_ptr<StreamData>> samples;
// If set, the stream is pending to be flushed.
bool to_be_flushed = false;
// If set, it points to the next cue to be sent downstream. If it is not
// set, the next cue is not determined yet. This is set but not really used
// by video streams.
std::shared_ptr<const CueEvent> cue;
// If |cue| is set, this is the hint for the cue after |cue|, i.e. the next
// next cue; otherwise this is the hint for the next cue. It holds the time
// in seconds of the scheduled (unpromoted) next or next next cue. This is
// essentially the barrier for the sample stream: non-video samples at or
// after this barrier must wait until |cue| is determined (and thus the next
// hint is determined); video samples promote the hint to a cue upon seeing
// the first key frame at or after |next_cue_hint|.
double next_cue_hint = 0;
};
// MediaHandler overrides.
Status InitializeInternal() override;
Status Process(std::unique_ptr<StreamData> data) override;
Status OnFlushRequest(size_t stream_index) override;
// Internal handling functions for different stream data.
Status OnStreamInfo(std::unique_ptr<StreamData> data);
Status OnSample(std::unique_ptr<StreamData> sample);
// Update stream states with new sync point.
Status UseNewSyncPoint(std::shared_ptr<const CueEvent> new_sync);
// Check if everyone is waiting for new hint points.
bool EveryoneWaitingAtHint() const;
// Dispatch or save incoming sample.
Status AcceptSample(std::unique_ptr<StreamData> sample,
StreamState* stream_state);
// Dispatch the cue if the new sample comes at or after it.
Status DispatchCueIfNeeded(size_t stream_index,
double next_sample_time_in_seconds,
StreamState* stream_state);
SyncPointQueue* const sync_points_ = nullptr;
std::vector<StreamState> stream_states_;
};
} // namespace media
}  // namespace shaka

#endif  // PACKAGER_MEDIA_CHUNKING_CUE_ALIGNMENT_HANDLER_H_

packager/media/chunking/cue_alignment_handler_unittest.cc

@@ -0,0 +1,618 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/chunking/chunking_handler.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "packager/media/base/media_handler_test_base.h"
#include "packager/media/chunking/cue_alignment_handler.h"
#include "packager/media/public/ad_cue_generator_params.h"
#include "packager/status_test_util.h"
using ::testing::ElementsAre;
using ::testing::IsEmpty;
namespace shaka {
namespace media {
namespace {
const size_t kOneInput = 1;
const size_t kOneOutput = 1;
const size_t kThreeInput = 3;
const size_t kThreeOutput = 3;
const bool kEncrypted = true;
const bool kKeyFrame = true;
const uint64_t kNoTimeScale = 0;
const uint64_t kMsTimeScale = 1000;
const char* kNoId = "";
const char* kNoSettings = "";
const char* kNoPayload = "";
const size_t kChild = 0;
const size_t kParent = 0;
} // namespace
class CueAlignmentHandlerTest : public MediaHandlerTestBase {};
TEST_F(CueAlignmentHandlerTest, VideoInputWithNoCues) {
const size_t kVideoStream = 0;
const int64_t kSampleDuration = 1000;
const int64_t kSample0Start = 0;
const int64_t kSample1Start = kSample0Start + kSampleDuration;
const int64_t kSample2Start = kSample1Start + kSampleDuration;
AdCueGeneratorParams params;
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kOneInput, kOneOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent));
}
Input(kVideoStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, AudioInputWithNoCues) {
const size_t kAudioStream = 0;
const int64_t kSampleDuration = 1000;
const int64_t kSample0Start = 0;
const int64_t kSample1Start = kSample0Start + kSampleDuration;
const int64_t kSample2Start = kSample1Start + kSampleDuration;
AdCueGeneratorParams params;
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kOneInput, kOneOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent));
}
Input(kAudioStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, TextInputWithNoCues) {
const size_t kTextStream = 0;
const int64_t kSampleDuration = 1000;
const uint64_t kSample0Start = 0;
const uint64_t kSample0End = kSample0Start + kSampleDuration;
const uint64_t kSample1Start = kSample0End;
const uint64_t kSample1End = kSample1Start + kSampleDuration;
const uint64_t kSample2Start = kSample1End;
const uint64_t kSample2End = kSample2Start + kSampleDuration;
AdCueGeneratorParams params;
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kOneInput, kOneOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream), OnFlush(kParent));
}
Input(kTextStream)
->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo()));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload)));
Input(kTextStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, TextAudioVideoInputWithNoCues) {
const size_t kTextStream = 0;
const size_t kAudioStream = 1;
const size_t kVideoStream = 2;
const uint64_t kSampleDuration = 1000;
const uint64_t kSample0Start = 0;
const uint64_t kSample0End = kSample0Start + kSampleDuration;
const uint64_t kSample1Start = kSample0Start + kSampleDuration;
const uint64_t kSample1End = kSample1Start + kSampleDuration;
const uint64_t kSample2Start = kSample1Start + kSampleDuration;
const uint64_t kSample2End = kSample2Start + kSampleDuration;
AdCueGeneratorParams params;
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kThreeInput, kThreeOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream), OnFlush(kParent));
}
{
testing::InSequence s;
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent));
}
{
testing::InSequence s;
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent));
}
Input(kTextStream)
->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo()));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload)));
Input(kTextStream)->FlushAllDownstreams();
Input(kAudioStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)->FlushAllDownstreams();
Input(kVideoStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, VideoInputWithCues) {
const size_t kVideoStream = 0;
const int64_t kSampleDuration = 1000;
const int64_t kSample0Start = 0;
const int64_t kSample1Start = kSample0Start + kSampleDuration;
const int64_t kSample2Start = kSample1Start + kSampleDuration;
const double kSample2StartInSeconds =
static_cast<double>(kSample2Start) / kMsTimeScale;
// Put the cue between two key frames.
Cuepoint cue;
cue.start_time_in_seconds = static_cast<double>(kSample1Start) / kMsTimeScale;
AdCueGeneratorParams params;
params.cue_points.push_back(cue);
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kOneInput, kOneOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsCueEvent(kParent, kSample2StartInSeconds)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent));
}
Input(kVideoStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, AudioInputWithCues) {
const size_t kAudioStream = 0;
const int64_t kSampleDuration = 1000;
const int64_t kSample0Start = 0;
const int64_t kSample1Start = kSample0Start + kSampleDuration;
const int64_t kSample2Start = kSample1Start + kSampleDuration;
const double kSample1StartInSeconds =
static_cast<double>(kSample1Start) / kMsTimeScale;
Cuepoint cue;
cue.start_time_in_seconds = static_cast<double>(kSample1Start) / kMsTimeScale;
AdCueGeneratorParams params;
params.cue_points.push_back(cue);
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kOneInput, kOneOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsCueEvent(kParent, kSample1StartInSeconds)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent));
}
Input(kAudioStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, TextInputWithCues) {
const size_t kTextStream = 0;
const int64_t kSampleDuration = 1000;
const uint64_t kSample0Start = 0;
const uint64_t kSample0End = kSample0Start + kSampleDuration;
const uint64_t kSample1Start = kSample0End;
const uint64_t kSample1End = kSample1Start + kSampleDuration;
const uint64_t kSample2Start = kSample1End;
const uint64_t kSample2End = kSample2Start + kSampleDuration;
const double kSample1StartInSeconds =
static_cast<double>(kSample1Start) / kMsTimeScale;
Cuepoint cue;
cue.start_time_in_seconds = static_cast<double>(kSample1Start) / kMsTimeScale;
AdCueGeneratorParams params;
params.cue_points.push_back(cue);
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kOneInput, kOneOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample0Start, kSample0End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsCueEvent(kParent, kSample1StartInSeconds)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample1Start, kSample1End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample2Start, kSample2End,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream), OnFlush(kParent));
}
Input(kTextStream)
->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo()));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample0Start, kSample0End, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample1Start, kSample1End, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample2Start, kSample2End, kNoPayload)));
Input(kTextStream)->FlushAllDownstreams();
}
TEST_F(CueAlignmentHandlerTest, TextAudioVideoInputWithCues) {
const size_t kTextStream = 0;
const size_t kAudioStream = 1;
const size_t kVideoStream = 2;
const int64_t kSampleDuration = 1000;
const int64_t kSample0Start = 0;
const int64_t kSample1Start = kSample0Start + kSampleDuration;
const int64_t kSample2Start = kSample1Start + kSampleDuration;
const uint64_t kSample0StartU = 0;
const uint64_t kSample0EndU = kSample0StartU + kSampleDuration;
const uint64_t kSample1StartU = kSample0EndU;
const uint64_t kSample1EndU = kSample1StartU + kSampleDuration;
const uint64_t kSample2StartU = kSample1EndU;
const uint64_t kSample2EndU = kSample2StartU + kSampleDuration;
const double kSample2StartInSeconds =
static_cast<double>(kSample2Start) / kMsTimeScale;
// Put the cue between two key frames.
Cuepoint cue;
cue.start_time_in_seconds = static_cast<double>(kSample1Start) / kMsTimeScale;
AdCueGeneratorParams params;
params.cue_points.push_back(cue);
SyncPointQueue sync_points(params);
std::shared_ptr<MediaHandler> handler =
std::make_shared<CueAlignmentHandler>(&sync_points);
SetUpAndInitializeGraph(handler, kThreeInput, kThreeOutput);
{
testing::InSequence s;
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsStreamInfo(kParent, kNoTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample0StartU, kSample0EndU,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample1StartU, kSample1EndU,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsCueEvent(kParent, kSample2StartInSeconds)));
EXPECT_CALL(*Output(kTextStream),
OnProcess(IsTextSample(kNoId, kSample2StartU, kSample2EndU,
kNoSettings, kNoPayload)));
EXPECT_CALL(*Output(kTextStream), OnFlush(kParent));
}
{
testing::InSequence s;
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsCueEvent(kParent, kSample2StartInSeconds)));
EXPECT_CALL(*Output(kAudioStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kAudioStream), OnFlush(kParent));
}
{
testing::InSequence s;
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsStreamInfo(kParent, kMsTimeScale, !kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample0Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample1Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsCueEvent(kParent, kSample2StartInSeconds)));
EXPECT_CALL(*Output(kVideoStream),
OnProcess(IsMediaSample(kParent, kSample2Start, kSampleDuration,
!kEncrypted)));
EXPECT_CALL(*Output(kVideoStream), OnFlush(kParent));
}
Input(kTextStream)
->Dispatch(StreamData::FromStreamInfo(kChild, GetTextStreamInfo()));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample0StartU, kSample0EndU, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample1StartU, kSample1EndU, kNoPayload)));
Input(kTextStream)
->Dispatch(StreamData::FromTextSample(
kChild,
GetTextSample(kNoId, kSample2StartU, kSample2EndU, kNoPayload)));
Input(kTextStream)->FlushAllDownstreams();
Input(kAudioStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetAudioStreamInfo(kMsTimeScale)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kAudioStream)->FlushAllDownstreams();
Input(kVideoStream)
->Dispatch(
StreamData::FromStreamInfo(kChild, GetVideoStreamInfo(kMsTimeScale)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample0Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample1Start, kSampleDuration, !kKeyFrame)));
Input(kVideoStream)
->Dispatch(StreamData::FromMediaSample(
kChild, GetMediaSample(kSample2Start, kSampleDuration, kKeyFrame)));
Input(kVideoStream)->FlushAllDownstreams();
}
// TODO(kqyang): Add more tests, in particular, multi-thread tests.
} // namespace media
} // namespace shaka
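
As a starting point for the multi-thread tests mentioned in the TODO above, a hedged sketch (assuming <thread> is available in the test environment; not part of this commit) of two threads converging on the same hint. Whichever GetNext() call arrives first blocks; the second finds every other thread waiting, self-promotes the cue at the hint, and broadcasts to unblock the first:

TEST(SyncPointQueueTest, TwoThreadsConvergeOnTheSameHint) {
  Cuepoint cue;
  cue.start_time_in_seconds = 5.0;

  AdCueGeneratorParams params;
  params.cue_points.push_back(cue);

  SyncPointQueue sync_points(params);
  sync_points.AddThread();
  sync_points.AddThread();

  const double hint = sync_points.GetHint(-1);

  std::shared_ptr<const CueEvent> cue_a;
  std::thread worker([&]() { cue_a = sync_points.GetNext(hint); });
  std::shared_ptr<const CueEvent> cue_b = sync_points.GetNext(hint);
  worker.join();

  // Both threads receive the same promoted cue at its scheduled time.
  ASSERT_TRUE(cue_a);
  ASSERT_TRUE(cue_b);
  EXPECT_EQ(5.0, cue_a->time_in_seconds);
  EXPECT_EQ(5.0, cue_b->time_in_seconds);
}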

packager/media/chunking/sync_point_queue.cc

@@ -0,0 +1,118 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/chunking/sync_point_queue.h"
#include "packager/media/base/media_handler.h"
#include <algorithm>
#include <limits>
namespace shaka {
namespace media {
SyncPointQueue::SyncPointQueue(const AdCueGeneratorParams& params)
: sync_condition_(&lock_) {
for (const Cuepoint& point : params.cue_points) {
std::shared_ptr<CueEvent> event = std::make_shared<CueEvent>();
event->time_in_seconds = point.start_time_in_seconds;
unpromoted_[point.start_time_in_seconds] = std::move(event);
}
}
void SyncPointQueue::AddThread() {
base::AutoLock auto_lock(lock_);
thread_count_++;
}
double SyncPointQueue::GetHint(double time_in_seconds) {
base::AutoLock auto_lock(lock_);
auto iter = promoted_.upper_bound(time_in_seconds);
if (iter != promoted_.end())
return iter->first;
iter = unpromoted_.upper_bound(time_in_seconds);
if (iter != unpromoted_.end())
return iter->first;
// Use the maximum double value as the fallback so that we can force all
// streams to run out all their samples even when there are no cues.
return std::numeric_limits<double>::max();
}
std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
double hint_in_seconds) {
base::AutoLock auto_lock(lock_);
while (true) {
// Find the promoted cue that would line up with our hint, which is the
// first cue that is not less than |hint_in_seconds|.
auto iter = promoted_.lower_bound(hint_in_seconds);
if (iter != promoted_.end()) {
return iter->second;
}
// Promote the cue at |hint_in_seconds| if every other thread is already
// waiting.
if (waiting_thread_count_ + 1 == thread_count_) {
std::shared_ptr<const CueEvent> cue = PromoteAtNoLocking(hint_in_seconds);
CHECK(cue);
return cue;
}
waiting_thread_count_++;
// This blocks until either a cue is promoted or all threads are blocked
// (in which case the unpromoted cue at the hint will be self-promoted
// and returned - see the branch above). Spurious wake-ups are possible
// with most condition variable implementations, so when Wait() returns
// we loop back and check whether a cue has actually been promoted.
sync_condition_.Wait();
waiting_thread_count_--;
}
}
std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAt(
double time_in_seconds) {
base::AutoLock auto_lock(lock_);
return PromoteAtNoLocking(time_in_seconds);
}
std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAtNoLocking(
double time_in_seconds) {
lock_.AssertAcquired();
// It is possible that the cue at |time_in_seconds| has already been promoted.
auto iter = promoted_.find(time_in_seconds);
if (iter != promoted_.end())
return iter->second;
// Find the unpromoted cue that matches the given time, which is the last
// cue that is not greater than |time_in_seconds|. So find the first cue
// that is greater than |time_in_seconds| and then step back to the
// previous one.
iter = unpromoted_.upper_bound(time_in_seconds);
// If there is no unpromoted cue at or before |time_in_seconds|, there is
// nothing to promote. This could happen only if the cue has been promoted
// at a different timestamp, which can only be the result of unaligned GOPs.
if (iter == unpromoted_.begin())
return nullptr;
auto prev_iter = std::prev(iter);
DCHECK(prev_iter != unpromoted_.end());
std::shared_ptr<CueEvent> cue = prev_iter->second;
cue->time_in_seconds = time_in_seconds;
promoted_[time_in_seconds] = cue;
// Remove all unpromoted cues up to the cue that was just promoted.
// User may provide multiple cue points at the same or similar timestamps. The
// extra unused cues are simply ignored.
unpromoted_.erase(unpromoted_.begin(), iter);
// Wake up other threads that may be waiting.
sync_condition_.Broadcast();
return std::move(cue);
}
} // namespace media
} // namespace shaka
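
Video streams use PromoteAt() rather than GetNext(). A minimal sketch of that contract with assumed values (a standalone illustration, not part of this commit): a cue scheduled at 10 s is promoted at the first key frame at or after the hint, and once no cues remain the hint falls back to the max double:

Cuepoint cue;
cue.start_time_in_seconds = 10.0;

AdCueGeneratorParams params;
params.cue_points.push_back(cue);

SyncPointQueue sync_points(params);

// Before promotion, the hint is the scheduled cue time.
double hint = sync_points.GetHint(-1);  // hint == 10.0

// A video stream sees its first key frame at or after the hint at 10.5 s
// and promotes the cue there; the cue time is adjusted to the key frame.
std::shared_ptr<const CueEvent> promoted = sync_points.PromoteAt(10.5);
// promoted->time_in_seconds == 10.5

// With no cues left, the hint falls back to the max double so that all
// streams can run out their remaining samples.
hint = sync_points.GetHint(10.5);  // == std::numeric_limits<double>::max()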

packager/media/chunking/sync_point_queue.h

@@ -0,0 +1,65 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_MEDIA_CHUNKING_SYNC_POINT_QUEUE_H_
#define PACKAGER_MEDIA_CHUNKING_SYNC_POINT_QUEUE_H_

#include <map>
#include <memory>
#include "packager/base/synchronization/condition_variable.h"
#include "packager/base/synchronization/lock.h"
#include "packager/media/public/ad_cue_generator_params.h"
namespace shaka {
namespace media {
struct CueEvent;
/// A synchronized queue for cue points.
class SyncPointQueue {
public:
explicit SyncPointQueue(const AdCueGeneratorParams& params);
~SyncPointQueue() = default;
/// Register a new thread. Each thread using this instance must call this
/// method so that the queue can keep track of the number of its clients.
void AddThread();
/// @return A hint for when the next cue event would be. The returned hint is
/// greater than @a time_in_seconds. The actual time of the next cue
/// event will not be less than the returned hint; the exact value
/// depends on promotion.
double GetHint(double time_in_seconds);
/// @return The next cue based on a previous hint. If a cue has been promoted
/// that comes after @a hint_in_seconds it is returned. If no cue
/// after @a hint_in_seconds has been promoted, this will block until
/// either a cue is promoted or all threads are blocked (in which
/// case, the unpromoted cue at @a hint_in_seconds will be
/// self-promoted and returned).
std::shared_ptr<const CueEvent> GetNext(double hint_in_seconds);
/// Promote the closest cue that is not greater than @a time_in_seconds. All
/// unpromoted cues before that cue will be discarded.
std::shared_ptr<const CueEvent> PromoteAt(double time_in_seconds);
private:
SyncPointQueue(const SyncPointQueue&) = delete;
SyncPointQueue& operator=(const SyncPointQueue&) = delete;
// PromoteAt() without acquiring the lock. It is called by PromoteAt() and
// other functions that already hold the lock.
std::shared_ptr<const CueEvent> PromoteAtNoLocking(double time_in_seconds);
base::Lock lock_;
base::ConditionVariable sync_condition_;
size_t thread_count_ = 0;
size_t waiting_thread_count_ = 0;
std::map<double, std::shared_ptr<CueEvent>> unpromoted_;
std::map<double, std::shared_ptr<CueEvent>> promoted_;
};
} // namespace media
}  // namespace shaka

#endif  // PACKAGER_MEDIA_CHUNKING_SYNC_POINT_QUEUE_H_