shaka-packager/packager/app/job_manager.cc

134 lines
3.0 KiB
C++

// Copyright 2017 Google LLC. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/app/job_manager.h"
#include <set>
#include "packager/media/chunking/sync_point_queue.h"
#include "packager/media/origin/origin_handler.h"
namespace shaka {
namespace media {
Job::Job(const std::string& name,
std::shared_ptr<OriginHandler> work,
OnCompleteFunction on_complete)
: name_(name),
work_(std::move(work)),
on_complete_(on_complete),
status_(error::Code::UNKNOWN, "Job uninitialized") {
DCHECK(work_);
}
const Status& Job::Initialize() {
status_ = work_->Initialize();
return status_;
}
void Job::Start() {
thread_.reset(new std::thread(&Job::Run, this));
}
void Job::Cancel() {
work_->Cancel();
}
const Status& Job::Run() {
if (status_.ok()) // initialized correctly
status_ = work_->Run();
on_complete_(this);
return status_;
}
void Job::Join() {
if (thread_)
thread_->join();
}
JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
: sync_points_(std::move(sync_points)) {}
void JobManager::Add(const std::string& name,
std::shared_ptr<OriginHandler> handler) {
jobs_.emplace_back(new Job(
name, std::move(handler),
std::bind(&JobManager::OnJobComplete, this, std::placeholders::_1)));
}
Status JobManager::InitializeJobs() {
Status status;
for (auto& job : jobs_)
status.Update(job->Initialize());
return status;
}
Status JobManager::RunJobs() {
std::set<Job*> active_jobs;
// Start every job and add it to the active jobs list so that we can wait
// on each one.
for (auto& job : jobs_) {
job->Start();
active_jobs.insert(job.get());
}
// Wait for all jobs to complete or any job to error.
Status status;
{
absl::MutexLock lock(&mutex_);
while (status.ok() && active_jobs.size()) {
// any_job_complete_ is protected by mutex_.
any_job_complete_.Wait(&mutex_);
// complete_ is protected by mutex_.
for (const auto& entry : complete_) {
Job* job = entry.first;
bool complete = entry.second;
if (complete) {
job->Join();
status.Update(job->status());
active_jobs.erase(job);
}
}
}
}
// If the main loop has exited and there are still jobs running,
// we need to cancel them and clean-up.
if (sync_points_)
sync_points_->Cancel();
for (auto& job : active_jobs)
job->Cancel();
for (auto& job : active_jobs)
job->Join();
return status;
}
void JobManager::OnJobComplete(Job* job) {
absl::MutexLock lock(&mutex_);
// These are both protected by mutex_.
complete_[job] = true;
any_job_complete_.Signal();
}
void JobManager::CancelJobs() {
if (sync_points_)
sync_points_->Cancel();
for (auto& job : jobs_)
job->Cancel();
}
} // namespace media
} // namespace shaka