diff --git a/packager/packager.cc b/packager/packager.cc index b15211a8a3..d73c65b17e 100644 --- a/packager/packager.cc +++ b/packager/packager.cc @@ -232,7 +232,10 @@ class FakeClock : public base::Clock { class Job : public base::SimpleThread { public: Job(const std::string& name, std::shared_ptr work) - : SimpleThread(name), work_(work) {} + : SimpleThread(name), + work_(work), + wait_(base::WaitableEvent::ResetPolicy::MANUAL, + base::WaitableEvent::InitialState::NOT_SIGNALED) {} void Initialize() { DCHECK(work_); @@ -244,7 +247,9 @@ class Job : public base::SimpleThread { work_->Cancel(); } - Status status() { return status_; } + const Status& status() const { return status_; } + + base::WaitableEvent* wait() { return &wait_; } private: Job(const Job&) = delete; @@ -253,10 +258,13 @@ class Job : public base::SimpleThread { void Run() override { DCHECK(work_); status_ = work_->Run(); + wait_.Signal(); } std::shared_ptr work_; Status status_; + + base::WaitableEvent wait_; }; bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor, @@ -522,27 +530,50 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors, return true; } -Status RunRemuxJobs(const std::vector>& jobs) { - // Start the job threads. - for (const std::unique_ptr& job : jobs) +Status RunJobs(const std::vector>& jobs) { + // We need to store the jobs and the waits separately in order to use the + // |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we + // need to access the jobs in order to join the thread and check the status. + // The indexes needs to be check in sync or else we won't be able to relate a + // WaitableEvent back to the job. + std::vector active_jobs; + std::vector active_waits; + + // Start every job and add it to the active jobs list so that we can wait + // on each one. + for (auto& job : jobs) { job->Start(); + active_jobs.push_back(job.get()); + active_waits.push_back(job->wait()); + } + // Wait for all jobs to complete or an error occurs. Status status; - bool all_joined; - do { - all_joined = true; - for (const std::unique_ptr& job : jobs) { - if (job->HasBeenJoined()) { - status = job->status(); - if (!status.ok()) - break; - } else { - all_joined = false; - job->Join(); - } - } - } while (!all_joined && status.ok()); + while (status.ok() && active_jobs.size()) { + // Wait for an event to finish and then update our status so that we can + // quit if something has gone wrong. + const size_t done = base::WaitableEvent::WaitMany(active_waits.data(), + active_waits.size()); + Job* job = active_jobs[done]; + + job->Join(); + status.Update(job->status()); + + // Remove the job and the wait from our tracking. + active_jobs.erase(active_jobs.begin() + done); + active_waits.erase(active_waits.begin() + done); + } + + // If the main loop has exited and there are still jobs running, + // we need to cancel them and clean-up. + for (auto& job : active_jobs) { + job->Cancel(); + } + + for (auto& job : active_jobs) { + job->Join(); + } return status; } @@ -639,7 +670,7 @@ Status Packager::Initialize( Status Packager::Run() { if (!internal_) return Status(error::INVALID_ARGUMENT, "Not yet initialized."); - Status status = media::RunRemuxJobs(internal_->jobs); + Status status = media::RunJobs(internal_->jobs); if (!status.ok()) return status;