diff --git a/packager/app/job_manager.cc b/packager/app/job_manager.cc index 576566a302..df170d3ede 100644 --- a/packager/app/job_manager.cc +++ b/packager/app/job_manager.cc @@ -7,6 +7,7 @@ #include "packager/app/job_manager.h" #include "packager/app/libcrypto_threading.h" +#include "packager/media/chunking/sync_point_queue.h" #include "packager/media/origin/origin_handler.h" namespace shaka { @@ -29,6 +30,9 @@ void Job::Run() { wait_.Signal(); } +JobManager::JobManager(std::unique_ptr sync_points) + : sync_points_(std::move(sync_points)) {} + void JobManager::Add(const std::string& name, std::shared_ptr handler) { // Stores Job entries for delayed construction of Job objects, to avoid @@ -87,6 +91,8 @@ Status JobManager::RunJobs() { // If the main loop has exited and there are still jobs running, // we need to cancel them and clean-up. + if (sync_points_) + sync_points_->Cancel(); for (auto& job : active_jobs) { job->Cancel(); } @@ -99,6 +105,8 @@ Status JobManager::RunJobs() { } void JobManager::CancelJobs() { + if (sync_points_) + sync_points_->Cancel(); for (auto& job : jobs_) { job->Cancel(); } diff --git a/packager/app/job_manager.h b/packager/app/job_manager.h index 258f85dee5..09a7c20554 100644 --- a/packager/app/job_manager.h +++ b/packager/app/job_manager.h @@ -17,6 +17,7 @@ namespace shaka { namespace media { class OriginHandler; +class SyncPointQueue; // A job is a single line of work that is expected to run in parallel with // other jobs. @@ -54,7 +55,10 @@ class Job : public base::SimpleThread { // jobs. class JobManager { public: - JobManager() = default; + // @param sync_points is an optional SyncPointQueue used to synchronize and + // align cue points. JobManager cancels @a sync_points when any job + // fails or is cancelled. It can be NULL. + explicit JobManager(std::unique_ptr sync_points); // Create a new job entry by specifying the origin handler at the top of the // chain and a name for the thread. This will only register the job. To start @@ -75,6 +79,8 @@ class JobManager { // unblock a call to |RunJobs|. void CancelJobs(); + SyncPointQueue* sync_points() { return sync_points_.get(); } + private: JobManager(const JobManager&) = delete; JobManager& operator=(const JobManager&) = delete; @@ -86,6 +92,9 @@ class JobManager { // Stores Job entries for delayed construction of Job object. std::vector job_entries_; std::vector> jobs_; + // Stored in JobManager so JobManager can cancel |sync_points| when any job + // fails or is cancelled. + std::unique_ptr sync_points_; }; } // namespace media diff --git a/packager/media/chunking/cue_alignment_handler.cc b/packager/media/chunking/cue_alignment_handler.cc index 26b7c5d73d..e1d17bb079 100644 --- a/packager/media/chunking/cue_alignment_handler.cc +++ b/packager/media/chunking/cue_alignment_handler.cc @@ -170,7 +170,10 @@ Status CueAlignmentHandler::OnSample(std::unique_ptr sample) { std::shared_ptr next_sync = sync_points_->GetNext(next_cue_hint); - DCHECK(next_sync); + if (!next_sync) { + // This happens only if the job is cancelled. + return Status(error::CANCELLED, "SyncPointQueue is cancelled."); + } Status status = UseNewSyncPoint(next_sync); if (!status.ok()) { diff --git a/packager/media/chunking/sync_point_queue.cc b/packager/media/chunking/sync_point_queue.cc index f8c4ce820f..58ebfb2621 100644 --- a/packager/media/chunking/sync_point_queue.cc +++ b/packager/media/chunking/sync_point_queue.cc @@ -27,6 +27,14 @@ void SyncPointQueue::AddThread() { thread_count_++; } +void SyncPointQueue::Cancel() { + { + base::AutoLock auto_lock(lock_); + cancelled_ = true; + } + sync_condition_.Broadcast(); +} + double SyncPointQueue::GetHint(double time_in_seconds) { base::AutoLock auto_lock(lock_); @@ -46,7 +54,7 @@ double SyncPointQueue::GetHint(double time_in_seconds) { std::shared_ptr SyncPointQueue::GetNext( double hint_in_seconds) { base::AutoLock auto_lock(lock_); - while (true) { + while (!cancelled_) { // Find the promoted cue that would line up with our hint, which is the // first cue that is not less than |hint_in_seconds|. auto iter = promoted_.lower_bound(hint_in_seconds); @@ -70,6 +78,7 @@ std::shared_ptr SyncPointQueue::GetNext( sync_condition_.Wait(); waiting_thread_count_--; } + return nullptr; } std::shared_ptr SyncPointQueue::PromoteAt( diff --git a/packager/media/chunking/sync_point_queue.h b/packager/media/chunking/sync_point_queue.h index 30c0ae7d7e..a8ccb93618 100644 --- a/packager/media/chunking/sync_point_queue.h +++ b/packager/media/chunking/sync_point_queue.h @@ -26,6 +26,9 @@ class SyncPointQueue { /// order to keep track of its clients. void AddThread(); + /// Cancel the queue and unblock all threads. + void Cancel(); + /// @return A hint for when the next cue event would be. The returned hint is /// not less than @a time_in_seconds. The actual time for the next cue /// event will not be less than the returned hint, with the exact @@ -37,7 +40,7 @@ class SyncPointQueue { /// after @a hint_in_seconds has been promoted, this will block until /// either a cue is promoted or all threads are blocked (in which /// case, the unpromoted cue at @a hint_in_seconds will be - /// self-promoted and returned). + /// self-promoted and returned) or Cancel() is called. std::shared_ptr GetNext(double hint_in_seconds); /// Promote the first cue that is not greater than @a time_in_seconds. All @@ -56,6 +59,7 @@ class SyncPointQueue { base::ConditionVariable sync_condition_; size_t thread_count_ = 0; size_t waiting_thread_count_ = 0; + bool cancelled_ = false; std::map> unpromoted_; std::map> promoted_; diff --git a/packager/packager.cc b/packager/packager.cc index 2afa7b7a58..2a324fc481 100644 --- a/packager/packager.cc +++ b/packager/packager.cc @@ -54,6 +54,7 @@ namespace shaka { // TODO(kqyang): Clean up namespaces. using media::Demuxer; +using media::JobManager; using media::KeySource; using media::MuxerOptions; using media::SyncPointQueue; @@ -785,9 +786,8 @@ struct Packager::PackagerInternal { std::unique_ptr encryption_key_source; std::unique_ptr mpd_notifier; std::unique_ptr hls_notifier; - std::unique_ptr sync_points; BufferCallbackParams buffer_callback_params; - media::JobManager job_manager; + std::unique_ptr job_manager; }; Packager::Packager() {} @@ -857,10 +857,12 @@ Status Packager::Initialize( internal->hls_notifier.reset(new hls::SimpleHlsNotifier(hls_params)); } + std::unique_ptr sync_points; if (!packaging_params.ad_cue_generator_params.cue_points.empty()) { - internal->sync_points.reset( + sync_points.reset( new SyncPointQueue(packaging_params.ad_cue_generator_params)); } + internal->job_manager.reset(new JobManager(std::move(sync_points))); std::vector streams_for_jobs; @@ -904,8 +906,9 @@ Status Packager::Initialize( Status status = media::CreateAllJobs( streams_for_jobs, packaging_params, internal->mpd_notifier.get(), - internal->encryption_key_source.get(), internal->sync_points.get(), - &muxer_listener_factory, &muxer_factory, &internal->job_manager); + internal->encryption_key_source.get(), + internal->job_manager->sync_points(), &muxer_listener_factory, + &muxer_factory, internal->job_manager.get()); if (!status.ok()) { return status; @@ -919,7 +922,7 @@ Status Packager::Run() { if (!internal_) return Status(error::INVALID_ARGUMENT, "Not yet initialized."); - Status status = internal_->job_manager.RunJobs(); + Status status = internal_->job_manager->RunJobs(); if (!status.ok()) return status; @@ -939,7 +942,7 @@ void Packager::Cancel() { LOG(INFO) << "Not yet initialized. Return directly."; return; } - internal_->job_manager.CancelJobs(); + internal_->job_manager->CancelJobs(); } std::string Packager::GetLibraryVersion() {