diff --git a/packager/media/chunking/cue_alignment_handler.cc b/packager/media/chunking/cue_alignment_handler.cc index ced5281156..cb1f2124c9 100644 --- a/packager/media/chunking/cue_alignment_handler.cc +++ b/packager/media/chunking/cue_alignment_handler.cc @@ -102,7 +102,7 @@ Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) { if (stream.info->stream_type() == kStreamVideo) { DCHECK_EQ(stream.samples.size(), 0u) << "Video streams should not store samples"; - DCHECK_EQ(stream.cue, nullptr) << "Video streams should not store cues"; + DCHECK(!stream.cue) << "Video streams should not store cues"; } if (!stream.samples.empty()) { @@ -151,7 +151,8 @@ Status CueAlignmentHandler::OnVideoSample(std::unique_ptr sample) { } RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync))); - RETURN_IF_ERROR(DispatchCueEvent(stream_index, std::move(stream.cue))); + DCHECK(stream.cue); + RETURN_IF_ERROR(Dispatch(std::move(stream.cue))); } return Dispatch(std::move(sample)); @@ -212,40 +213,31 @@ Status CueAlignmentHandler::UseNewSyncPoint( hint_ = sync_points_->GetHint(new_sync->time_in_seconds); DCHECK_GT(hint_, new_sync->time_in_seconds); - Status status; - for (StreamState& stream_state : stream_states_) { - // No stream should be so out of sync with the others that they would - // still be working on an old cue. - if (stream_state.cue) { - // TODO(kqyang): Could this happen for text when there are no text samples - // between the two cues? - LOG(ERROR) << "Found two cue events that are too close together. One at " - << stream_state.cue->time_in_seconds << " and the other at " - << new_sync->time_in_seconds; - return Status(error::INVALID_ARGUMENT, "Cue events too close together"); + // No stream should be so out of sync with the others that they would + // still be working on an old cue. + for (auto& stream : stream_states_) { + if (!stream.cue) { + continue; } - stream_state.cue = new_sync; - - while (status.ok() && !stream_state.samples.empty()) { - std::unique_ptr& sample = stream_state.samples.front(); - const double sample_time_in_seconds = - TimeInSeconds(*stream_state.info, *sample); - if (sample_time_in_seconds >= hint_) { - DCHECK(!stream_state.cue); - break; - } - - const size_t stream_index = sample->stream_index; - if (stream_state.cue) { - status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds, - &stream_state)); - } - status.Update(Dispatch(std::move(sample))); - stream_state.samples.pop_front(); - } + // TODO(kqyang): Could this happen for text when there are no text samples + // between the two cues? + LOG(ERROR) << "Found two cue events that are too close together. One at " + << stream.cue->cue_event->time_in_seconds << " and the other at " + << new_sync->time_in_seconds; + return Status(error::INVALID_ARGUMENT, "Cue events too close together"); } - return status; + + // Add the cue to each stream and give them each a chance to run through their + // samples. + for (size_t index = 0; index < stream_states_.size(); index++) { + StreamState& stream = stream_states_[index]; + stream.cue = StreamData::FromCueEvent(index, new_sync); + + RETURN_IF_ERROR(RunThroughSamples(&stream)); + } + + return Status::OK; } bool CueAlignmentHandler::EveryoneWaitingAtHint() const { @@ -263,25 +255,16 @@ bool CueAlignmentHandler::EveryoneWaitingAtHint() const { // 2. Save the sample in the buffer as it comes after the next sync point. Status CueAlignmentHandler::AcceptSample(std::unique_ptr sample, StreamState* stream_state) { + DCHECK(sample); + DCHECK(sample->media_sample || sample->text_sample); DCHECK(stream_state); + // Need to cache the stream index as we will lose the pointer when we add + // the sample to the queue. const size_t stream_index = sample->stream_index; - if (stream_state->samples.empty()) { - const double sample_time_in_seconds = - TimeInSeconds(*stream_state->info, *sample); - if (sample_time_in_seconds < hint_) { - Status status; - if (stream_state->cue) { - status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds, - stream_state)); - } - status.Update(Dispatch(std::move(sample))); - return status; - } - DCHECK(!stream_state->cue); - } stream_state->samples.push_back(std::move(sample)); + if (stream_state->samples.size() > kMaxBufferSize) { LOG(ERROR) << "Stream " << stream_index << " has buffered " << stream_state->samples.size() << " when the max is " @@ -290,19 +273,37 @@ Status CueAlignmentHandler::AcceptSample(std::unique_ptr sample, "Streams are not properly multiplexed."); } + return RunThroughSamples(stream_state); +} + +Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) { + // Step through all our samples until we find where we can insert the cue. + // Think of this as a merge sort. + while (stream->cue && stream->samples.size()) { + const double sample_time_in_seconds = + TimeInSeconds(*stream->info, *stream->samples.front()); + const double cue_time_in_seconds = stream->cue->cue_event->time_in_seconds; + + // The cue can go out now. So let it, but we can't let the sample out yet + // because we need to check it with the hint. + if (cue_time_in_seconds <= sample_time_in_seconds) { + RETURN_IF_ERROR(Dispatch(std::move(stream->cue))); + } else { + RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front()))); + stream->samples.pop_front(); + } + } + + // If we still have samples, then it means that we sent out the cue and can + // now work up to the hint. So now send all samples that come before the hint + // downstream. + while (stream->samples.size() && + TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) { + RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front()))); + stream->samples.pop_front(); + } + return Status::OK; } - -Status CueAlignmentHandler::DispatchCueIfNeeded( - size_t stream_index, - double next_sample_time_in_seconds, - StreamState* stream_state) { - DCHECK(stream_state->cue); - if (next_sample_time_in_seconds < stream_state->cue->time_in_seconds) - return Status::OK; - DCHECK_LT(stream_state->cue->time_in_seconds, hint_); - return DispatchCueEvent(stream_index, std::move(stream_state->cue)); -} - } // namespace media } // namespace shaka diff --git a/packager/media/chunking/cue_alignment_handler.h b/packager/media/chunking/cue_alignment_handler.h index ee4dd9f557..1471e4c509 100644 --- a/packager/media/chunking/cue_alignment_handler.h +++ b/packager/media/chunking/cue_alignment_handler.h @@ -41,9 +41,9 @@ class CueAlignmentHandler : public MediaHandler { bool to_be_flushed = false; // If set, it points to the next cue it has to send downstream. Note that if - // it is not set, the next cue is not determined. - // This is set but not really used by video stream. - std::shared_ptr cue; + // it is not set, the next cue is not determined. We store the stream data + // so that we don't need a stream index to send it out. + std::unique_ptr cue; }; // MediaHandler overrides. @@ -68,10 +68,8 @@ class CueAlignmentHandler : public MediaHandler { Status AcceptSample(std::unique_ptr sample, StreamState* stream_state); - // Dispatch the cue if the new sample comes at or after it. - Status DispatchCueIfNeeded(size_t stream_index, - double next_sample_time_in_seconds, - StreamState* stream_state); + // Dispatch all samples and cues (in the correct order) for the given stream. + Status RunThroughSamples(StreamState* stream); SyncPointQueue* const sync_points_ = nullptr; std::vector stream_states_;