Create Muxer Listener Factory

Moved muxer listener creation into a factory so that it can be
initialized once and then the factory only needs to be passed
around rather than all the parameters.

Change-Id: Ibf7df66c08debddbe6f6ff8995e7910b9ad17f3f
This commit is contained in:
Aaron Vaage 2017-12-12 14:13:47 -08:00
parent c877af9f3b
commit c57da2b42d
4 changed files with 271 additions and 103 deletions

View File

@ -20,6 +20,8 @@
'mpd_notify_muxer_listener.cc', 'mpd_notify_muxer_listener.cc',
'mpd_notify_muxer_listener.h', 'mpd_notify_muxer_listener.h',
'muxer_listener.h', 'muxer_listener.h',
'muxer_listener_factory.cc',
'muxer_listener_factory.h',
'muxer_listener_internal.cc', 'muxer_listener_internal.cc',
'muxer_listener_internal.h', 'muxer_listener_internal.h',
'vod_media_info_dump_muxer_listener.cc', 'vod_media_info_dump_muxer_listener.cc',

View File

@ -0,0 +1,111 @@
// Copyright 2017 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/event/muxer_listener_factory.h"
#include "packager/base/strings/stringprintf.h"
#include "packager/hls/base/hls_notifier.h"
#include "packager/media/event/combined_muxer_listener.h"
#include "packager/media/event/hls_notify_muxer_listener.h"
#include "packager/media/event/mpd_notify_muxer_listener.h"
#include "packager/media/event/muxer_listener.h"
#include "packager/media/event/vod_media_info_dump_muxer_listener.h"
#include "packager/mpd/base/mpd_notifier.h"
namespace shaka {
namespace media {
namespace {
const char kMediaInfoSuffix[] = ".media_info";
std::unique_ptr<MuxerListener> CreateMediaInfoDumpListenerInternal(
const std::string& output) {
DCHECK(!output.empty());
std::unique_ptr<MuxerListener> listener(
new VodMediaInfoDumpMuxerListener(output + kMediaInfoSuffix));
return listener;
}
std::unique_ptr<MuxerListener> CreateMpdListenerInternal(
MpdNotifier* notifier) {
DCHECK(notifier);
std::unique_ptr<MuxerListener> listener(new MpdNotifyMuxerListener(notifier));
return listener;
}
std::unique_ptr<MuxerListener> CreateHlsListenerInternal(
const MuxerListenerFactory::StreamData& stream,
int stream_index,
hls::HlsNotifier* notifier) {
DCHECK(notifier);
DCHECK_GE(stream_index, 0);
// TODO(rkuroiwa): Do some smart stuff to group the audios, e.g. detect
// languages.
std::string group_id = stream.hls_group_id;
std::string name = stream.hls_name;
std::string hls_playlist_name = stream.hls_playlist_name;
if (group_id.empty()) {
group_id = "audio";
}
if (name.empty()) {
name = base::StringPrintf("stream_%d", stream_index);
}
if (hls_playlist_name.empty()) {
hls_playlist_name = base::StringPrintf("stream_%d.m3u8", stream_index);
}
std::unique_ptr<MuxerListener> listener(
new HlsNotifyMuxerListener(hls_playlist_name, name, group_id, notifier));
return listener;
}
} // namespace
MuxerListenerFactory::MuxerListenerFactory(bool output_media_info,
MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier)
: output_media_info_(output_media_info),
mpd_notifier_(mpd_notifier),
hls_notifier_(hls_notifier) {}
std::unique_ptr<MuxerListener> MuxerListenerFactory::CreateListener(
const StreamData& stream) {
const int stream_index = stream_index_++;
std::unique_ptr<CombinedMuxerListener> combined_listener(
new CombinedMuxerListener);
if (output_media_info_) {
combined_listener->AddListener(
CreateMediaInfoDumpListenerInternal(stream.media_info_output));
}
if (mpd_notifier_) {
combined_listener->AddListener(CreateMpdListenerInternal(mpd_notifier_));
}
if (hls_notifier_) {
combined_listener->AddListener(
CreateHlsListenerInternal(stream, stream_index, hls_notifier_));
}
return std::move(combined_listener);
}
std::unique_ptr<MuxerListener> MuxerListenerFactory::CreateHlsListener(
const StreamData& stream) {
if (!hls_notifier_) {
return nullptr;
}
const int stream_index = stream_index_++;
return CreateHlsListenerInternal(stream, stream_index, hls_notifier_);
}
} // namespace media
} // namespace shaka

