Shaka Packager SDK
job_manager.cc
1 // Copyright 2017 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/app/job_manager.h"
8 
9 #include "packager/app/libcrypto_threading.h"
10 #include "packager/media/chunking/sync_point_queue.h"
11 #include "packager/media/origin/origin_handler.h"
12 
13 namespace shaka {
14 namespace media {
15 
16 Job::Job(const std::string& name, std::shared_ptr<OriginHandler> work)
17  : SimpleThread(name),
18  work_(std::move(work)),
19  wait_(base::WaitableEvent::ResetPolicy::MANUAL,
20  base::WaitableEvent::InitialState::NOT_SIGNALED) {
21  DCHECK(work_);
22 }
23 
24 void Job::Cancel() {
25  work_->Cancel();
26 }
27 
28 void Job::Run() {
29  status_ = work_->Run();
30  wait_.Signal();
31 }
32 
33 JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
34  : sync_points_(std::move(sync_points)) {}
35 
36 void JobManager::Add(const std::string& name,
37  std::shared_ptr<OriginHandler> handler) {
38  // Stores Job entries for delayed construction of Job objects, to avoid
39  // setting up SimpleThread until we know all workers can be initialized
40  // successfully.
41  job_entries_.push_back({name, std::move(handler)});
42 }
43 
44 Status JobManager::InitializeJobs() {
45  Status status;
46  for (const JobEntry& job_entry : job_entries_)
47  status.Update(job_entry.worker->Initialize());
48  if (!status.ok())
49  return status;
50 
51  // Create Job objects after successfully initialized all workers.
52  for (const JobEntry& job_entry : job_entries_)
53  jobs_.emplace_back(new Job(job_entry.name, std::move(job_entry.worker)));
54  return status;
55 }
56 
57 Status JobManager::RunJobs() {
58  // We need to store the jobs and the waits separately in order to use the
59  // |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we
60  // need to access the jobs in order to join the thread and check the status.
61  // The indexes needs to be check in sync or else we won't be able to relate a
62  // WaitableEvent back to the job.
63  std::vector<Job*> active_jobs;
64  std::vector<base::WaitableEvent*> active_waits;
65 
66  // Start every job and add it to the active jobs list so that we can wait
67  // on each one.
68  for (auto& job : jobs_) {
69  job->Start();
70 
71  active_jobs.push_back(job.get());
72  active_waits.push_back(job->wait());
73  }
74 
75  // Wait for all jobs to complete or an error occurs.
76  Status status;
77  while (status.ok() && active_jobs.size()) {
78  // Wait for an event to finish and then update our status so that we can
79  // quit if something has gone wrong.
80  const size_t done =
81  base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size());
82  Job* job = active_jobs[done];
83 
84  job->Join();
85  status.Update(job->status());
86 
87  // Remove the job and the wait from our tracking.
88  active_jobs.erase(active_jobs.begin() + done);
89  active_waits.erase(active_waits.begin() + done);
90  }
91 
92  // If the main loop has exited and there are still jobs running,
93  // we need to cancel them and clean-up.
94  if (sync_points_)
95  sync_points_->Cancel();
96  for (auto& job : active_jobs) {
97  job->Cancel();
98  }
99 
100  for (auto& job : active_jobs) {
101  job->Join();
102  }
103 
104  return status;
105 }
106 
107 void JobManager::CancelJobs() {
108  if (sync_points_)
109  sync_points_->Cancel();
110  for (auto& job : jobs_) {
111  job->Cancel();
112  }
113 }
114 
115 } // namespace media
116 } // namespace shaka
All the methods that are virtual are virtual for mocking.