Use Map To Track Demuxers and Aligners

To move us toward no longer need to ensure order when building
our pipeline, use a map to share demuxers between stream descriptors.
This will even allow use to use the same demuxers in the text pipelines
while still building them separately from the audio and video streams.

Change-Id: I4d4dbddbc06adee36cbe7f4aa1f6769f7bb2a3f6
This commit is contained in:
Aaron Vaage 2018-04-25 18:12:36 -07:00
parent 37d4ff017f
commit 03afb16c0c
1 changed files with 57 additions and 61 deletions

View File

@ -640,40 +640,42 @@ Status CreateAudioVideoJobs(
DCHECK(muxer_listener_factory); DCHECK(muxer_listener_factory);
DCHECK(muxer_factory); DCHECK(muxer_factory);
DCHECK(job_manager); DCHECK(job_manager);
// Store all the demuxers in a map so that we can look up a stream's demuxer.
// This is step one in making this part of the pipeline less dependant on
// order.
std::map<std::string, std::shared_ptr<Demuxer>> sources;
std::map<std::string, std::shared_ptr<MediaHandler>> cue_aligners;
for (const StreamDescriptor& stream : streams) {
bool seen_input_before = sources.find(stream.input) != sources.end();
if (seen_input_before) {
continue;
}
RETURN_IF_ERROR(
CreateDemuxer(stream, packaging_params, &sources[stream.input]));
cue_aligners[stream.input] =
sync_points ? std::make_shared<CueAlignmentHandler>(sync_points)
: nullptr;
}
for (auto& source : sources) {
job_manager->Add("RemuxJob", source.second);
}
// Demuxers are shared among all streams with the same input.
std::shared_ptr<Demuxer> demuxer;
// When |sync_points| is not null, there should be one CueAlignmentHandler per
// input. All CueAlignmentHandler shares the same |sync_points|, which allows
// sync points / cues to be aligned across streams, whether they are from the
// same input or not.
std::shared_ptr<CueAlignmentHandler> cue_aligner;
// Replicators are shared among all streams with the same input and stream // Replicators are shared among all streams with the same input and stream
// selector. // selector.
std::shared_ptr<Replicator> replicator; std::shared_ptr<MediaHandler> replicator;
std::string previous_input; std::string previous_input;
std::string previous_selector; std::string previous_selector;
for (const StreamDescriptor& stream : streams) { for (const StreamDescriptor& stream : streams) {
// If we changed our input files, we need a new demuxer. // Get the demuxer for this stream.
auto& demuxer = sources[stream.input];
auto& cue_aligner = cue_aligners[stream.input];
const bool new_input_file = stream.input != previous_input; const bool new_input_file = stream.input != previous_input;
if (new_input_file) {
Status status = CreateDemuxer(stream, packaging_params, &demuxer);
if (!status.ok()) {
return status;
}
job_manager->Add("RemuxJob", demuxer);
if (sync_points)
cue_aligner = std::make_shared<CueAlignmentHandler>(sync_points);
}
if (!stream.language.empty()) {
demuxer->SetLanguageOverride(stream.stream_selector, stream.language);
}
const bool new_stream = const bool new_stream =
new_input_file || previous_selector != stream.stream_selector; new_input_file || previous_selector != stream.stream_selector;
previous_input = stream.input; previous_input = stream.input;
@ -685,58 +687,52 @@ Status CreateAudioVideoJobs(
continue; continue;
} }
// Just because it is a different stream descriptor does not mean it is a
// new stream. Multiple stream descriptors may have the same stream but
// only differ by trick play factor.
if (new_stream) { if (new_stream) {
auto chunker =
std::make_shared<ChunkingHandler>(packaging_params.chunking_params);
std::shared_ptr<MediaHandler> encryptor = CreateEncryptionHandler(
packaging_params, stream, encryption_key_source);
replicator = std::make_shared<Replicator>();
if (cue_aligner) {
RETURN_IF_ERROR(
demuxer->SetHandler(stream.stream_selector, cue_aligner));
RETURN_IF_ERROR(cue_aligner->AddHandler(chunker));
} else {
RETURN_IF_ERROR(demuxer->SetHandler(stream.stream_selector, chunker));
}
if (encryptor) {
RETURN_IF_ERROR(chunker->AddHandler(encryptor));
RETURN_IF_ERROR(encryptor->AddHandler(replicator));
} else {
RETURN_IF_ERROR(chunker->AddHandler(replicator));
}
if (!stream.language.empty()) { if (!stream.language.empty()) {
demuxer->SetLanguageOverride(stream.stream_selector, stream.language); demuxer->SetLanguageOverride(stream.stream_selector, stream.language);
} }
replicator = std::make_shared<Replicator>();
auto chunker =
std::make_shared<ChunkingHandler>(packaging_params.chunking_params);
auto encryptor = CreateEncryptionHandler(packaging_params, stream,
encryption_key_source);
// TODO(vaage) : Create a nicer way to connect handlers to demuxers.
if (sync_points) {
RETURN_IF_ERROR(
ChainHandlers({cue_aligner, chunker, encryptor, replicator}));
RETURN_IF_ERROR(
demuxer->SetHandler(stream.stream_selector, cue_aligner));
} else {
RETURN_IF_ERROR(ChainHandlers({chunker, encryptor, replicator}));
RETURN_IF_ERROR(demuxer->SetHandler(stream.stream_selector, chunker));
}
} }
// Create the muxer (output) for this track. // Create the muxer (output) for this track.
std::unique_ptr<MuxerListener> muxer_listener =
muxer_listener_factory->CreateListener(ToMuxerListenerData(stream));
std::shared_ptr<Muxer> muxer = std::shared_ptr<Muxer> muxer =
muxer_factory->CreateMuxer(GetOutputFormat(stream), stream); muxer_factory->CreateMuxer(GetOutputFormat(stream), stream);
muxer->SetMuxerListener(std::move(muxer_listener));
if (!muxer) { if (!muxer) {
return Status(error::INVALID_ARGUMENT, "Failed to create muxer for " + return Status(error::INVALID_ARGUMENT, "Failed to create muxer for " +
stream.input + ":" + stream.input + ":" +
stream.stream_selector); stream.stream_selector);
} }
std::shared_ptr<MediaHandler> trick_play; std::unique_ptr<MuxerListener> muxer_listener =
if (stream.trick_play_factor) { muxer_listener_factory->CreateListener(ToMuxerListenerData(stream));
trick_play = std::make_shared<TrickPlayHandler>(stream.trick_play_factor); muxer->SetMuxerListener(std::move(muxer_listener));
}
if (trick_play) { // Trick play is optional.
RETURN_IF_ERROR(replicator->AddHandler(trick_play)); std::shared_ptr<MediaHandler> trick_play =
RETURN_IF_ERROR(trick_play->AddHandler(muxer)); stream.trick_play_factor
} else { ? std::make_shared<TrickPlayHandler>(stream.trick_play_factor)
RETURN_IF_ERROR(replicator->AddHandler(muxer)); : nullptr;
}
RETURN_IF_ERROR(ChainHandlers({replicator, trick_play, muxer}));
} }
return Status::OK; return Status::OK;