From dedbd8b724eea1231d87df5eed52da1181a72dc2 Mon Sep 17 00:00:00 2001 From: Aaron Vaage Date: Tue, 12 Dec 2017 13:05:12 -0800 Subject: [PATCH] Isolate Job Management Moved all code dealing with jobs into its own class so that the packager code does not need to worry about initializing, running, or stopping jobs. Change-Id: I3e9ef1f22bd93d671f77d59ad15f23d1239078cf --- packager/app/job_manager.cc | 107 +++++++++++++++++++++++++++++++ packager/app/job_manager.h | 93 +++++++++++++++++++++++++++ packager/packager.cc | 122 ++++++------------------------------ packager/packager.gyp | 6 +- 4 files changed, 222 insertions(+), 106 deletions(-) create mode 100644 packager/app/job_manager.cc create mode 100644 packager/app/job_manager.h diff --git a/packager/app/job_manager.cc b/packager/app/job_manager.cc new file mode 100644 index 0000000000..f20c0fe836 --- /dev/null +++ b/packager/app/job_manager.cc @@ -0,0 +1,107 @@ +// Copyright 2017 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/app/job_manager.h" + +#include "packager/app/libcrypto_threading.h" +#include "packager/media/origin/origin_handler.h" + +namespace shaka { +namespace media { + +Job::Job(const std::string& name, std::shared_ptr work) + : SimpleThread(name), + work_(work), + wait_(base::WaitableEvent::ResetPolicy::MANUAL, + base::WaitableEvent::InitialState::NOT_SIGNALED) { + DCHECK(work); +} + +void Job::Initialize() { + status_ = work_->Initialize(); +} + +void Job::Cancel() { + work_->Cancel(); +} + +void Job::Run() { + status_ = work_->Run(); + wait_.Signal(); +} + +void JobManager::Add(const std::string& name, + std::shared_ptr handler) { + jobs_.emplace_back(new Job(name, std::move(handler))); +} + +Status JobManager::InitializeJobs() { + Status status; + + for (auto& job : jobs_) { + job->Initialize(); + status.Update(job->status()); + } + + return status; +} + +Status JobManager::RunJobs() { + // We need to store the jobs and the waits separately in order to use the + // |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we + // need to access the jobs in order to join the thread and check the status. + // The indexes needs to be check in sync or else we won't be able to relate a + // WaitableEvent back to the job. + std::vector active_jobs; + std::vector active_waits; + + // Start every job and add it to the active jobs list so that we can wait + // on each one. + for (auto& job : jobs_) { + job->Start(); + + active_jobs.push_back(job.get()); + active_waits.push_back(job->wait()); + } + + // Wait for all jobs to complete or an error occurs. + Status status; + while (status.ok() && active_jobs.size()) { + // Wait for an event to finish and then update our status so that we can + // quit if something has gone wrong. + const size_t done = + base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size()); + Job* job = active_jobs[done]; + + job->Join(); + status.Update(job->status()); + + // Remove the job and the wait from our tracking. + active_jobs.erase(active_jobs.begin() + done); + active_waits.erase(active_waits.begin() + done); + } + + // If the main loop has exited and there are still jobs running, + // we need to cancel them and clean-up. + for (auto& job : active_jobs) { + job->Cancel(); + } + + for (auto& job : active_jobs) { + job->Join(); + } + + return status; +} + +void JobManager::CancelJobs() { + for (auto& job : jobs_) { + job->Cancel(); + } +} + +} // namespace media +} // namespace shaka diff --git a/packager/app/job_manager.h b/packager/app/job_manager.h new file mode 100644 index 0000000000..7201f18246 --- /dev/null +++ b/packager/app/job_manager.h @@ -0,0 +1,93 @@ +// Copyright 2017 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef PACKAGER_APP_JOB_MANAGER_H_ +#define PACKAGER_APP_JOB_MANAGER_H_ + +#include +#include + +#include "packager/base/threading/simple_thread.h" +#include "packager/status.h" + +namespace shaka { +namespace media { + +class OriginHandler; + +// A job is a single line of work that is expected to run in parallel with +// other jobs. +class Job : public base::SimpleThread { + public: + Job(const std::string& name, std::shared_ptr work); + + // Initialize the chain of handlers that make up this job. This only + // initializes the handlers, it does not execute the job. If + // initialization fails, |status| will return a non-ok status. + void Initialize(); + + // Request that the job stops executing. This is only a request and + // will not block. If you want to wait for the job to complete, use + // |wait|. + void Cancel(); + + // Get the current status of the job. If the job failed to initialize + // or encountered an error during execution this will return the error. + const Status& status() const { return status_; } + + // If you want to wait for this job to complete, this will return the + // WaitableEvent you can wait on. + base::WaitableEvent* wait() { return &wait_; } + + private: + Job(const Job&) = delete; + Job& operator=(const Job&) = delete; + + void Run() override; + + std::shared_ptr work_; + Status status_; + + base::WaitableEvent wait_; +}; + +// Similar to a thread pool, JobManager manages multiple jobs that are expected +// to run in parallel. It can be used to register, run, and stop a batch of +// jobs. +class JobManager { + public: + JobManager() = default; + + // Create a new job entry by specifying the origin handler at the top of the + // chain and a name for the thread. This will only register the job. To start + // the job, you need to call |RunJobs|. + void Add(const std::string& name, std::shared_ptr handler); + + // Initialize all registered jobs. If any job fails to initialize, this will + // return the error and it will not be safe to call |RunJobs| as not all jobs + // will be properly initialized. + Status InitializeJobs(); + + // Run all registered jobs. Before calling this make sure that + // |InitializedJobs| returned |Status::OK|. This call is blocking and will + // block until all jobs exit. + Status RunJobs(); + + // Ask all jobs to stop running. This call is non-blocking and can be used to + // unblock a call to |RunJobs|. + void CancelJobs(); + + private: + JobManager(const JobManager&) = delete; + JobManager& operator=(const JobManager&) = delete; + + std::vector> jobs_; +}; + +} // namespace media +} // namespace shaka + +#endif // PACKAGER_APP_JOB_MANAGER_H_ diff --git a/packager/packager.cc b/packager/packager.cc index 26c785c39e..e6f3ea3bce 100644 --- a/packager/packager.cc +++ b/packager/packager.cc @@ -8,6 +8,7 @@ #include +#include "packager/app/job_manager.h" #include "packager/app/libcrypto_threading.h" #include "packager/app/packager_util.h" #include "packager/app/stream_descriptor.h" @@ -238,44 +239,6 @@ class FakeClock : public base::Clock { base::Time Now() override { return base::Time(); } }; -class Job : public base::SimpleThread { - public: - Job(const std::string& name, std::shared_ptr work) - : SimpleThread(name), - work_(work), - wait_(base::WaitableEvent::ResetPolicy::MANUAL, - base::WaitableEvent::InitialState::NOT_SIGNALED) {} - - void Initialize() { - DCHECK(work_); - status_ = work_->Initialize(); - } - - void Cancel() { - DCHECK(work_); - work_->Cancel(); - } - - const Status& status() const { return status_; } - - base::WaitableEvent* wait() { return &wait_; } - - private: - Job(const Job&) = delete; - Job& operator=(const Job&) = delete; - - void Run() override { - DCHECK(work_); - status_ = work_->Run(); - wait_.Signal(); - } - - std::shared_ptr work_; - Status status_; - - base::WaitableEvent wait_; -}; - bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor, MediaInfo* text_media_info) { const std::string& language = stream_descriptor.language; @@ -448,7 +411,9 @@ Status CreateTextJobs( const std::vector>& streams, const PackagingParams& packaging_params, MpdNotifier* mpd_notifier, - std::vector>* jobs) { + JobManager* job_manager) { + DCHECK(job_manager); + for (const StreamDescriptor& stream : streams) { const MediaContainerName output_format = GetOutputFormat(stream); @@ -491,8 +456,8 @@ Status CreateAudioVideoJobs( KeySource* encryption_key_source, MpdNotifier* mpd_notifier, hls::HlsNotifier* hls_notifier, - std::vector>* jobs) { - DCHECK(jobs); + JobManager* job_manager) { + DCHECK(job_manager); // Demuxers are shared among all streams with the same input. std::shared_ptr demuxer; @@ -528,7 +493,7 @@ Status CreateAudioVideoJobs( demuxer->SetKeySource(std::move(decryption_key_source)); } - jobs->emplace_back(new media::Job("RemuxJob", demuxer)); + job_manager->Add("RemuxJob", demuxer); } if (!stream.language.empty()) { @@ -627,8 +592,8 @@ Status CreateAllJobs(const std::vector& stream_descriptors, KeySource* encryption_key_source, MpdNotifier* mpd_notifier, hls::HlsNotifier* hls_notifier, - std::vector>* jobs) { - DCHECK(jobs); + JobManager* job_manager) { + DCHECK(job_manager); // Group all streams based on which pipeline they will use. std::vector> text_streams; @@ -653,72 +618,21 @@ Status CreateAllJobs(const std::vector& stream_descriptors, Status status; - status.Update( - CreateTextJobs(text_streams, packaging_params, mpd_notifier, jobs)); + status.Update(CreateTextJobs(text_streams, packaging_params, mpd_notifier, + job_manager)); int stream_number = text_streams.size(); status.Update(CreateAudioVideoJobs( stream_number, audio_video_streams, packaging_params, fake_clock, - encryption_key_source, mpd_notifier, hls_notifier, jobs)); + encryption_key_source, mpd_notifier, hls_notifier, job_manager)); if (!status.ok()) { return status; } // Initialize processing graph. - for (const std::unique_ptr& job : *jobs) { - job->Initialize(); - status.Update(job->status()); - } - - return status; -} - -Status RunJobs(const std::vector>& jobs) { - // We need to store the jobs and the waits separately in order to use the - // |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we - // need to access the jobs in order to join the thread and check the status. - // The indexes needs to be check in sync or else we won't be able to relate a - // WaitableEvent back to the job. - std::vector active_jobs; - std::vector active_waits; - - // Start every job and add it to the active jobs list so that we can wait - // on each one. - for (auto& job : jobs) { - job->Start(); - - active_jobs.push_back(job.get()); - active_waits.push_back(job->wait()); - } - - // Wait for all jobs to complete or an error occurs. - Status status; - while (status.ok() && active_jobs.size()) { - // Wait for an event to finish and then update our status so that we can - // quit if something has gone wrong. - const size_t done = - base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size()); - Job* job = active_jobs[done]; - - job->Join(); - status.Update(job->status()); - - // Remove the job and the wait from our tracking. - active_jobs.erase(active_jobs.begin() + done); - active_waits.erase(active_waits.begin() + done); - } - - // If the main loop has exited and there are still jobs running, - // we need to cancel them and clean-up. - for (auto& job : active_jobs) { - job->Cancel(); - } - - for (auto& job : active_jobs) { - job->Join(); - } + status.Update(job_manager->InitializeJobs()); return status; } @@ -731,8 +645,8 @@ struct Packager::PackagerInternal { std::unique_ptr encryption_key_source; std::unique_ptr mpd_notifier; std::unique_ptr hls_notifier; - std::vector> jobs; BufferCallbackParams buffer_callback_params; + media::JobManager job_manager; }; Packager::Packager() {} @@ -848,7 +762,7 @@ Status Packager::Initialize( Status status = media::CreateAllJobs( streams_for_jobs, packaging_params, &internal->fake_clock, internal->encryption_key_source.get(), internal->mpd_notifier.get(), - internal->hls_notifier.get(), &internal->jobs); + internal->hls_notifier.get(), &internal->job_manager); if (!status.ok()) { return status; @@ -861,7 +775,8 @@ Status Packager::Initialize( Status Packager::Run() { if (!internal_) return Status(error::INVALID_ARGUMENT, "Not yet initialized."); - Status status = media::RunJobs(internal_->jobs); + + Status status = internal_->job_manager.RunJobs(); if (!status.ok()) return status; @@ -881,8 +796,7 @@ void Packager::Cancel() { LOG(INFO) << "Not yet initialized. Return directly."; return; } - for (const std::unique_ptr& job : internal_->jobs) - job->Cancel(); + internal_->job_manager.CancelJobs(); } std::string Packager::GetLibraryVersion() { diff --git a/packager/packager.gyp b/packager/packager.gyp index 6d3696eb1f..2b010ce128 100644 --- a/packager/packager.gyp +++ b/packager/packager.gyp @@ -13,13 +13,15 @@ 'target_name': 'libpackager', 'type': '<(libpackager_type)', 'sources': [ - 'packager.cc', - 'packager.h', # TODO(kqyang): Clean up the file path. + 'app/job_manager.cc', + 'app/job_manager.h', 'app/libcrypto_threading.cc', 'app/libcrypto_threading.h', 'app/packager_util.cc', 'app/packager_util.h', + 'packager.cc', + 'packager.h', ], 'dependencies': [ 'file/file.gyp:file',