// Copyright 2018 Google Inc. All rights reserved. // // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file or at // https://developers.google.com/open-source/licenses/bsd #include "packager/media/chunking/cue_alignment_handler.h" #include "packager/status_macros.h" namespace shaka { namespace media { namespace { // The max number of samples that are allowed to be buffered before we shutdown // because there is likely a problem with the content or how the pipeline was // configured. This is about 20 seconds of buffer for audio with 48kHz. const size_t kMaxBufferSize = 1000; double TimeInSeconds(const StreamInfo& info, const StreamData& data) { int64_t time_scale; int64_t scaled_time; switch (data.stream_data_type) { case StreamDataType::kMediaSample: time_scale = info.time_scale(); if (info.stream_type() == kStreamAudio) { // Return the start time for video and mid-point for audio, so that for // an audio sample, if the portion of the sample after the cue point is // bigger than the portion of the sample before the cue point, the // sample is placed after the cue. // It does not matter for text samples as text samples will be cut at // cue point. scaled_time = data.media_sample->pts() + data.media_sample->duration() / 2; } else { scaled_time = data.media_sample->pts(); } break; case StreamDataType::kTextSample: // Text is always in MS but the stream info time scale is 0. time_scale = 1000; scaled_time = data.text_sample->start_time(); break; default: time_scale = 0; scaled_time = 0; NOTREACHED() << "TimeInSeconds should only be called on media samples " "and text samples."; break; } return static_cast(scaled_time) / time_scale; } } // namespace CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points) : sync_points_(sync_points) {} Status CueAlignmentHandler::InitializeInternal() { sync_points_->AddThread(); stream_states_.resize(num_input_streams()); // Get the first hint for the stream. Use a negative hint so that if there is // suppose to be a sync point at zero, we will still respect it. hint_ = sync_points_->GetHint(-1); return Status::OK; } Status CueAlignmentHandler::Process(std::unique_ptr data) { switch (data->stream_data_type) { case StreamDataType::kStreamInfo: return OnStreamInfo(std::move(data)); case StreamDataType::kTextSample: case StreamDataType::kMediaSample: return OnSample(std::move(data)); default: VLOG(3) << "Dropping unsupported data type " << static_cast(data->stream_data_type); return Status::OK; } } Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) { stream_states_[stream_index].to_be_flushed = true; // We need to wait for all stream to flush before we can flush each stream. // This allows cached buffers to be cleared and cues to be properly // synchronized and set on all streams. for (const StreamState& stream_state : stream_states_) { if (!stream_state.to_be_flushed) { return Status::OK; } } // Do a once over all the streams to ensure that their states are as we expect // them. Video and non-video streams have different allowances here. Video // should absolutely have no cues or samples where as non-video streams may // have cues or samples. for (StreamState& stream : stream_states_) { DCHECK(stream.to_be_flushed); if (stream.info->stream_type() == kStreamVideo) { DCHECK_EQ(stream.samples.size(), 0u) << "Video streams should not store samples"; DCHECK_EQ(stream.cues.size(), 0u) << "Video streams should not store cues"; } if (!stream.samples.empty()) { LOG(WARNING) << "Unexpected samples seen on stream. Skipping samples"; } } // Go through all the streams and dispatch any remaining cues. for (StreamState& stream : stream_states_) { while (stream.cues.size()) { RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front()))); stream.cues.pop_front(); } } return FlushAllDownstreams(); } Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr data) { StreamState& stream_state = stream_states_[data->stream_index]; // Keep a copy of the stream info so that we can check type and check // timescale. stream_state.info = data->stream_info; return Dispatch(std::move(data)); } Status CueAlignmentHandler::OnVideoSample(std::unique_ptr sample) { DCHECK(sample); DCHECK(sample->media_sample); const size_t stream_index = sample->stream_index; StreamState& stream = stream_states_[stream_index]; const double sample_time = TimeInSeconds(*stream.info, *sample); const bool is_key_frame = sample->media_sample->is_key_frame(); if (is_key_frame && sample_time >= hint_) { auto next_sync = sync_points_->PromoteAt(sample_time); if (!next_sync) { LOG(ERROR) << "Failed to promote sync point at " << sample_time << ". This happens only if video streams are not GOP-aligned."; return Status(error::INVALID_ARGUMENT, "Streams are not properly GOP-aligned."); } RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync))); DCHECK_EQ(stream.cues.size(), 1u); RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front()))); stream.cues.pop_front(); } return Dispatch(std::move(sample)); } Status CueAlignmentHandler::OnNonVideoSample( std::unique_ptr sample) { DCHECK(sample); DCHECK(sample->media_sample || sample->text_sample); const size_t stream_index = sample->stream_index; StreamState& stream_state = stream_states_[stream_index]; // Accept the sample. This will output it if it comes before the hint point or // will cache it if it comes after the hint point. RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state)); // If all the streams are waiting on a hint, it means that none has next sync // point determined. It also means that there are no video streams and we need // to wait for all streams to converge on a hint so that we can get the next // sync point. if (EveryoneWaitingAtHint()) { std::shared_ptr next_sync = sync_points_->GetNext(hint_); if (!next_sync) { // This happens only if the job is cancelled. return Status(error::CANCELLED, "SyncPointQueue is cancelled."); } RETURN_IF_ERROR(UseNewSyncPoint(next_sync)); } return Status::OK; } Status CueAlignmentHandler::OnSample(std::unique_ptr sample) { // There are two modes: // 1. There is a video input. // 2. There are no video inputs. // // When there is a video input, we rely on the video input get the next sync // point and release all the samples. // // When there are no video inputs, we rely on the sync point queue to block // us until there is a sync point. const size_t stream_index = sample->stream_index; const StreamType stream_type = stream_states_[stream_index].info->stream_type(); const bool is_video = stream_type == kStreamVideo; return is_video ? OnVideoSample(std::move(sample)) : OnNonVideoSample(std::move(sample)); } Status CueAlignmentHandler::UseNewSyncPoint( std::shared_ptr new_sync) { hint_ = sync_points_->GetHint(new_sync->time_in_seconds); DCHECK_GT(hint_, new_sync->time_in_seconds); for (size_t stream_index = 0; stream_index < stream_states_.size(); stream_index++) { StreamState& stream = stream_states_[stream_index]; stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync)); RETURN_IF_ERROR(RunThroughSamples(&stream)); } return Status::OK; } bool CueAlignmentHandler::EveryoneWaitingAtHint() const { for (const StreamState& stream_state : stream_states_) { if (stream_state.samples.empty()) { return false; } } return true; } Status CueAlignmentHandler::AcceptSample(std::unique_ptr sample, StreamState* stream) { DCHECK(sample); DCHECK(sample->media_sample || sample->text_sample); DCHECK(stream); // Need to cache the stream index as we will lose the pointer when we add // the sample to the queue. const size_t stream_index = sample->stream_index; stream->samples.push_back(std::move(sample)); if (stream->samples.size() > kMaxBufferSize) { LOG(ERROR) << "Stream " << stream_index << " has buffered " << stream->samples.size() << " when the max is " << kMaxBufferSize; return Status(error::INVALID_ARGUMENT, "Streams are not properly multiplexed."); } return RunThroughSamples(stream); } Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) { // Step through all our samples until we find where we can insert the cue. // Think of this as a merge sort. while (stream->cues.size() && stream->samples.size()) { const double cue_time = stream->cues.front()->cue_event->time_in_seconds; const double sample_time = TimeInSeconds(*stream->info, *stream->samples.front()); if (sample_time < cue_time) { RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front()))); stream->samples.pop_front(); } else { RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front()))); stream->cues.pop_front(); } } // If we still have samples, then it means that we sent out the cue and can // now work up to the hint. So now send all samples that come before the hint // downstream. while (stream->samples.size() && TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) { RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front()))); stream->samples.pop_front(); } return Status::OK; } } // namespace media } // namespace shaka