From e605203fa7be3fa20eb287d3b9c445f55969b02e Mon Sep 17 00:00:00 2001 From: Aaron Vaage Date: Wed, 16 Aug 2017 10:25:53 -0700 Subject: [PATCH] Added Origin Handler Created a handler called OriginHandler that can be used by the Job class (previously called RemuxJob). Origin handlers represent the start (or origin) of a pipeline (chain of handlers). Change-Id: Ibd748ae0a932b6e0ebb879ea292fcb83c548214b --- packager/media/demuxer/demuxer.gyp | 1 + packager/media/demuxer/demuxer.h | 8 +-- packager/media/formats/webvtt/webvtt.gyp | 1 + packager/media/origin/origin.gyp | 24 +++++++++ packager/media/origin/origin_handler.cc | 20 ++++++++ packager/media/origin/origin_handler.h | 44 ++++++++++++++++ packager/packager.cc | 65 +++++++++++++----------- 7 files changed, 130 insertions(+), 33 deletions(-) create mode 100644 packager/media/origin/origin.gyp create mode 100644 packager/media/origin/origin_handler.cc create mode 100644 packager/media/origin/origin_handler.h diff --git a/packager/media/demuxer/demuxer.gyp b/packager/media/demuxer/demuxer.gyp index 4059682bac..788a76ed56 100644 --- a/packager/media/demuxer/demuxer.gyp +++ b/packager/media/demuxer/demuxer.gyp @@ -24,6 +24,7 @@ '../formats/webm/webm.gyp:webm', '../formats/webvtt/webvtt.gyp:webvtt', '../formats/wvm/wvm.gyp:wvm', + '../origin/origin.gyp:origin', ], }, { diff --git a/packager/media/demuxer/demuxer.h b/packager/media/demuxer/demuxer.h index 6f93ed44d3..612735873f 100644 --- a/packager/media/demuxer/demuxer.h +++ b/packager/media/demuxer/demuxer.h @@ -13,7 +13,7 @@ #include "packager/base/compiler_specific.h" #include "packager/media/base/container_names.h" -#include "packager/media/base/media_handler.h" +#include "packager/media/origin/origin_handler.h" #include "packager/status.h" namespace shaka { @@ -31,7 +31,7 @@ class StreamInfo; /// Demuxer is responsible for extracting elementary stream samples from a /// media file, e.g. an ISO BMFF file. -class Demuxer : public MediaHandler { +class Demuxer : public OriginHandler { public: /// @param file_name specifies the input source. It uses prefix matching to /// create a proper File object. The user can extend File to support @@ -47,11 +47,11 @@ class Demuxer : public MediaHandler { /// Drive the remuxing from demuxer side (push). Read the file and push /// the Data to Muxer until Eof. - Status Run(); + Status Run() override; /// Cancel a demuxing job in progress. Will cause @a Run to exit with an error /// status of type CANCELLED. - void Cancel(); + void Cancel() override; /// @return Container name (type). Value is CONTAINER_UNKNOWN if the demuxer /// is not initialized. diff --git a/packager/media/formats/webvtt/webvtt.gyp b/packager/media/formats/webvtt/webvtt.gyp index ba9a760f9b..c0c7d3317a 100644 --- a/packager/media/formats/webvtt/webvtt.gyp +++ b/packager/media/formats/webvtt/webvtt.gyp @@ -26,6 +26,7 @@ '../../../base/base.gyp:base', '../../base/media_base.gyp:media_base', '../../formats/mp4/mp4.gyp:mp4', + '../../origin/origin.gyp:origin', ], }, { diff --git a/packager/media/origin/origin.gyp b/packager/media/origin/origin.gyp new file mode 100644 index 0000000000..4a5c34e3b0 --- /dev/null +++ b/packager/media/origin/origin.gyp @@ -0,0 +1,24 @@ +# Copyright 2017 Google Inc. All rights reserved. +# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +{ + 'includes': [ + '../../common.gypi', + ], + 'targets': [ + { + 'target_name': 'origin', + 'type': '<(component)', + 'sources': [ + 'origin_handler.cc', + 'origin_handler.h', + ], + 'dependencies': [ + '../base/media_base.gyp:media_base', + ], + }, + ], +} diff --git a/packager/media/origin/origin_handler.cc b/packager/media/origin/origin_handler.cc new file mode 100644 index 0000000000..0877d5feda --- /dev/null +++ b/packager/media/origin/origin_handler.cc @@ -0,0 +1,20 @@ +// Copyright 2017 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/origin/origin_handler.h" + +namespace shaka { +namespace media { + +// Origin handlers are always at the start of a pipeline (chain or handlers) +// and therefore should never receive input via |Process|. +Status OriginHandler::Process(std::unique_ptr stream_data) { + return Status(error::INTERNAL_ERROR, + "An origin handlers should never be a downstream handler."); +} + +} // namespace media +} // namespace shaka diff --git a/packager/media/origin/origin_handler.h b/packager/media/origin/origin_handler.h new file mode 100644 index 0000000000..450db4d6a4 --- /dev/null +++ b/packager/media/origin/origin_handler.h @@ -0,0 +1,44 @@ +// Copyright 2017 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef PACKAGER_MEDIA_ORIGIN_ORIGIN_HANDLER_H_ +#define PACKAGER_MEDIA_ORIGIN_ORIGIN_HANDLER_H_ + +#include "packager/media/base/media_handler.h" + +namespace shaka { +namespace media { + +// Origin handlers are handlers that sit at the head of a pipeline (chain of +// handlers). They are expect to take input from an alternative source (like +// a file or network connection). +class OriginHandler : public MediaHandler { + public: + OriginHandler() = default; + + // Process all data and send messages down stream. This is the main + // method of the handler. Since origin handlers do not take input via + // |Process|, run will take input from an alternative source. This call + // is expect to be blocking. To exit a call to |Run|, |Cancel| should + // be used. + virtual Status Run() = 0; + + // Non-blocking call to the handler, requesting that it exit the + // current call to |Run|. The handler should stop processing data + // as soon is convenient. + virtual void Cancel() = 0; + + private: + OriginHandler(const OriginHandler&) = delete; + OriginHandler& operator=(const OriginHandler&) = delete; + + Status Process(std::unique_ptr stream_data) override; +}; + +} // namespace media +} // namespace shaka + +#endif // PACKAGER_MEDIA_ORIGIN_ORIGIN_HANDLER_H_ diff --git a/packager/packager.cc b/packager/packager.cc index ee1c85610d..447be1ed54 100644 --- a/packager/packager.cc +++ b/packager/packager.cc @@ -231,27 +231,33 @@ class FakeClock : public base::Clock { base::Time Now() override { return base::Time(); } }; -// Demux, Mux(es) and worker thread used to remux a source file/stream. -class RemuxJob : public base::SimpleThread { +class Job : public base::SimpleThread { public: - RemuxJob(std::unique_ptr demuxer) - : SimpleThread("RemuxJob"), demuxer_(std::move(demuxer)) {} + Job(const std::string& name, std::shared_ptr work) + : SimpleThread(name), work_(work) {} - ~RemuxJob() override {} + void Initialize() { + DCHECK(work_); + status_ = work_->Initialize(); + } + + void Cancel() { + DCHECK(work_); + work_->Cancel(); + } - Demuxer* demuxer() { return demuxer_.get(); } Status status() { return status_; } private: - RemuxJob(const RemuxJob&) = delete; - RemuxJob& operator=(const RemuxJob&) = delete; + Job(const Job&) = delete; + Job& operator=(const Job&) = delete; void Run() override { - DCHECK(demuxer_); - status_ = demuxer_->Run(); + DCHECK(work_); + status_ = work_->Run(); } - std::unique_ptr demuxer_; + std::shared_ptr work_; Status status_; }; @@ -316,11 +322,12 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors, KeySource* encryption_key_source, MpdNotifier* mpd_notifier, hls::HlsNotifier* hls_notifier, - std::vector>* remux_jobs) { + std::vector>* jobs) { // No notifiers OR (mpd_notifier XOR hls_notifier); which is NAND. DCHECK(!(mpd_notifier && hls_notifier)); - DCHECK(remux_jobs); + DCHECK(jobs); + std::shared_ptr demuxer; std::shared_ptr trick_play_handler; std::string previous_input; @@ -370,7 +377,8 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors, if (stream_iter->input != previous_input) { // New remux job needed. Create demux and job thread. - std::unique_ptr demuxer(new Demuxer(stream_iter->input)); + demuxer = std::make_shared(stream_iter->input); + demuxer->set_dump_stream_info( packaging_params.test_params.dump_stream_info); if (packaging_params.decryption_params.key_provider != @@ -381,14 +389,14 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors, return false; demuxer->SetKeySource(std::move(decryption_key_source)); } - remux_jobs->emplace_back(new RemuxJob(std::move(demuxer))); + jobs->emplace_back(new media::Job("RemuxJob", demuxer)); trick_play_handler.reset(); previous_input = stream_iter->input; // Skip setting up muxers if output is not needed. if (stream_iter->output.empty() && stream_iter->segment_template.empty()) continue; } - DCHECK(!remux_jobs->empty()); + DCHECK(!jobs->empty()); // Each stream selector requires an individual trick play handler. // E.g., an input with two video streams needs two trick play handlers. @@ -485,7 +493,6 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors, handlers.push_back(std::move(muxer)); } - auto* demuxer = remux_jobs->back()->demuxer(); const std::string& stream_selector = stream_iter->stream_selector; status.Update(demuxer->SetHandler(stream_selector, chunking_handler)); status.Update(ConnectHandlers(handlers)); @@ -499,19 +506,19 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors, } // Initialize processing graph. - for (const std::unique_ptr& job : *remux_jobs) { - Status status = job->demuxer()->Initialize(); - if (!status.ok()) { - LOG(ERROR) << "Failed to initialize processing graph " << status; + for (const std::unique_ptr& job : *jobs) { + job->Initialize(); + if (!job->status().ok()) { + LOG(ERROR) << "Failed to initialize processing graph " << job->status(); return false; } } return true; } -Status RunRemuxJobs(const std::vector>& remux_jobs) { +Status RunRemuxJobs(const std::vector>& jobs) { // Start the job threads. - for (const std::unique_ptr& job : remux_jobs) + for (const std::unique_ptr& job : jobs) job->Start(); // Wait for all jobs to complete or an error occurs. @@ -519,7 +526,7 @@ Status RunRemuxJobs(const std::vector>& remux_jobs) { bool all_joined; do { all_joined = true; - for (const std::unique_ptr& job : remux_jobs) { + for (const std::unique_ptr& job : jobs) { if (job->HasBeenJoined()) { status = job->status(); if (!status.ok()) @@ -560,7 +567,7 @@ struct Packager::PackagerInternal { std::unique_ptr encryption_key_source; std::unique_ptr mpd_notifier; std::unique_ptr hls_notifier; - std::vector> remux_jobs; + std::vector> jobs; }; Packager::Packager() {} @@ -646,7 +653,7 @@ Status Packager::Initialize( stream_descriptor_list, packaging_params, chunking_options, encryption_options, muxer_options, &internal->fake_clock, internal->encryption_key_source.get(), internal->mpd_notifier.get(), - internal->hls_notifier.get(), &internal->remux_jobs)) { + internal->hls_notifier.get(), &internal->jobs)) { return Status(error::INVALID_ARGUMENT, "Failed to create remux jobs."); } internal_ = std::move(internal); @@ -656,7 +663,7 @@ Status Packager::Initialize( Status Packager::Run() { if (!internal_) return Status(error::INVALID_ARGUMENT, "Not yet initialized."); - Status status = media::RunRemuxJobs(internal_->remux_jobs); + Status status = media::RunRemuxJobs(internal_->jobs); if (!status.ok()) return status; @@ -676,8 +683,8 @@ void Packager::Cancel() { LOG(INFO) << "Not yet initialized. Return directly."; return; } - for (const std::unique_ptr& job : internal_->remux_jobs) - job->demuxer()->Cancel(); + for (const std::unique_ptr& job : internal_->jobs) + job->Cancel(); } std::string Packager::GetLibraryVersion() {