Added Origin Handler

Created a handler called OriginHandler that can be used by the Job class
(previously called RemuxJob). Origin handlers represent the start (or
origin) of a pipeline (chain of handlers).

Change-Id: Ibd748ae0a932b6e0ebb879ea292fcb83c548214b
This commit is contained in:
Aaron Vaage 2017-08-16 10:25:53 -07:00
parent 34c5e011a5
commit e605203fa7
7 changed files with 130 additions and 33 deletions

View File

@ -24,6 +24,7 @@
'../formats/webm/webm.gyp:webm', '../formats/webm/webm.gyp:webm',
'../formats/webvtt/webvtt.gyp:webvtt', '../formats/webvtt/webvtt.gyp:webvtt',
'../formats/wvm/wvm.gyp:wvm', '../formats/wvm/wvm.gyp:wvm',
'../origin/origin.gyp:origin',
], ],
}, },
{ {

View File

@ -13,7 +13,7 @@
#include "packager/base/compiler_specific.h" #include "packager/base/compiler_specific.h"
#include "packager/media/base/container_names.h" #include "packager/media/base/container_names.h"
#include "packager/media/base/media_handler.h" #include "packager/media/origin/origin_handler.h"
#include "packager/status.h" #include "packager/status.h"
namespace shaka { namespace shaka {
@ -31,7 +31,7 @@ class StreamInfo;
/// Demuxer is responsible for extracting elementary stream samples from a /// Demuxer is responsible for extracting elementary stream samples from a
/// media file, e.g. an ISO BMFF file. /// media file, e.g. an ISO BMFF file.
class Demuxer : public MediaHandler { class Demuxer : public OriginHandler {
public: public:
/// @param file_name specifies the input source. It uses prefix matching to /// @param file_name specifies the input source. It uses prefix matching to
/// create a proper File object. The user can extend File to support /// create a proper File object. The user can extend File to support
@ -47,11 +47,11 @@ class Demuxer : public MediaHandler {
/// Drive the remuxing from demuxer side (push). Read the file and push /// Drive the remuxing from demuxer side (push). Read the file and push
/// the Data to Muxer until Eof. /// the Data to Muxer until Eof.
Status Run(); Status Run() override;
/// Cancel a demuxing job in progress. Will cause @a Run to exit with an error /// Cancel a demuxing job in progress. Will cause @a Run to exit with an error
/// status of type CANCELLED. /// status of type CANCELLED.
void Cancel(); void Cancel() override;
/// @return Container name (type). Value is CONTAINER_UNKNOWN if the demuxer /// @return Container name (type). Value is CONTAINER_UNKNOWN if the demuxer
/// is not initialized. /// is not initialized.

View File

@ -26,6 +26,7 @@
'../../../base/base.gyp:base', '../../../base/base.gyp:base',
'../../base/media_base.gyp:media_base', '../../base/media_base.gyp:media_base',
'../../formats/mp4/mp4.gyp:mp4', '../../formats/mp4/mp4.gyp:mp4',
'../../origin/origin.gyp:origin',
], ],
}, },
{ {

View File

@ -0,0 +1,24 @@
# Copyright 2017 Google Inc. All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
{
'includes': [
'../../common.gypi',
],
'targets': [
{
'target_name': 'origin',
'type': '<(component)',
'sources': [
'origin_handler.cc',
'origin_handler.h',
],
'dependencies': [
'../base/media_base.gyp:media_base',
],
},
],
}

View File

@ -0,0 +1,20 @@
// Copyright 2017 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/origin/origin_handler.h"
namespace shaka {
namespace media {
// Origin handlers are always at the start of a pipeline (chain or handlers)
// and therefore should never receive input via |Process|.
Status OriginHandler::Process(std::unique_ptr<StreamData> stream_data) {
return Status(error::INTERNAL_ERROR,
"An origin handlers should never be a downstream handler.");
}
} // namespace media
} // namespace shaka

View File

@ -0,0 +1,44 @@
// Copyright 2017 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_MEDIA_ORIGIN_ORIGIN_HANDLER_H_
#define PACKAGER_MEDIA_ORIGIN_ORIGIN_HANDLER_H_
#include "packager/media/base/media_handler.h"
namespace shaka {
namespace media {
// Origin handlers are handlers that sit at the head of a pipeline (chain of
// handlers). They are expect to take input from an alternative source (like
// a file or network connection).
class OriginHandler : public MediaHandler {
public:
OriginHandler() = default;
// Process all data and send messages down stream. This is the main
// method of the handler. Since origin handlers do not take input via
// |Process|, run will take input from an alternative source. This call
// is expect to be blocking. To exit a call to |Run|, |Cancel| should
// be used.
virtual Status Run() = 0;
// Non-blocking call to the handler, requesting that it exit the
// current call to |Run|. The handler should stop processing data
// as soon is convenient.
virtual void Cancel() = 0;
private:
OriginHandler(const OriginHandler&) = delete;
OriginHandler& operator=(const OriginHandler&) = delete;
Status Process(std::unique_ptr<StreamData> stream_data) override;
};
} // namespace media
} // namespace shaka
#endif // PACKAGER_MEDIA_ORIGIN_ORIGIN_HANDLER_H_

View File

@ -231,27 +231,33 @@ class FakeClock : public base::Clock {
base::Time Now() override { return base::Time(); } base::Time Now() override { return base::Time(); }
}; };
// Demux, Mux(es) and worker thread used to remux a source file/stream. class Job : public base::SimpleThread {
class RemuxJob : public base::SimpleThread {
public: public:
RemuxJob(std::unique_ptr<Demuxer> demuxer) Job(const std::string& name, std::shared_ptr<OriginHandler> work)
: SimpleThread("RemuxJob"), demuxer_(std::move(demuxer)) {} : SimpleThread(name), work_(work) {}
~RemuxJob() override {} void Initialize() {
DCHECK(work_);
status_ = work_->Initialize();
}
void Cancel() {
DCHECK(work_);
work_->Cancel();
}
Demuxer* demuxer() { return demuxer_.get(); }
Status status() { return status_; } Status status() { return status_; }
private: private:
RemuxJob(const RemuxJob&) = delete; Job(const Job&) = delete;
RemuxJob& operator=(const RemuxJob&) = delete; Job& operator=(const Job&) = delete;
void Run() override { void Run() override {
DCHECK(demuxer_); DCHECK(work_);
status_ = demuxer_->Run(); status_ = work_->Run();
} }
std::unique_ptr<Demuxer> demuxer_; std::shared_ptr<OriginHandler> work_;
Status status_; Status status_;
}; };
@ -316,11 +322,12 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors,
KeySource* encryption_key_source, KeySource* encryption_key_source,
MpdNotifier* mpd_notifier, MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier, hls::HlsNotifier* hls_notifier,
std::vector<std::unique_ptr<RemuxJob>>* remux_jobs) { std::vector<std::unique_ptr<Job>>* jobs) {
// No notifiers OR (mpd_notifier XOR hls_notifier); which is NAND. // No notifiers OR (mpd_notifier XOR hls_notifier); which is NAND.
DCHECK(!(mpd_notifier && hls_notifier)); DCHECK(!(mpd_notifier && hls_notifier));
DCHECK(remux_jobs); DCHECK(jobs);
std::shared_ptr<Demuxer> demuxer;
std::shared_ptr<TrickPlayHandler> trick_play_handler; std::shared_ptr<TrickPlayHandler> trick_play_handler;
std::string previous_input; std::string previous_input;
@ -370,7 +377,8 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors,
if (stream_iter->input != previous_input) { if (stream_iter->input != previous_input) {
// New remux job needed. Create demux and job thread. // New remux job needed. Create demux and job thread.
std::unique_ptr<Demuxer> demuxer(new Demuxer(stream_iter->input)); demuxer = std::make_shared<Demuxer>(stream_iter->input);
demuxer->set_dump_stream_info( demuxer->set_dump_stream_info(
packaging_params.test_params.dump_stream_info); packaging_params.test_params.dump_stream_info);
if (packaging_params.decryption_params.key_provider != if (packaging_params.decryption_params.key_provider !=
@ -381,14 +389,14 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors,
return false; return false;
demuxer->SetKeySource(std::move(decryption_key_source)); demuxer->SetKeySource(std::move(decryption_key_source));
} }
remux_jobs->emplace_back(new RemuxJob(std::move(demuxer))); jobs->emplace_back(new media::Job("RemuxJob", demuxer));
trick_play_handler.reset(); trick_play_handler.reset();
previous_input = stream_iter->input; previous_input = stream_iter->input;
// Skip setting up muxers if output is not needed. // Skip setting up muxers if output is not needed.
if (stream_iter->output.empty() && stream_iter->segment_template.empty()) if (stream_iter->output.empty() && stream_iter->segment_template.empty())
continue; continue;
} }
DCHECK(!remux_jobs->empty()); DCHECK(!jobs->empty());
// Each stream selector requires an individual trick play handler. // Each stream selector requires an individual trick play handler.
// E.g., an input with two video streams needs two trick play handlers. // E.g., an input with two video streams needs two trick play handlers.
@ -485,7 +493,6 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors,
handlers.push_back(std::move(muxer)); handlers.push_back(std::move(muxer));
} }
auto* demuxer = remux_jobs->back()->demuxer();
const std::string& stream_selector = stream_iter->stream_selector; const std::string& stream_selector = stream_iter->stream_selector;
status.Update(demuxer->SetHandler(stream_selector, chunking_handler)); status.Update(demuxer->SetHandler(stream_selector, chunking_handler));
status.Update(ConnectHandlers(handlers)); status.Update(ConnectHandlers(handlers));
@ -499,19 +506,19 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors,
} }
// Initialize processing graph. // Initialize processing graph.
for (const std::unique_ptr<RemuxJob>& job : *remux_jobs) { for (const std::unique_ptr<Job>& job : *jobs) {
Status status = job->demuxer()->Initialize(); job->Initialize();
if (!status.ok()) { if (!job->status().ok()) {
LOG(ERROR) << "Failed to initialize processing graph " << status; LOG(ERROR) << "Failed to initialize processing graph " << job->status();
return false; return false;
} }
} }
return true; return true;
} }
Status RunRemuxJobs(const std::vector<std::unique_ptr<RemuxJob>>& remux_jobs) { Status RunRemuxJobs(const std::vector<std::unique_ptr<Job>>& jobs) {
// Start the job threads. // Start the job threads.
for (const std::unique_ptr<RemuxJob>& job : remux_jobs) for (const std::unique_ptr<Job>& job : jobs)
job->Start(); job->Start();
// Wait for all jobs to complete or an error occurs. // Wait for all jobs to complete or an error occurs.
@ -519,7 +526,7 @@ Status RunRemuxJobs(const std::vector<std::unique_ptr<RemuxJob>>& remux_jobs) {
bool all_joined; bool all_joined;
do { do {
all_joined = true; all_joined = true;
for (const std::unique_ptr<RemuxJob>& job : remux_jobs) { for (const std::unique_ptr<Job>& job : jobs) {
if (job->HasBeenJoined()) { if (job->HasBeenJoined()) {
status = job->status(); status = job->status();
if (!status.ok()) if (!status.ok())
@ -560,7 +567,7 @@ struct Packager::PackagerInternal {
std::unique_ptr<KeySource> encryption_key_source; std::unique_ptr<KeySource> encryption_key_source;
std::unique_ptr<MpdNotifier> mpd_notifier; std::unique_ptr<MpdNotifier> mpd_notifier;
std::unique_ptr<hls::HlsNotifier> hls_notifier; std::unique_ptr<hls::HlsNotifier> hls_notifier;
std::vector<std::unique_ptr<media::RemuxJob>> remux_jobs; std::vector<std::unique_ptr<media::Job>> jobs;
}; };
Packager::Packager() {} Packager::Packager() {}
@ -646,7 +653,7 @@ Status Packager::Initialize(
stream_descriptor_list, packaging_params, chunking_options, stream_descriptor_list, packaging_params, chunking_options,
encryption_options, muxer_options, &internal->fake_clock, encryption_options, muxer_options, &internal->fake_clock,
internal->encryption_key_source.get(), internal->mpd_notifier.get(), internal->encryption_key_source.get(), internal->mpd_notifier.get(),
internal->hls_notifier.get(), &internal->remux_jobs)) { internal->hls_notifier.get(), &internal->jobs)) {
return Status(error::INVALID_ARGUMENT, "Failed to create remux jobs."); return Status(error::INVALID_ARGUMENT, "Failed to create remux jobs.");
} }
internal_ = std::move(internal); internal_ = std::move(internal);
@ -656,7 +663,7 @@ Status Packager::Initialize(
Status Packager::Run() { Status Packager::Run() {
if (!internal_) if (!internal_)
return Status(error::INVALID_ARGUMENT, "Not yet initialized."); return Status(error::INVALID_ARGUMENT, "Not yet initialized.");
Status status = media::RunRemuxJobs(internal_->remux_jobs); Status status = media::RunRemuxJobs(internal_->jobs);
if (!status.ok()) if (!status.ok())
return status; return status;
@ -676,8 +683,8 @@ void Packager::Cancel() {
LOG(INFO) << "Not yet initialized. Return directly."; LOG(INFO) << "Not yet initialized. Return directly.";
return; return;
} }
for (const std::unique_ptr<media::RemuxJob>& job : internal_->remux_jobs) for (const std::unique_ptr<media::Job>& job : internal_->jobs)
job->demuxer()->Cancel(); job->Cancel();
} }
std::string Packager::GetLibraryVersion() { std::string Packager::GetLibraryVersion() {