From 03afb16c0c708301b8c5e80c78c9542f45e3b99d Mon Sep 17 00:00:00 2001 From: Aaron Vaage Date: Wed, 25 Apr 2018 18:12:36 -0700 Subject: [PATCH] Use Map To Track Demuxers and Aligners To move us toward no longer need to ensure order when building our pipeline, use a map to share demuxers between stream descriptors. This will even allow use to use the same demuxers in the text pipelines while still building them separately from the audio and video streams. Change-Id: I4d4dbddbc06adee36cbe7f4aa1f6769f7bb2a3f6 --- packager/packager.cc | 118 +++++++++++++++++++++---------------------- 1 file changed, 57 insertions(+), 61 deletions(-) diff --git a/packager/packager.cc b/packager/packager.cc index 8295a60fe9..27efb3aa7d 100644 --- a/packager/packager.cc +++ b/packager/packager.cc @@ -640,40 +640,42 @@ Status CreateAudioVideoJobs( DCHECK(muxer_listener_factory); DCHECK(muxer_factory); DCHECK(job_manager); + // Store all the demuxers in a map so that we can look up a stream's demuxer. + // This is step one in making this part of the pipeline less dependant on + // order. + std::map> sources; + std::map> cue_aligners; + + for (const StreamDescriptor& stream : streams) { + bool seen_input_before = sources.find(stream.input) != sources.end(); + if (seen_input_before) { + continue; + } + + RETURN_IF_ERROR( + CreateDemuxer(stream, packaging_params, &sources[stream.input])); + cue_aligners[stream.input] = + sync_points ? std::make_shared(sync_points) + : nullptr; + } + + for (auto& source : sources) { + job_manager->Add("RemuxJob", source.second); + } - // Demuxers are shared among all streams with the same input. - std::shared_ptr demuxer; - // When |sync_points| is not null, there should be one CueAlignmentHandler per - // input. All CueAlignmentHandler shares the same |sync_points|, which allows - // sync points / cues to be aligned across streams, whether they are from the - // same input or not. - std::shared_ptr cue_aligner; // Replicators are shared among all streams with the same input and stream // selector. - std::shared_ptr replicator; + std::shared_ptr replicator; std::string previous_input; std::string previous_selector; for (const StreamDescriptor& stream : streams) { - // If we changed our input files, we need a new demuxer. + // Get the demuxer for this stream. + auto& demuxer = sources[stream.input]; + auto& cue_aligner = cue_aligners[stream.input]; + const bool new_input_file = stream.input != previous_input; - if (new_input_file) { - Status status = CreateDemuxer(stream, packaging_params, &demuxer); - if (!status.ok()) { - return status; - } - - job_manager->Add("RemuxJob", demuxer); - - if (sync_points) - cue_aligner = std::make_shared(sync_points); - } - - if (!stream.language.empty()) { - demuxer->SetLanguageOverride(stream.stream_selector, stream.language); - } - const bool new_stream = new_input_file || previous_selector != stream.stream_selector; previous_input = stream.input; @@ -685,58 +687,52 @@ Status CreateAudioVideoJobs( continue; } + // Just because it is a different stream descriptor does not mean it is a + // new stream. Multiple stream descriptors may have the same stream but + // only differ by trick play factor. if (new_stream) { - auto chunker = - std::make_shared(packaging_params.chunking_params); - - std::shared_ptr encryptor = CreateEncryptionHandler( - packaging_params, stream, encryption_key_source); - - replicator = std::make_shared(); - - if (cue_aligner) { - RETURN_IF_ERROR( - demuxer->SetHandler(stream.stream_selector, cue_aligner)); - RETURN_IF_ERROR(cue_aligner->AddHandler(chunker)); - } else { - RETURN_IF_ERROR(demuxer->SetHandler(stream.stream_selector, chunker)); - } - if (encryptor) { - RETURN_IF_ERROR(chunker->AddHandler(encryptor)); - RETURN_IF_ERROR(encryptor->AddHandler(replicator)); - } else { - RETURN_IF_ERROR(chunker->AddHandler(replicator)); - } - if (!stream.language.empty()) { demuxer->SetLanguageOverride(stream.stream_selector, stream.language); } + + replicator = std::make_shared(); + auto chunker = + std::make_shared(packaging_params.chunking_params); + auto encryptor = CreateEncryptionHandler(packaging_params, stream, + encryption_key_source); + + // TODO(vaage) : Create a nicer way to connect handlers to demuxers. + if (sync_points) { + RETURN_IF_ERROR( + ChainHandlers({cue_aligner, chunker, encryptor, replicator})); + RETURN_IF_ERROR( + demuxer->SetHandler(stream.stream_selector, cue_aligner)); + } else { + RETURN_IF_ERROR(ChainHandlers({chunker, encryptor, replicator})); + RETURN_IF_ERROR(demuxer->SetHandler(stream.stream_selector, chunker)); + } } // Create the muxer (output) for this track. - std::unique_ptr muxer_listener = - muxer_listener_factory->CreateListener(ToMuxerListenerData(stream)); std::shared_ptr muxer = muxer_factory->CreateMuxer(GetOutputFormat(stream), stream); - muxer->SetMuxerListener(std::move(muxer_listener)); - if (!muxer) { return Status(error::INVALID_ARGUMENT, "Failed to create muxer for " + stream.input + ":" + stream.stream_selector); } - std::shared_ptr trick_play; - if (stream.trick_play_factor) { - trick_play = std::make_shared(stream.trick_play_factor); - } + std::unique_ptr muxer_listener = + muxer_listener_factory->CreateListener(ToMuxerListenerData(stream)); + muxer->SetMuxerListener(std::move(muxer_listener)); - if (trick_play) { - RETURN_IF_ERROR(replicator->AddHandler(trick_play)); - RETURN_IF_ERROR(trick_play->AddHandler(muxer)); - } else { - RETURN_IF_ERROR(replicator->AddHandler(muxer)); - } + // Trick play is optional. + std::shared_ptr trick_play = + stream.trick_play_factor + ? std::make_shared(stream.trick_play_factor) + : nullptr; + + RETURN_IF_ERROR(ChainHandlers({replicator, trick_play, muxer})); } return Status::OK;