diff --git a/packager/packager.cc b/packager/packager.cc index 8295a60fe9..27efb3aa7d 100644 --- a/packager/packager.cc +++ b/packager/packager.cc @@ -640,40 +640,42 @@ Status CreateAudioVideoJobs( DCHECK(muxer_listener_factory); DCHECK(muxer_factory); DCHECK(job_manager); + // Store all the demuxers in a map so that we can look up a stream's demuxer. + // This is step one in making this part of the pipeline less dependant on + // order. + std::map> sources; + std::map> cue_aligners; + + for (const StreamDescriptor& stream : streams) { + bool seen_input_before = sources.find(stream.input) != sources.end(); + if (seen_input_before) { + continue; + } + + RETURN_IF_ERROR( + CreateDemuxer(stream, packaging_params, &sources[stream.input])); + cue_aligners[stream.input] = + sync_points ? std::make_shared(sync_points) + : nullptr; + } + + for (auto& source : sources) { + job_manager->Add("RemuxJob", source.second); + } - // Demuxers are shared among all streams with the same input. - std::shared_ptr demuxer; - // When |sync_points| is not null, there should be one CueAlignmentHandler per - // input. All CueAlignmentHandler shares the same |sync_points|, which allows - // sync points / cues to be aligned across streams, whether they are from the - // same input or not. - std::shared_ptr cue_aligner; // Replicators are shared among all streams with the same input and stream // selector. - std::shared_ptr replicator; + std::shared_ptr replicator; std::string previous_input; std::string previous_selector; for (const StreamDescriptor& stream : streams) { - // If we changed our input files, we need a new demuxer. + // Get the demuxer for this stream. + auto& demuxer = sources[stream.input]; + auto& cue_aligner = cue_aligners[stream.input]; + const bool new_input_file = stream.input != previous_input; - if (new_input_file) { - Status status = CreateDemuxer(stream, packaging_params, &demuxer); - if (!status.ok()) { - return status; - } - - job_manager->Add("RemuxJob", demuxer); - - if (sync_points) - cue_aligner = std::make_shared(sync_points); - } - - if (!stream.language.empty()) { - demuxer->SetLanguageOverride(stream.stream_selector, stream.language); - } - const bool new_stream = new_input_file || previous_selector != stream.stream_selector; previous_input = stream.input; @@ -685,58 +687,52 @@ Status CreateAudioVideoJobs( continue; } + // Just because it is a different stream descriptor does not mean it is a + // new stream. Multiple stream descriptors may have the same stream but + // only differ by trick play factor. if (new_stream) { - auto chunker = - std::make_shared(packaging_params.chunking_params); - - std::shared_ptr encryptor = CreateEncryptionHandler( - packaging_params, stream, encryption_key_source); - - replicator = std::make_shared(); - - if (cue_aligner) { - RETURN_IF_ERROR( - demuxer->SetHandler(stream.stream_selector, cue_aligner)); - RETURN_IF_ERROR(cue_aligner->AddHandler(chunker)); - } else { - RETURN_IF_ERROR(demuxer->SetHandler(stream.stream_selector, chunker)); - } - if (encryptor) { - RETURN_IF_ERROR(chunker->AddHandler(encryptor)); - RETURN_IF_ERROR(encryptor->AddHandler(replicator)); - } else { - RETURN_IF_ERROR(chunker->AddHandler(replicator)); - } - if (!stream.language.empty()) { demuxer->SetLanguageOverride(stream.stream_selector, stream.language); } + + replicator = std::make_shared(); + auto chunker = + std::make_shared(packaging_params.chunking_params); + auto encryptor = CreateEncryptionHandler(packaging_params, stream, + encryption_key_source); + + // TODO(vaage) : Create a nicer way to connect handlers to demuxers. + if (sync_points) { + RETURN_IF_ERROR( + ChainHandlers({cue_aligner, chunker, encryptor, replicator})); + RETURN_IF_ERROR( + demuxer->SetHandler(stream.stream_selector, cue_aligner)); + } else { + RETURN_IF_ERROR(ChainHandlers({chunker, encryptor, replicator})); + RETURN_IF_ERROR(demuxer->SetHandler(stream.stream_selector, chunker)); + } } // Create the muxer (output) for this track. - std::unique_ptr muxer_listener = - muxer_listener_factory->CreateListener(ToMuxerListenerData(stream)); std::shared_ptr muxer = muxer_factory->CreateMuxer(GetOutputFormat(stream), stream); - muxer->SetMuxerListener(std::move(muxer_listener)); - if (!muxer) { return Status(error::INVALID_ARGUMENT, "Failed to create muxer for " + stream.input + ":" + stream.stream_selector); } - std::shared_ptr trick_play; - if (stream.trick_play_factor) { - trick_play = std::make_shared(stream.trick_play_factor); - } + std::unique_ptr muxer_listener = + muxer_listener_factory->CreateListener(ToMuxerListenerData(stream)); + muxer->SetMuxerListener(std::move(muxer_listener)); - if (trick_play) { - RETURN_IF_ERROR(replicator->AddHandler(trick_play)); - RETURN_IF_ERROR(trick_play->AddHandler(muxer)); - } else { - RETURN_IF_ERROR(replicator->AddHandler(muxer)); - } + // Trick play is optional. + std::shared_ptr trick_play = + stream.trick_play_factor + ? std::make_shared(stream.trick_play_factor) + : nullptr; + + RETURN_IF_ERROR(ChainHandlers({replicator, trick_play, muxer})); } return Status::OK;