Isolate Job Management

Moved all code dealing with jobs into its own class so that the
packager code does not need to worry about initializing, running,
or stopping jobs.

Change-Id: I3e9ef1f22bd93d671f77d59ad15f23d1239078cf
This commit is contained in:
Aaron Vaage 2017-12-12 13:05:12 -08:00
parent 2d025dae2d
commit dedbd8b724
4 changed files with 222 additions and 106 deletions

107
packager/app/job_manager.cc Normal file
View File

@ -0,0 +1,107 @@
// Copyright 2017 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/app/job_manager.h"
#include "packager/app/libcrypto_threading.h"
#include "packager/media/origin/origin_handler.h"
namespace shaka {
namespace media {
Job::Job(const std::string& name, std::shared_ptr<OriginHandler> work)
: SimpleThread(name),
work_(work),
wait_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {
DCHECK(work);
}
void Job::Initialize() {
status_ = work_->Initialize();
}
void Job::Cancel() {
work_->Cancel();
}
void Job::Run() {
status_ = work_->Run();
wait_.Signal();
}
void JobManager::Add(const std::string& name,
std::shared_ptr<OriginHandler> handler) {
jobs_.emplace_back(new Job(name, std::move(handler)));
}
Status JobManager::InitializeJobs() {
Status status;
for (auto& job : jobs_) {
job->Initialize();
status.Update(job->status());
}
return status;
}
Status JobManager::RunJobs() {
// We need to store the jobs and the waits separately in order to use the
// |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we
// need to access the jobs in order to join the thread and check the status.
// The indexes needs to be check in sync or else we won't be able to relate a
// WaitableEvent back to the job.
std::vector<Job*> active_jobs;
std::vector<base::WaitableEvent*> active_waits;
// Start every job and add it to the active jobs list so that we can wait
// on each one.
for (auto& job : jobs_) {
job->Start();
active_jobs.push_back(job.get());
active_waits.push_back(job->wait());
}
// Wait for all jobs to complete or an error occurs.
Status status;
while (status.ok() && active_jobs.size()) {
// Wait for an event to finish and then update our status so that we can
// quit if something has gone wrong.
const size_t done =
base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size());
Job* job = active_jobs[done];
job->Join();
status.Update(job->status());
// Remove the job and the wait from our tracking.
active_jobs.erase(active_jobs.begin() + done);
active_waits.erase(active_waits.begin() + done);
}
// If the main loop has exited and there are still jobs running,
// we need to cancel them and clean-up.
for (auto& job : active_jobs) {
job->Cancel();
}
for (auto& job : active_jobs) {
job->Join();
}
return status;
}
void JobManager::CancelJobs() {
for (auto& job : jobs_) {
job->Cancel();
}
}
} // namespace media
} // namespace shaka

View File

@ -0,0 +1,93 @@
// Copyright 2017 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_APP_JOB_MANAGER_H_
#define PACKAGER_APP_JOB_MANAGER_H_
#include <memory>
#include <vector>
#include "packager/base/threading/simple_thread.h"
#include "packager/status.h"
namespace shaka {
namespace media {
class OriginHandler;
// A job is a single line of work that is expected to run in parallel with
// other jobs.
class Job : public base::SimpleThread {
public:
Job(const std::string& name, std::shared_ptr<OriginHandler> work);
// Initialize the chain of handlers that make up this job. This only
// initializes the handlers, it does not execute the job. If
// initialization fails, |status| will return a non-ok status.
void Initialize();
// Request that the job stops executing. This is only a request and
// will not block. If you want to wait for the job to complete, use
// |wait|.
void Cancel();
// Get the current status of the job. If the job failed to initialize
// or encountered an error during execution this will return the error.
const Status& status() const { return status_; }
// If you want to wait for this job to complete, this will return the
// WaitableEvent you can wait on.
base::WaitableEvent* wait() { return &wait_; }
private:
Job(const Job&) = delete;
Job& operator=(const Job&) = delete;
void Run() override;
std::shared_ptr<OriginHandler> work_;
Status status_;
base::WaitableEvent wait_;
};
// Similar to a thread pool, JobManager manages multiple jobs that are expected
// to run in parallel. It can be used to register, run, and stop a batch of
// jobs.
class JobManager {
public:
JobManager() = default;
// Create a new job entry by specifying the origin handler at the top of the
// chain and a name for the thread. This will only register the job. To start
// the job, you need to call |RunJobs|.
void Add(const std::string& name, std::shared_ptr<OriginHandler> handler);
// Initialize all registered jobs. If any job fails to initialize, this will
// return the error and it will not be safe to call |RunJobs| as not all jobs
// will be properly initialized.
Status InitializeJobs();
// Run all registered jobs. Before calling this make sure that
// |InitializedJobs| returned |Status::OK|. This call is blocking and will
// block until all jobs exit.
Status RunJobs();
// Ask all jobs to stop running. This call is non-blocking and can be used to
// unblock a call to |RunJobs|.
void CancelJobs();
private:
JobManager(const JobManager&) = delete;
JobManager& operator=(const JobManager&) = delete;
std::vector<std::unique_ptr<Job>> jobs_;
};
} // namespace media
} // namespace shaka
#endif // PACKAGER_APP_JOB_MANAGER_H_