View File

@ -0,0 +1,80 @@
// Copyright 2017 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_MEDIA_EVENT_MUXER_LISTENER_FACTORY_H_
#define PACKAGER_MEDIA_EVENT_MUXER_LISTENER_FACTORY_H_
#include <memory>
#include <string>
namespace shaka {
class MpdNotifier;
namespace hls {
class HlsNotifier;
}
namespace media {
class MuxerListener;
/// Factory class for creating MuxerListeners. Will produce a single muxer
/// listener that will wrap the various muxer listeners that the factory
/// supports. Currently the factory supports:
/// - Media Info Dump
/// - HLS
/// - MPD
///
/// The listeners that will be combined will be based on the parameters given
/// when constructing the factory.
class MuxerListenerFactory {
public:
/// The subset of data from a stream descriptor that the muxer listener
/// factory needs in order to create listeners for the stream.
struct StreamData {
// The stream's output destination. Will only be used if the factory is
// told to output media info.
std::string media_info_output;
// HLS specific values needed to write to HLS manifests. Will only be used
// if an HlsNotifier is given to the factory.
std::string hls_group_id;
std::string hls_name;
std::string hls_playlist_name;
};
/// Create a new muxer listener.
/// @param output_media_info must be true for the combined listener to include
/// a media info dump listener.
/// @param mpd_notifer must be non-null for the combined listener to include a
/// mpd listener.
/// @param hls_notifier must be non-null for the combined listener to include
/// an HLS listener.
MuxerListenerFactory(bool output_media_info,
MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier);
/// Create a listener for a stream.
std::unique_ptr<MuxerListener> CreateListener(const StreamData& stream);
/// Create an HLS listener if possible. If it is not possible to
/// create an HLS listener, this method will return null.
std::unique_ptr<MuxerListener> CreateHlsListener(const StreamData& stream);
private:
MuxerListenerFactory(const MuxerListenerFactory&) = delete;
MuxerListenerFactory operator=(const MuxerListenerFactory&) = delete;
bool output_media_info_;
MpdNotifier* mpd_notifier_;
hls::HlsNotifier* hls_notifier_;
// A counter to track which stream we are on.
int stream_index_ = 0;
};
} // namespace media
} // namespace shaka
#endif // PACKAGER_MEDIA_EVENT_MUXER_LISTENER_FACTORY_H_

View File

