Cleaned Up RunJobs

Rewrote the RunRemuxJobs function to use waitable events so that we never
get stuck waiting for a job to finish before getting to a thread that
has already exited.

This should allow us to end execution when a job failed rather than
waiting for all jobs before the failed job to finish.

Closes #94

Change-Id: I413f62561a7a4cab83b8905e75986230b6c498bc
This commit is contained in:
Aaron Vaage 2017-08-17 09:25:11 -07:00
parent 675f2631b8
commit 591fd5456e
1 changed files with 51 additions and 20 deletions

View File

@ -232,7 +232,10 @@ class FakeClock : public base::Clock {
class Job : public base::SimpleThread { class Job : public base::SimpleThread {
public: public:
Job(const std::string& name, std::shared_ptr<OriginHandler> work) Job(const std::string& name, std::shared_ptr<OriginHandler> work)
: SimpleThread(name), work_(work) {} : SimpleThread(name),
work_(work),
wait_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {}
void Initialize() { void Initialize() {
DCHECK(work_); DCHECK(work_);
@ -244,7 +247,9 @@ class Job : public base::SimpleThread {
work_->Cancel(); work_->Cancel();
} }
Status status() { return status_; } const Status& status() const { return status_; }
base::WaitableEvent* wait() { return &wait_; }
private: private:
Job(const Job&) = delete; Job(const Job&) = delete;
@ -253,10 +258,13 @@ class Job : public base::SimpleThread {
void Run() override { void Run() override {
DCHECK(work_); DCHECK(work_);
status_ = work_->Run(); status_ = work_->Run();
wait_.Signal();
} }
std::shared_ptr<OriginHandler> work_; std::shared_ptr<OriginHandler> work_;
Status status_; Status status_;
base::WaitableEvent wait_;
}; };
bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor, bool StreamInfoToTextMediaInfo(const StreamDescriptor& stream_descriptor,
@ -522,27 +530,50 @@ bool CreateRemuxJobs(const StreamDescriptorList& stream_descriptors,
return true; return true;
} }
Status RunRemuxJobs(const std::vector<std::unique_ptr<Job>>& jobs) { Status RunJobs(const std::vector<std::unique_ptr<Job>>& jobs) {
// Start the job threads. // We need to store the jobs and the waits separately in order to use the
for (const std::unique_ptr<Job>& job : jobs) // |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we
// need to access the jobs in order to join the thread and check the status.
// The indexes needs to be check in sync or else we won't be able to relate a
// WaitableEvent back to the job.
std::vector<Job*> active_jobs;
std::vector<base::WaitableEvent*> active_waits;
// Start every job and add it to the active jobs list so that we can wait
// on each one.
for (auto& job : jobs) {
job->Start(); job->Start();
active_jobs.push_back(job.get());
active_waits.push_back(job->wait());
}
// Wait for all jobs to complete or an error occurs. // Wait for all jobs to complete or an error occurs.
Status status; Status status;
bool all_joined; while (status.ok() && active_jobs.size()) {
do { // Wait for an event to finish and then update our status so that we can
all_joined = true; // quit if something has gone wrong.
for (const std::unique_ptr<Job>& job : jobs) { const size_t done = base::WaitableEvent::WaitMany(active_waits.data(),
if (job->HasBeenJoined()) { active_waits.size());
status = job->status(); Job* job = active_jobs[done];
if (!status.ok())
break; job->Join();
} else { status.Update(job->status());
all_joined = false;
// Remove the job and the wait from our tracking.
active_jobs.erase(active_jobs.begin() + done);
active_waits.erase(active_waits.begin() + done);
}
// If the main loop has exited and there are still jobs running,
// we need to cancel them and clean-up.
for (auto& job : active_jobs) {
job->Cancel();
}
for (auto& job : active_jobs) {
job->Join(); job->Join();
} }
}
} while (!all_joined && status.ok());
return status; return status;
} }
@ -639,7 +670,7 @@ Status Packager::Initialize(
Status Packager::Run() { Status Packager::Run() {
if (!internal_) if (!internal_)
return Status(error::INVALID_ARGUMENT, "Not yet initialized."); return Status(error::INVALID_ARGUMENT, "Not yet initialized.");
Status status = media::RunRemuxJobs(internal_->jobs); Status status = media::RunJobs(internal_->jobs);
if (!status.ok()) if (!status.ok())
return status; return status;