View File

@ -8,6 +8,7 @@
#include <algorithm> #include <algorithm>
#include "packager/app/job_manager.h"
#include "packager/app/libcrypto_threading.h" #include "packager/app/libcrypto_threading.h"
#include "packager/app/packager_util.h" #include "packager/app/packager_util.h"
#include "packager/app/stream_descriptor.h" #include "packager/app/stream_descriptor.h"
@ -238,44 +239,6 @@ class FakeClock : public base::Clock {
base::Time Now() override { return base::Time(); } base::Time Now() override { return base::Time(); }
}; };
class Job : public base::SimpleThread {
public:
Job(const std::string& name, std::shared_ptr<OriginHandler> work)
: SimpleThread(name),
work_(work),
wait_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {}
void Initialize() {
DCHECK(work_);
status_ = work_->Initialize();
}
void Cancel() {
DCHECK(work_);
work_->Cancel();
}
const Status& status() const { return status_; }
base::WaitableEvent* wait() { return &wait_; }
private:
Job(const Job&) = delete;
Job& operator=(const Job&) = delete;
void Run() override {
DCHECK(work_);
status_ = work_->Run();
wait_.Signal();
}
std::shared_ptr<OriginHandler> work_;
Status status_;
base::WaitableEvent wait_;
};
bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor, bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor,
MediaInfo* text_media_info) { MediaInfo* text_media_info) {
const std::string& language = stream_descriptor.language; const std::string& language = stream_descriptor.language;
@ -448,7 +411,9 @@ Status CreateTextJobs(
const std::vector<std::reference_wrapper<const StreamDescriptor>>& streams, const std::vector<std::reference_wrapper<const StreamDescriptor>>& streams,
const PackagingParams& packaging_params, const PackagingParams& packaging_params,
MpdNotifier* mpd_notifier, MpdNotifier* mpd_notifier,
std::vector<std::unique_ptr<Job>>* jobs) { JobManager* job_manager) {
DCHECK(job_manager);
for (const StreamDescriptor& stream : streams) { for (const StreamDescriptor& stream : streams) {
const MediaContainerName output_format = GetOutputFormat(stream); const MediaContainerName output_format = GetOutputFormat(stream);
@ -491,8 +456,8 @@ Status CreateAudioVideoJobs(
KeySource* encryption_key_source, KeySource* encryption_key_source,
MpdNotifier* mpd_notifier, MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier, hls::HlsNotifier* hls_notifier,
std::vector<std::unique_ptr<Job>>* jobs) { JobManager* job_manager) {
DCHECK(jobs); DCHECK(job_manager);
// Demuxers are shared among all streams with the same input. // Demuxers are shared among all streams with the same input.
std::shared_ptr<Demuxer> demuxer; std::shared_ptr<Demuxer> demuxer;
@ -528,7 +493,7 @@ Status CreateAudioVideoJobs(
demuxer->SetKeySource(std::move(decryption_key_source)); demuxer->SetKeySource(std::move(decryption_key_source));
} }
jobs->emplace_back(new media::Job("RemuxJob", demuxer)); job_manager->Add("RemuxJob", demuxer);
} }
if (!stream.language.empty()) { if (!stream.language.empty()) {
@ -627,8 +592,8 @@ Status CreateAllJobs(const std::vector<StreamDescriptor>& stream_descriptors,
KeySource* encryption_key_source, KeySource* encryption_key_source,
MpdNotifier* mpd_notifier, MpdNotifier* mpd_notifier,
hls::HlsNotifier* hls_notifier, hls::HlsNotifier* hls_notifier,
std::vector<std::unique_ptr<Job>>* jobs) { JobManager* job_manager) {
DCHECK(jobs); DCHECK(job_manager);
// Group all streams based on which pipeline they will use. // Group all streams based on which pipeline they will use.
std::vector<std::reference_wrapper<const StreamDescriptor>> text_streams; std::vector<std::reference_wrapper<const StreamDescriptor>> text_streams;
@ -653,72 +618,21 @@ Status CreateAllJobs(const std::vector<StreamDescriptor>& stream_descriptors,
Status status; Status status;
status.Update( status.Update(CreateTextJobs(text_streams, packaging_params, mpd_notifier,
CreateTextJobs(text_streams, packaging_params, mpd_notifier, jobs)); job_manager));
int stream_number = text_streams.size(); int stream_number = text_streams.size();
status.Update(CreateAudioVideoJobs( status.Update(CreateAudioVideoJobs(
stream_number, audio_video_streams, packaging_params, fake_clock, stream_number, audio_video_streams, packaging_params, fake_clock,
encryption_key_source, mpd_notifier, hls_notifier, jobs)); encryption_key_source, mpd_notifier, hls_notifier, job_manager));
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
// Initialize processing graph. // Initialize processing graph.
for (const std::unique_ptr<Job>& job : *jobs) { status.Update(job_manager->InitializeJobs());
job->Initialize();
status.Update(job->status());
}
return status;
}
Status RunJobs(const std::vector<std::unique_ptr<Job>>& jobs) {
// We need to store the jobs and the waits separately in order to use the
// |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we
// need to access the jobs in order to join the thread and check the status.
// The indexes needs to be check in sync or else we won't be able to relate a
// WaitableEvent back to the job.
std::vector<Job*> active_jobs;
std::vector<base::WaitableEvent*> active_waits;
// Start every job and add it to the active jobs list so that we can wait
// on each one.
for (auto& job : jobs) {
job->Start();
active_jobs.push_back(job.get());
active_waits.push_back(job->wait());
}
// Wait for all jobs to complete or an error occurs.
Status status;
while (status.ok() && active_jobs.size()) {
// Wait for an event to finish and then update our status so that we can
// quit if something has gone wrong.
const size_t done =
base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size());
Job* job = active_jobs[done];
job->Join();
status.Update(job->status());
// Remove the job and the wait from our tracking.
active_jobs.erase(active_jobs.begin() + done);
active_waits.erase(active_waits.begin() + done);
}
// If the main loop has exited and there are still jobs running,
// we need to cancel them and clean-up.
for (auto& job : active_jobs) {
job->Cancel();
}
for (auto& job : active_jobs) {
job->Join();
}
return status; return status;
} }
@ -731,8 +645,8 @@ struct Packager::PackagerInternal {
std::unique_ptr<KeySource> encryption_key_source; std::unique_ptr<KeySource> encryption_key_source;
std::unique_ptr<MpdNotifier> mpd_notifier; std::unique_ptr<MpdNotifier> mpd_notifier;
std::unique_ptr<hls::HlsNotifier> hls_notifier; std::unique_ptr<hls::HlsNotifier> hls_notifier;
std::vector<std::unique_ptr<media::Job>> jobs;
BufferCallbackParams buffer_callback_params; BufferCallbackParams buffer_callback_params;
media::JobManager job_manager;
}; };
Packager::Packager() {} Packager::Packager() {}
@ -848,7 +762,7 @@ Status Packager::Initialize(
Status status = media::CreateAllJobs( Status status = media::CreateAllJobs(
streams_for_jobs, packaging_params, &internal->fake_clock, streams_for_jobs, packaging_params, &internal->fake_clock,
internal->encryption_key_source.get(), internal->mpd_notifier.get(), internal->encryption_key_source.get(), internal->mpd_notifier.get(),
internal->hls_notifier.get(), &internal->jobs); internal->hls_notifier.get(), &internal->job_manager);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
@ -861,7 +775,8 @@ Status Packager::Initialize(
Status Packager::Run() { Status Packager::Run() {
if (!internal_) if (!internal_)
return Status(error::INVALID_ARGUMENT, "Not yet initialized."); return Status(error::INVALID_ARGUMENT, "Not yet initialized.");
Status status = media::RunJobs(internal_->jobs);
Status status = internal_->job_manager.RunJobs();
if (!status.ok()) if (!status.ok())
return status; return status;
@ -881,8 +796,7 @@ void Packager::Cancel() {
LOG(INFO) << "Not yet initialized. Return directly."; LOG(INFO) << "Not yet initialized. Return directly.";
return; return;
} }
for (const std::unique_ptr<media::Job>& job : internal_->jobs) internal_->job_manager.CancelJobs();
job->Cancel();
} }
std::string Packager::GetLibraryVersion() { std::string Packager::GetLibraryVersion() {

View File

@ -13,13 +13,15 @@
'target_name': 'libpackager', 'target_name': 'libpackager',
'type': '<(libpackager_type)', 'type': '<(libpackager_type)',
'sources': [ 'sources': [
'packager.cc',
'packager.h',
# TODO(kqyang): Clean up the file path. # TODO(kqyang): Clean up the file path.
'app/job_manager.cc',
'app/job_manager.h',
'app/libcrypto_threading.cc', 'app/libcrypto_threading.cc',
'app/libcrypto_threading.h', 'app/libcrypto_threading.h',
'app/packager_util.cc', 'app/packager_util.cc',
'app/packager_util.h', 'app/packager_util.h',
'packager.cc',
'packager.h',
], ],
'dependencies': [ 'dependencies': [
'file/file.gyp:file', 'file/file.gyp:file',