diff --git a/packager/media/chunking/cue_alignment_handler.cc b/packager/media/chunking/cue_alignment_handler.cc index 77e96d8c06..ced5281156 100644 --- a/packager/media/chunking/cue_alignment_handler.cc +++ b/packager/media/chunking/cue_alignment_handler.cc @@ -6,6 +6,8 @@ #include "packager/media/chunking/cue_alignment_handler.h" +#include "packager/status_macros.h" + namespace shaka { namespace media { namespace { @@ -79,30 +81,43 @@ Status CueAlignmentHandler::Process(std::unique_ptr data) { } Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) { + stream_states_[stream_index].to_be_flushed = true; + // We need to wait for all stream to flush before we can flush each stream. // This allows cached buffers to be cleared and cues to be properly - // synchronized and set on all streams.. - - stream_states_[stream_index].to_be_flushed = true; + // synchronized and set on all streams. for (const StreamState& stream_state : stream_states_) { - if (!stream_state.to_be_flushed) + if (!stream_state.to_be_flushed) { return Status::OK; - } - - // |to_be_flushed| is set on all streams. We don't expect to see data in any - // buffers. - for (size_t i = 0; i < stream_states_.size(); ++i) { - StreamState& stream_state = stream_states_[i]; - if (!stream_state.samples.empty()) { - LOG(WARNING) << "Unexpected data seen on stream " << i; - while (!stream_state.samples.empty()) { - Status status = Dispatch(std::move(stream_state.samples.front())); - if (!status.ok()) - return status; - stream_state.samples.pop_front(); - } } } + + // Do a once over all the streams to ensure that their states are as we expect + // them. Video and non-video streams have different allowances here. Video + // should absolutely have no cues or samples where as non-video streams may + // have cues or samples. + for (StreamState& stream : stream_states_) { + DCHECK(stream.to_be_flushed); + + if (stream.info->stream_type() == kStreamVideo) { + DCHECK_EQ(stream.samples.size(), 0u) + << "Video streams should not store samples"; + DCHECK_EQ(stream.cue, nullptr) << "Video streams should not store cues"; + } + + if (!stream.samples.empty()) { + LOG(WARNING) << "Unexpected data seen on stream."; + } + } + + // Go through all the streams and dispatch any remaining samples. + for (StreamState& stream : stream_states_) { + while (!stream.samples.empty()) { + RETURN_IF_ERROR(Dispatch(std::move(stream.samples.front()))); + stream.samples.pop_front(); + } + } + return FlushAllDownstreams(); } @@ -115,6 +130,62 @@ Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr data) { return Dispatch(std::move(data)); } +Status CueAlignmentHandler::OnVideoSample(std::unique_ptr sample) { + DCHECK(sample); + DCHECK(sample->media_sample); + + const size_t stream_index = sample->stream_index; + StreamState& stream = stream_states_[stream_index]; + + const double sample_time = TimeInSeconds(*stream.info, *sample); + const bool is_key_frame = sample->media_sample->is_key_frame(); + + if (is_key_frame && sample_time >= hint_) { + auto next_sync = sync_points_->PromoteAt(sample_time); + + if (!next_sync) { + LOG(ERROR) << "Failed to promote sync point at " << sample_time + << ". This happens only if video streams are not GOP-aligned."; + return Status(error::INVALID_ARGUMENT, + "Streams are not properly GOP-aligned."); + } + + RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync))); + RETURN_IF_ERROR(DispatchCueEvent(stream_index, std::move(stream.cue))); + } + + return Dispatch(std::move(sample)); +} + +Status CueAlignmentHandler::OnNonVideoSample( + std::unique_ptr sample) { + DCHECK(sample); + DCHECK(sample->media_sample || sample->text_sample); + + const size_t stream_index = sample->stream_index; + StreamState& stream_state = stream_states_[stream_index]; + + // Accept the sample. This will output it if it comes before the hint point or + // will cache it if it comes after the hint point. + RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state)); + + // If all the streams are waiting on a hint, it means that none has next sync + // point determined. It also means that there are no video streams and we need + // to wait for all streams to converge on a hint so that we can get the next + // sync point. + if (EveryoneWaitingAtHint()) { + std::shared_ptr next_sync = sync_points_->GetNext(hint_); + if (!next_sync) { + // This happens only if the job is cancelled. + return Status(error::CANCELLED, "SyncPointQueue is cancelled."); + } + + RETURN_IF_ERROR(UseNewSyncPoint(next_sync)); + } + + return Status::OK; +} + Status CueAlignmentHandler::OnSample(std::unique_ptr sample) { // There are two modes: // 1. There is a video input. @@ -126,56 +197,14 @@ Status CueAlignmentHandler::OnSample(std::unique_ptr sample) { // When there are no video inputs, we rely on the sync point queue to block // us until there is a sync point. - const uint64_t stream_index = sample->stream_index; - StreamState& stream_state = stream_states_[stream_index]; + const size_t stream_index = sample->stream_index; + const StreamType stream_type = + stream_states_[stream_index].info->stream_type(); - if (stream_state.info->stream_type() == kStreamVideo) { - const double sample_time = TimeInSeconds(*stream_state.info, *sample); - if (sample->media_sample->is_key_frame() && sample_time >= hint_) { - std::shared_ptr next_sync = - sync_points_->PromoteAt(sample_time); - if (!next_sync) { - LOG(ERROR) - << "Failed to promote sync point at " << sample_time - << ". This happens only if video streams are not GOP-aligned."; - return Status(error::INVALID_ARGUMENT, - "Streams are not properly GOP-aligned."); - } + const bool is_video = stream_type == kStreamVideo; - Status status(DispatchCueEvent(stream_index, next_sync)); - stream_state.cue.reset(); - status.Update(UseNewSyncPoint(next_sync)); - if (!status.ok()) - return status; - } - return Dispatch(std::move(sample)); - } - - // Accept the sample. This will output it if it comes before the hint point or - // will cache it if it comes after the hint point. - Status status = AcceptSample(std::move(sample), &stream_state); - if (!status.ok()) { - return status; - } - - // If all the streams are waiting on a hint, it means that none has next sync - // point determined. It also means that there are no video streams and we need - // to wait for all streams to converge on a hint so that we can get the next - // sync point. - if (EveryoneWaitingAtHint()) { - std::shared_ptr next_sync = sync_points_->GetNext(hint_); - if (!next_sync) { - // This happens only if the job is cancelled. - return Status(error::CANCELLED, "SyncPointQueue is cancelled."); - } - - Status status = UseNewSyncPoint(next_sync); - if (!status.ok()) { - return status; - } - } - - return Status::OK; + return is_video ? OnVideoSample(std::move(sample)) + : OnNonVideoSample(std::move(sample)); } Status CueAlignmentHandler::UseNewSyncPoint( diff --git a/packager/media/chunking/cue_alignment_handler.h b/packager/media/chunking/cue_alignment_handler.h index 6ac0bebeaf..ee4dd9f557 100644 --- a/packager/media/chunking/cue_alignment_handler.h +++ b/packager/media/chunking/cue_alignment_handler.h @@ -53,6 +53,9 @@ class CueAlignmentHandler : public MediaHandler { // Internal handling functions for different stream data. Status OnStreamInfo(std::unique_ptr data); + + Status OnVideoSample(std::unique_ptr sample); + Status OnNonVideoSample(std::unique_ptr sample); Status OnSample(std::unique_ptr sample); // Update stream states with new sync point.