7 #include "packager/app/job_manager.h"
9 #include "packager/app/libcrypto_threading.h"
10 #include "packager/media/chunking/sync_point_queue.h"
11 #include "packager/media/origin/origin_handler.h"
16 Job::Job(
const std::string& name, std::shared_ptr<OriginHandler> work)
18 work_(std::move(work)),
19 wait_(base::WaitableEvent::ResetPolicy::MANUAL,
20 base::WaitableEvent::InitialState::NOT_SIGNALED) {
29 status_ = work_->Run();
33 JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
34 : sync_points_(std::move(sync_points)) {}
36 void JobManager::Add(
const std::string& name,
37 std::shared_ptr<OriginHandler> handler) {
41 job_entries_.push_back({name, std::move(handler)});
44 Status JobManager::InitializeJobs() {
46 for (
const JobEntry& job_entry : job_entries_)
47 status.Update(job_entry.worker->Initialize());
52 for (
const JobEntry& job_entry : job_entries_)
53 jobs_.emplace_back(
new Job(job_entry.name, std::move(job_entry.worker)));
57 Status JobManager::RunJobs() {
63 std::vector<Job*> active_jobs;
64 std::vector<base::WaitableEvent*> active_waits;
68 for (
auto& job : jobs_) {
71 active_jobs.push_back(job.get());
72 active_waits.push_back(job->wait());
77 while (status.ok() && active_jobs.size()) {
81 base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size());
82 Job* job = active_jobs[done];
85 status.Update(job->status());
88 active_jobs.erase(active_jobs.begin() + done);
89 active_waits.erase(active_waits.begin() + done);
95 sync_points_->Cancel();
96 for (
auto& job : active_jobs) {
100 for (
auto& job : active_jobs) {
107 void JobManager::CancelJobs() {
109 sync_points_->Cancel();
110 for (
auto& job : jobs_) {