@ -34,9 +34,7 @@
#include "packager/media/chunking/chunking_handler.h" #include "packager/media/chunking/chunking_handler.h"
#include "packager/media/crypto/encryption_handler.h" #include "packager/media/crypto/encryption_handler.h"
#include "packager/media/demuxer/demuxer.h" #include "packager/media/demuxer/demuxer.h"
#include "packager/media/event/combined_muxer_listener.h" #include "packager/media/event/muxer_listener_factory.h"
#include "packager/media/event/hls_notify_muxer_listener.h"
#include "packager/media/event/mpd_notify_muxer_listener.h"
#include "packager/media/event/vod_media_info_dump_muxer_listener.h" #include "packager/media/event/vod_media_info_dump_muxer_listener.h"
#include "packager/media/formats/webvtt/text_readers.h" #include "packager/media/formats/webvtt/text_readers.h"
#include "packager/media/formats/webvtt/webvtt_output_handler.h" #include "packager/media/formats/webvtt/webvtt_output_handler.h"
@ -74,6 +72,16 @@ MuxerOptions CreateMuxerOptions(const StreamDescriptor& stream,
return options; return options;
} }
MuxerListenerFactory::StreamData ToMuxerListenerData(
const StreamDescriptor& stream) {
MuxerListenerFactory::StreamData data;
data.media_info_output = stream.output;
data.hls_group_id = stream.hls_group_id;
data.hls_name = stream.hls_name;
data.hls_playlist_name = stream.hls_playlist_name;
return data;
};
// TODO(rkuroiwa): Write TTML and WebVTT parser (demuxing) for a better check // TODO(rkuroiwa): Write TTML and WebVTT parser (demuxing) for a better check
// and for supporting live/segmenting (muxing). With a demuxer and a muxer, // and for supporting live/segmenting (muxing). With a demuxer and a muxer,
// CreateAllJobs() shouldn't treat text as a special case. // CreateAllJobs() shouldn't treat text as a special case.
@ -292,57 +300,6 @@ bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor,
return true; return true;
} }
std::unique_ptr<MuxerListener> CreateHlsListener(const StreamDescriptor& stream,
int stream_number,
hls::HlsNotifier* notifier) {
DCHECK(notifier);
DCHECK_GE(stream_number, 0);
// TODO(rkuroiwa): Do some smart stuff to group the audios, e.g. detect
// languages.
std::string group_id = stream.hls_group_id;
std::string name = stream.hls_name;
std::string hls_playlist_name = stream.hls_playlist_name;
if (group_id.empty())
group_id = "audio";
if (name.empty())
name = base::StringPrintf("stream_%d", stream_number);
if (hls_playlist_name.empty())
hls_playlist_name = base::StringPrintf("stream_%d.m3u8", stream_number);
return std::unique_ptr<MuxerListener>(
new HlsNotifyMuxerListener(hls_playlist_name, name, group_id, notifier));
}
std::unique_ptr<MuxerListener> CreateMuxerListener(
const StreamDescriptor& stream,
int stream_number,
bool output_media_info,
MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier) {
std::unique_ptr<CombinedMuxerListener> combined_listener(
new CombinedMuxerListener);
if (output_media_info) {
std::unique_ptr<MuxerListener> listener(
new VodMediaInfoDumpMuxerListener(stream.output + kMediaInfoSuffix));
combined_listener->AddListener(std::move(listener));
}
if (mpd_notifier) {
std::unique_ptr<MuxerListener> listener(
new MpdNotifyMuxerListener(mpd_notifier));
combined_listener->AddListener(std::move(listener));
}
if (hls_notifier) {
combined_listener->AddListener(
CreateHlsListener(stream, stream_number, hls_notifier));
}
return std::move(combined_listener);
}
/// Create a new demuxer handler for the given stream. If a demuxer cannot be /// Create a new demuxer handler for the given stream. If a demuxer cannot be
/// created, an error will be returned. If a demuxer can be created, this /// created, an error will be returned. If a demuxer can be created, this
/// |new_demuxer| will be set and Status::OK will be returned. /// |new_demuxer| will be set and Status::OK will be returned.
@ -408,12 +365,10 @@ std::shared_ptr<MediaHandler> CreateEncryptionHandler(
return std::make_shared<EncryptionHandler>(encryption_params, key_source); return std::make_shared<EncryptionHandler>(encryption_params, key_source);
} }
Status CreateMp4ToMp4TextJob(int stream_number, Status CreateMp4ToMp4TextJob(const StreamDescriptor& stream,
const StreamDescriptor& stream,
const PackagingParams& packaging_params, const PackagingParams& packaging_params,
MuxerFactory* muxer_factory, MuxerFactory* muxer_factory,
MpdNotifier* mpd_notifier, MuxerListenerFactory* muxer_listener_factory,
hls::HlsNotifier* hls_notifier,
std::shared_ptr<OriginHandler>* root) { std::shared_ptr<OriginHandler>* root) {
Status status; Status status;
std::shared_ptr<Demuxer> demuxer; std::shared_ptr<Demuxer> demuxer;
@ -425,9 +380,8 @@ Status CreateMp4ToMp4TextJob(int stream_number,
std::shared_ptr<MediaHandler> chunker( std::shared_ptr<MediaHandler> chunker(
new ChunkingHandler(packaging_params.chunking_params)); new ChunkingHandler(packaging_params.chunking_params));
std::unique_ptr<MuxerListener> muxer_listener = CreateMuxerListener( std::unique_ptr<MuxerListener> muxer_listener =
stream, stream_number, packaging_params.output_media_info, mpd_notifier, muxer_listener_factory->CreateListener(ToMuxerListenerData(stream));
hls_notifier);
std::shared_ptr<Muxer> muxer = std::shared_ptr<Muxer> muxer =
muxer_factory->CreateMuxer(GetOutputFormat(stream), stream); muxer_factory->CreateMuxer(GetOutputFormat(stream), stream);
muxer->SetMuxerListener(std::move(muxer_listener)); muxer->SetMuxerListener(std::move(muxer_listener));
@ -439,12 +393,11 @@ Status CreateMp4ToMp4TextJob(int stream_number,
} }
Status CreateHlsTextJob(const StreamDescriptor& stream, Status CreateHlsTextJob(const StreamDescriptor& stream,
int stream_number,
const PackagingParams& packaging_params, const PackagingParams& packaging_params,
hls::HlsNotifier* notifier, std::unique_ptr<MuxerListener> muxer_listener,
JobManager* job_manager) { JobManager* job_manager) {
DCHECK(notifier); DCHECK(muxer_listener);
DCHECK_GE(stream_number, 0); DCHECK(job_manager);
if (stream.segment_template.empty()) { if (stream.segment_template.empty()) {
return Status(error::INVALID_ARGUMENT, return Status(error::INVALID_ARGUMENT,
@ -463,9 +416,6 @@ Status CreateHlsTextJob(const StreamDescriptor& stream,
MuxerOptions muxer_options = CreateMuxerOptions(stream, packaging_params); MuxerOptions muxer_options = CreateMuxerOptions(stream, packaging_params);
muxer_options.bandwidth = stream.bandwidth ? stream.bandwidth : 256; muxer_options.bandwidth = stream.bandwidth ? stream.bandwidth : 256;
std::unique_ptr<MuxerListener> muxer_listener =
CreateHlsListener(stream, stream_number, notifier);
std::shared_ptr<WebVttSegmentedOutputHandler> output( std::shared_ptr<WebVttSegmentedOutputHandler> output(
new WebVttSegmentedOutputHandler(muxer_options, new WebVttSegmentedOutputHandler(muxer_options,
std::move(muxer_listener))); std::move(muxer_listener)));
@ -495,19 +445,19 @@ Status CreateHlsTextJob(const StreamDescriptor& stream,
Status CreateTextJobs( Status CreateTextJobs(
const std::vector<std::reference_wrapper<const StreamDescriptor>>& streams, const std::vector<std::reference_wrapper<const StreamDescriptor>>& streams,
const PackagingParams& packaging_params, const PackagingParams& packaging_params,
int* stream_number, MuxerListenerFactory* muxer_listener_factory,
MuxerFactory* muxer_factory, MuxerFactory* muxer_factory,
MpdNotifier* mpd_notifier, MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier,
JobManager* job_manager) { JobManager* job_manager) {
DCHECK(muxer_listener_factory);
DCHECK(job_manager); DCHECK(job_manager);
for (const StreamDescriptor& stream : streams) { for (const StreamDescriptor& stream : streams) {
// TODO(70990714): Support webvtt to mp4 // TODO(70990714): Support webvtt to mp4
if (GetOutputFormat(stream) == CONTAINER_MOV) { if (GetOutputFormat(stream) == CONTAINER_MOV) {
std::shared_ptr<OriginHandler> root; std::shared_ptr<OriginHandler> root;
Status status = CreateMp4ToMp4TextJob((*stream_number)++, stream, Status status =
packaging_params, muxer_factory, CreateMp4ToMp4TextJob(stream, packaging_params, muxer_factory,
mpd_notifier, hls_notifier, &root); muxer_listener_factory, &root);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
@ -515,12 +465,32 @@ Status CreateTextJobs(
job_manager->Add("MP4 text job", std::move(root)); job_manager->Add("MP4 text job", std::move(root));
} else { } else {
std::unique_ptr<MuxerListener> hls_listener =
muxer_listener_factory->CreateHlsListener(
ToMuxerListenerData(stream));
// Check input to ensure that output is possible.
if (hls_listener) {
if (stream.segment_template.empty() || !stream.output.empty()) {
return Status(error::INVALID_ARGUMENT,
"segment_template needs to be specified for HLS text "
"output. Single file output is not supported yet.");
}
}
if (mpd_notifier && !stream.segment_template.empty()) {
return Status(error::INVALID_ARGUMENT,
"Cannot create text output for MPD with segment output.");
}
MediaInfo text_media_info; MediaInfo text_media_info;
if (!StreamInfoToTextMediaInfo(stream, &text_media_info)) { if (!StreamInfoToTextMediaInfo(stream, &text_media_info)) {
return Status(error::INVALID_ARGUMENT, return Status(error::INVALID_ARGUMENT,
"Could not create media info for stream."); "Could not create media info for stream.");
} }
// If we are outputting to MPD, just add the input to the outputted
// manifest.
if (mpd_notifier) { if (mpd_notifier) {
uint32_t unused; uint32_t unused;
if (mpd_notifier->NotifyNewContainer(text_media_info, &unused)) { if (mpd_notifier->NotifyNewContainer(text_media_info, &unused)) {
@ -531,10 +501,11 @@ Status CreateTextJobs(
} }
} }
if (hls_notifier) { // If we are outputting to HLS, then create the HLS test pipeline that
Status status = // will create segmented text output.
CreateHlsTextJob(stream, (*stream_number)++, packaging_params, if (hls_listener) {
hls_notifier, job_manager); Status status = CreateHlsTextJob(stream, packaging_params,
std::move(hls_listener), job_manager);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
@ -553,12 +524,12 @@ Status CreateTextJobs(
Status CreateAudioVideoJobs( Status CreateAudioVideoJobs(
const std::vector<std::reference_wrapper<const StreamDescriptor>>& streams, const std::vector<std::reference_wrapper<const StreamDescriptor>>& streams,
const PackagingParams& packaging_params, const PackagingParams& packaging_params,
int* stream_number,
KeySource* encryption_key_source, KeySource* encryption_key_source,
MuxerListenerFactory* muxer_listener_factory,
MuxerFactory* muxer_factory, MuxerFactory* muxer_factory,
MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier,
JobManager* job_manager) { JobManager* job_manager) {
DCHECK(muxer_listener_factory);
DCHECK(muxer_factory);
DCHECK(job_manager); DCHECK(job_manager);
// Demuxers are shared among all streams with the same input. // Demuxers are shared among all streams with the same input.
@ -636,9 +607,8 @@ Status CreateAudioVideoJobs(
} }
// Create the muxer (output) for this track. // Create the muxer (output) for this track.
std::unique_ptr<MuxerListener> muxer_listener = CreateMuxerListener( std::unique_ptr<MuxerListener> muxer_listener =
stream, (*stream_number)++, packaging_params.output_media_info, muxer_listener_factory->CreateListener(ToMuxerListenerData(stream));
mpd_notifier, hls_notifier);
std::shared_ptr<Muxer> muxer = std::shared_ptr<Muxer> muxer =
muxer_factory->CreateMuxer(GetOutputFormat(stream), stream); muxer_factory->CreateMuxer(GetOutputFormat(stream), stream);
muxer->SetMuxerListener(std::move(muxer_listener)); muxer->SetMuxerListener(std::move(muxer_listener));
@ -672,11 +642,13 @@ Status CreateAudioVideoJobs(
Status CreateAllJobs(const std::vector<StreamDescriptor>& stream_descriptors, Status CreateAllJobs(const std::vector<StreamDescriptor>& stream_descriptors,
const PackagingParams& packaging_params, const PackagingParams& packaging_params,
FakeClock* fake_clock,
KeySource* encryption_key_source,
MpdNotifier* mpd_notifier, MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier, KeySource* encryption_key_source,
MuxerListenerFactory* muxer_listener_factory,
MuxerFactory* muxer_factory,
JobManager* job_manager) { JobManager* job_manager) {
DCHECK(muxer_factory);
DCHECK(muxer_listener_factory);
DCHECK(job_manager); DCHECK(job_manager);
// Group all streams based on which pipeline they will use. // Group all streams based on which pipeline they will use.
@ -700,20 +672,14 @@ Status CreateAllJobs(const std::vector<StreamDescriptor>& stream_descriptors,
std::sort(audio_video_streams.begin(), audio_video_streams.end(), std::sort(audio_video_streams.begin(), audio_video_streams.end(),
media::StreamDescriptorCompareFn); media::StreamDescriptorCompareFn);
MuxerFactory muxer_factory(packaging_params);
if (packaging_params.test_params.inject_fake_clock) {
muxer_factory.OverrideClock(fake_clock);
}
int stream_number = 0;
Status status; Status status;
status.Update(CreateTextJobs(text_streams, packaging_params, &stream_number, status.Update(CreateTextJobs(text_streams, packaging_params,
&muxer_factory, mpd_notifier, hls_notifier, muxer_listener_factory, muxer_factory,
job_manager)); mpd_notifier, job_manager));
status.Update(CreateAudioVideoJobs(audio_video_streams, packaging_params, status.Update(CreateAudioVideoJobs(
&stream_number, encryption_key_source, audio_video_streams, packaging_params, encryption_key_source,
&muxer_factory, mpd_notifier, hls_notifier, muxer_listener_factory, muxer_factory, job_manager));
job_manager));
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
@ -776,7 +742,7 @@ Status Packager::Initialize(
// Store callback params to make it available during packaging. // Store callback params to make it available during packaging.
internal->buffer_callback_params = packaging_params.buffer_callback_params; internal->buffer_callback_params = packaging_params.buffer_callback_params;
// Update mpd output and hls output if callback param is specified. // Update MPD output and HLS output if callback param is specified.
MpdParams mpd_params = packaging_params.mpd_params; MpdParams mpd_params = packaging_params.mpd_params;
HlsParams hls_params = packaging_params.hls_params; HlsParams hls_params = packaging_params.hls_params;
if (internal->buffer_callback_params.write_func) { if (internal->buffer_callback_params.write_func) {
@ -842,10 +808,19 @@ Status Packager::Initialize(
streams_for_jobs.push_back(copy); streams_for_jobs.push_back(copy);
} }
media::MuxerFactory muxer_factory(packaging_params);
if (packaging_params.test_params.inject_fake_clock) {
muxer_factory.OverrideClock(&internal->fake_clock);
}
media::MuxerListenerFactory muxer_listener_factory(
packaging_params.output_media_info, internal->mpd_notifier.get(),
internal->hls_notifier.get());
Status status = media::CreateAllJobs( Status status = media::CreateAllJobs(
streams_for_jobs, packaging_params, &internal->fake_clock, streams_for_jobs, packaging_params, internal->mpd_notifier.get(),
internal->encryption_key_source.get(), internal->mpd_notifier.get(), internal->encryption_key_source.get(), &muxer_listener_factory,
internal->hls_notifier.get(), &internal->job_manager); &muxer_factory, &internal->job_manager);
if (!status.ok()) { if (!status.ok()) {
return status; return status;