Isolate Video and Non Video Actions In Cue Aligner

To make it easier to understand what a video stream and a non
video streams is doing in the cue aligner, each stream type is
given their own functions.

Change-Id: I8b8ca403721bcb06ca3056004420902667a30f6c
This commit is contained in:
Aaron Vaage 2018-05-07 16:53:24 -07:00
parent df19a48aa0
commit ec3bbfff16
2 changed files with 98 additions and 66 deletions

View File

@ -6,6 +6,8 @@
#include "packager/media/chunking/cue_alignment_handler.h" #include "packager/media/chunking/cue_alignment_handler.h"
#include "packager/status_macros.h"
namespace shaka { namespace shaka {
namespace media { namespace media {
namespace { namespace {
@ -79,30 +81,43 @@ Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
} }
Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) { Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
stream_states_[stream_index].to_be_flushed = true;
// We need to wait for all stream to flush before we can flush each stream. // We need to wait for all stream to flush before we can flush each stream.
// This allows cached buffers to be cleared and cues to be properly // This allows cached buffers to be cleared and cues to be properly
// synchronized and set on all streams.. // synchronized and set on all streams.
stream_states_[stream_index].to_be_flushed = true;
for (const StreamState& stream_state : stream_states_) { for (const StreamState& stream_state : stream_states_) {
if (!stream_state.to_be_flushed) if (!stream_state.to_be_flushed) {
return Status::OK; return Status::OK;
} }
}
// |to_be_flushed| is set on all streams. We don't expect to see data in any // Do a once over all the streams to ensure that their states are as we expect
// buffers. // them. Video and non-video streams have different allowances here. Video
for (size_t i = 0; i < stream_states_.size(); ++i) { // should absolutely have no cues or samples where as non-video streams may
StreamState& stream_state = stream_states_[i]; // have cues or samples.
if (!stream_state.samples.empty()) { for (StreamState& stream : stream_states_) {
LOG(WARNING) << "Unexpected data seen on stream " << i; DCHECK(stream.to_be_flushed);
while (!stream_state.samples.empty()) {
Status status = Dispatch(std::move(stream_state.samples.front())); if (stream.info->stream_type() == kStreamVideo) {
if (!status.ok()) DCHECK_EQ(stream.samples.size(), 0u)
return status; << "Video streams should not store samples";
stream_state.samples.pop_front(); DCHECK_EQ(stream.cue, nullptr) << "Video streams should not store cues";
}
if (!stream.samples.empty()) {
LOG(WARNING) << "Unexpected data seen on stream.";
} }
} }
// Go through all the streams and dispatch any remaining samples.
for (StreamState& stream : stream_states_) {
while (!stream.samples.empty()) {
RETURN_IF_ERROR(Dispatch(std::move(stream.samples.front())));
stream.samples.pop_front();
} }
}
return FlushAllDownstreams(); return FlushAllDownstreams();
} }
@ -115,6 +130,62 @@ Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
return Dispatch(std::move(data)); return Dispatch(std::move(data));
} }
Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
DCHECK(sample);
DCHECK(sample->media_sample);
const size_t stream_index = sample->stream_index;
StreamState& stream = stream_states_[stream_index];
const double sample_time = TimeInSeconds(*stream.info, *sample);
const bool is_key_frame = sample->media_sample->is_key_frame();
if (is_key_frame && sample_time >= hint_) {
auto next_sync = sync_points_->PromoteAt(sample_time);
if (!next_sync) {
LOG(ERROR) << "Failed to promote sync point at " << sample_time
<< ". This happens only if video streams are not GOP-aligned.";
return Status(error::INVALID_ARGUMENT,
"Streams are not properly GOP-aligned.");
}
RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
RETURN_IF_ERROR(DispatchCueEvent(stream_index, std::move(stream.cue)));
}
return Dispatch(std::move(sample));
}
Status CueAlignmentHandler::OnNonVideoSample(
std::unique_ptr<StreamData> sample) {
DCHECK(sample);
DCHECK(sample->media_sample || sample->text_sample);
const size_t stream_index = sample->stream_index;
StreamState& stream_state = stream_states_[stream_index];
// Accept the sample. This will output it if it comes before the hint point or
// will cache it if it comes after the hint point.
RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));
// If all the streams are waiting on a hint, it means that none has next sync
// point determined. It also means that there are no video streams and we need
// to wait for all streams to converge on a hint so that we can get the next
// sync point.
if (EveryoneWaitingAtHint()) {
std::shared_ptr<const CueEvent> next_sync = sync_points_->GetNext(hint_);
if (!next_sync) {
// This happens only if the job is cancelled.
return Status(error::CANCELLED, "SyncPointQueue is cancelled.");
}
RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
}
return Status::OK;
}
Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) { Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
// There are two modes: // There are two modes:
// 1. There is a video input. // 1. There is a video input.
@ -126,56 +197,14 @@ Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
// When there are no video inputs, we rely on the sync point queue to block // When there are no video inputs, we rely on the sync point queue to block
// us until there is a sync point. // us until there is a sync point.
const uint64_t stream_index = sample->stream_index; const size_t stream_index = sample->stream_index;
StreamState& stream_state = stream_states_[stream_index]; const StreamType stream_type =
stream_states_[stream_index].info->stream_type();
if (stream_state.info->stream_type() == kStreamVideo) { const bool is_video = stream_type == kStreamVideo;
const double sample_time = TimeInSeconds(*stream_state.info, *sample);
if (sample->media_sample->is_key_frame() && sample_time >= hint_) {
std::shared_ptr<const CueEvent> next_sync =
sync_points_->PromoteAt(sample_time);
if (!next_sync) {
LOG(ERROR)
<< "Failed to promote sync point at " << sample_time
<< ". This happens only if video streams are not GOP-aligned.";
return Status(error::INVALID_ARGUMENT,
"Streams are not properly GOP-aligned.");
}
Status status(DispatchCueEvent(stream_index, next_sync)); return is_video ? OnVideoSample(std::move(sample))
stream_state.cue.reset(); : OnNonVideoSample(std::move(sample));
status.Update(UseNewSyncPoint(next_sync));
if (!status.ok())
return status;
}
return Dispatch(std::move(sample));
}
// Accept the sample. This will output it if it comes before the hint point or
// will cache it if it comes after the hint point.
Status status = AcceptSample(std::move(sample), &stream_state);
if (!status.ok()) {
return status;
}
// If all the streams are waiting on a hint, it means that none has next sync
// point determined. It also means that there are no video streams and we need
// to wait for all streams to converge on a hint so that we can get the next
// sync point.
if (EveryoneWaitingAtHint()) {
std::shared_ptr<const CueEvent> next_sync = sync_points_->GetNext(hint_);
if (!next_sync) {
// This happens only if the job is cancelled.
return Status(error::CANCELLED, "SyncPointQueue is cancelled.");
}
Status status = UseNewSyncPoint(next_sync);
if (!status.ok()) {
return status;
}
}
return Status::OK;
} }
Status CueAlignmentHandler::UseNewSyncPoint( Status CueAlignmentHandler::UseNewSyncPoint(

View File

@ -53,6 +53,9 @@ class CueAlignmentHandler : public MediaHandler {
// Internal handling functions for different stream data. // Internal handling functions for different stream data.
Status OnStreamInfo(std::unique_ptr<StreamData> data); Status OnStreamInfo(std::unique_ptr<StreamData> data);
Status OnVideoSample(std::unique_ptr<StreamData> sample);
Status OnNonVideoSample(std::unique_ptr<StreamData> sample);
Status OnSample(std::unique_ptr<StreamData> sample); Status OnSample(std::unique_ptr<StreamData> sample);
// Update stream states with new sync point. // Update stream states with new sync point.