feat: Port app/job_manager to cmake and absl (#1249)

Issue #1047 (cmake)
Issue #346 (absl)
This commit is contained in:
Joey Parrish 2023-07-21 03:41:49 -07:00 committed by GitHub
parent 4515a9834d
commit ba51270b0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 152 additions and 101 deletions

View File

@ -59,3 +59,4 @@ add_subdirectory(third_party)
add_subdirectory(tools)
add_subdirectory(utils)
add_subdirectory(version)
add_subdirectory(app)

View File

@ -0,0 +1,14 @@
# Copyright 2023 Google LLC. All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
add_library(libpackager STATIC
job_manager.cc
single_thread_job_manager.cc)
target_link_libraries(libpackager
absl::synchronization
media_chunking
media_origin
status)

View File

@ -6,28 +6,49 @@
#include "packager/app/job_manager.h"
#include "packager/app/libcrypto_threading.h"
#include <set>
#include "packager/media/chunking/sync_point_queue.h"
#include "packager/media/origin/origin_handler.h"
namespace shaka {
namespace media {
Job::Job(const std::string& name, std::shared_ptr<OriginHandler> work)
: SimpleThread(name),
Job::Job(const std::string& name,
std::shared_ptr<OriginHandler> work,
OnCompleteFunction on_complete)
: name_(name),
work_(std::move(work)),
wait_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {
on_complete_(on_complete),
status_(error::Code::UNKNOWN, "Job uninitialized") {
DCHECK(work_);
}
const Status& Job::Initialize() {
status_ = work_->Initialize();
return status_;
}
void Job::Start() {
thread_.reset(new std::thread(&Job::Run, this));
}
void Job::Cancel() {
work_->Cancel();
}
void Job::Run() {
status_ = work_->Run();
wait_.Signal();
const Status& Job::Run() {
if (status_.ok()) // initialized correctly
status_ = work_->Run();
on_complete_(this);
return status_;
}
void Job::Join() {
if (thread_)
thread_->join();
}
JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
@ -35,81 +56,77 @@ JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
void JobManager::Add(const std::string& name,
std::shared_ptr<OriginHandler> handler) {
// Stores Job entries for delayed construction of Job objects, to avoid
// setting up SimpleThread until we know all workers can be initialized
// successfully.
job_entries_.push_back({name, std::move(handler)});
jobs_.emplace_back(new Job(
name, std::move(handler),
std::bind(&JobManager::OnJobComplete, this, std::placeholders::_1)));
}
Status JobManager::InitializeJobs() {
Status status;
for (const JobEntry& job_entry : job_entries_)
status.Update(job_entry.worker->Initialize());
if (!status.ok())
return status;
// Create Job objects after successfully initialized all workers.
for (const JobEntry& job_entry : job_entries_)
jobs_.emplace_back(new Job(job_entry.name, std::move(job_entry.worker)));
for (auto& job : jobs_)
status.Update(job->Initialize());
return status;
}
Status JobManager::RunJobs() {
// We need to store the jobs and the waits separately in order to use the
// |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we
// need to access the jobs in order to join the thread and check the status.
// The indexes needs to be check in sync or else we won't be able to relate a
// WaitableEvent back to the job.
std::vector<Job*> active_jobs;
std::vector<base::WaitableEvent*> active_waits;
std::set<Job*> active_jobs;
// Start every job and add it to the active jobs list so that we can wait
// on each one.
for (auto& job : jobs_) {
job->Start();
active_jobs.push_back(job.get());
active_waits.push_back(job->wait());
active_jobs.insert(job.get());
}
// Wait for all jobs to complete or an error occurs.
// Wait for all jobs to complete or any job to error.
Status status;
while (status.ok() && active_jobs.size()) {
// Wait for an event to finish and then update our status so that we can
// quit if something has gone wrong.
const size_t done =
base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size());
Job* job = active_jobs[done];
{
absl::MutexLock lock(&mutex_);
while (status.ok() && active_jobs.size()) {
// any_job_complete_ is protected by mutex_.
any_job_complete_.Wait(&mutex_);
job->Join();
status.Update(job->status());
// Remove the job and the wait from our tracking.
active_jobs.erase(active_jobs.begin() + done);
active_waits.erase(active_waits.begin() + done);
// complete_ is protected by mutex_.
for (const auto& entry : complete_) {
Job* job = entry.first;
bool complete = entry.second;
if (complete) {
job->Join();
status.Update(job->status());
active_jobs.erase(job);
}
}
}
}
// If the main loop has exited and there are still jobs running,
// we need to cancel them and clean-up.
if (sync_points_)
sync_points_->Cancel();
for (auto& job : active_jobs) {
job->Cancel();
}
for (auto& job : active_jobs) {
for (auto& job : active_jobs)
job->Cancel();
for (auto& job : active_jobs)
job->Join();
}
return status;
}
void JobManager::OnJobComplete(Job* job) {
absl::MutexLock lock(&mutex_);
// These are both protected by mutex_.
complete_[job] = true;
any_job_complete_.Signal();
}
void JobManager::CancelJobs() {
if (sync_points_)
sync_points_->Cancel();
for (auto& job : jobs_) {
for (auto& job : jobs_)
job->Cancel();
}
}
} // namespace media

View File

@ -7,11 +7,14 @@
#ifndef PACKAGER_APP_JOB_MANAGER_H_
#define PACKAGER_APP_JOB_MANAGER_H_
#include <functional>
#include <map>
#include <memory>
#include <thread>
#include <vector>
#include "packager/base/threading/simple_thread.h"
#include "packager/status.h"
#include "absl/synchronization/mutex.h"
#include "packager/status/status.h"
namespace shaka {
namespace media {
@ -21,33 +24,53 @@ class SyncPointQueue;
// A job is a single line of work that is expected to run in parallel with
// other jobs.
class Job : public base::SimpleThread {
class Job {
public:
Job(const std::string& name, std::shared_ptr<OriginHandler> work);
typedef std::function<void(Job*)> OnCompleteFunction;
// Request that the job stops executing. This is only a request and
// will not block. If you want to wait for the job to complete, use
// |wait|.
Job(const std::string& name,
std::shared_ptr<OriginHandler> work,
OnCompleteFunction on_complete);
// Initialize the work object. Call before Start() or Run(). Updates status()
// and returns it for convenience.
const Status& Initialize();
// Begin the job in a new thread. This is only a request and will not block.
// If you want to wait for the job to complete, use |complete|.
// Use either Start() for threaded operation or Run() for non-threaded
// operation. DO NOT USE BOTH!
void Start();
// Run the job's work synchronously, blocking until complete. Updates status()
// and returns it for convenience.
// Use either Start() for threaded operation or Run() for non-threaded
// operation. DO NOT USE BOTH!
const Status& Run();
// Request that the job stops executing. This is only a request and will not
// block. If you want to wait for the job to complete, use |complete|.
void Cancel();
// Get the current status of the job. If the job failed to initialize
// or encountered an error during execution this will return the error.
// Join the thread, if any was started. Blocks until the thread has stopped.
void Join();
// Get the current status of the job. If the job failed to initialize or
// encountered an error during execution this will return the error.
const Status& status() const { return status_; }
// If you want to wait for this job to complete, this will return the
// WaitableEvent you can wait on.
base::WaitableEvent* wait() { return &wait_; }
// The name given to this job in the constructor.
const std::string& name() const { return name_; }
private:
Job(const Job&) = delete;
Job& operator=(const Job&) = delete;
void Run() override;
std::string name_;
std::shared_ptr<OriginHandler> work_;
OnCompleteFunction on_complete_;
std::unique_ptr<std::thread> thread_;
Status status_;
base::WaitableEvent wait_;
};
// Similar to a thread pool, JobManager manages multiple jobs that are expected
@ -70,7 +93,7 @@ class JobManager {
// Initialize all registered jobs. If any job fails to initialize, this will
// return the error and it will not be safe to call |RunJobs| as not all jobs
// will be properly initialized.
virtual Status InitializeJobs();
Status InitializeJobs();
// Run all registered jobs. Before calling this make sure that
// |InitializedJobs| returned |Status::OK|. This call is blocking and will
@ -87,16 +110,17 @@ class JobManager {
JobManager(const JobManager&) = delete;
JobManager& operator=(const JobManager&) = delete;
struct JobEntry {
std::string name;
std::shared_ptr<OriginHandler> worker;
};
// Stores Job entries for delayed construction of Job object.
std::vector<JobEntry> job_entries_;
std::vector<std::unique_ptr<Job>> jobs_;
void OnJobComplete(Job* job);
// Stored in JobManager so JobManager can cancel |sync_points| when any job
// fails or is cancelled.
std::unique_ptr<SyncPointQueue> sync_points_;
std::vector<std::unique_ptr<Job>> jobs_;
absl::Mutex mutex_;
std::map<Job*, bool> complete_ ABSL_GUARDED_BY(mutex_);
absl::CondVar any_job_complete_ ABSL_GUARDED_BY(mutex_);
};
} // namespace media

View File

@ -1,4 +1,4 @@
// Copyright 2020 Google LLLC All rights reserved.
// Copyright 2020 Google LLC. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
@ -16,17 +16,12 @@ SingleThreadJobManager::SingleThreadJobManager(
std::unique_ptr<SyncPointQueue> sync_points)
: JobManager(std::move(sync_points)) {}
Status SingleThreadJobManager::InitializeJobs() {
Status status;
for (const JobEntry& job_entry : job_entries_)
status.Update(job_entry.worker->Initialize());
return status;
}
Status SingleThreadJobManager::RunJobs() {
Status status;
for (const JobEntry& job_entry : job_entries_)
status.Update(job_entry.worker->Run());
for (auto& job : jobs_)
status.Update(job->Run());
return status;
}

View File

@ -1,4 +1,4 @@
// Copyright 2020 Google LLLC All rights reserved.
// Copyright 2020 Google LLC. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
@ -22,7 +22,7 @@ class SingleThreadJobManager : public JobManager {
// fails or is cancelled. It can be NULL.
explicit SingleThreadJobManager(std::unique_ptr<SyncPointQueue> sync_points);
Status InitializeJobs() override;
// Run all registered jobs serially in this thread.
Status RunJobs() override;
};

View File

@ -4,26 +4,26 @@
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
add_library(chunking STATIC
add_library(media_chunking STATIC
chunking_handler.cc
cue_alignment_handler.cc
sync_point_queue.cc
text_chunker.cc
)
target_link_libraries(chunking
target_link_libraries(media_chunking
media_base
)
add_executable(chunking_unittest
add_executable(media_chunking_unittest
chunking_handler_unittest.cc
cue_alignment_handler_unittest.cc
text_chunker_unittest.cc
)
target_link_libraries(chunking_unittest
target_link_libraries(media_chunking_unittest
gmock
gtest
gtest_main
media_chunking
media_handler_test_base
chunking
)
add_gtest(chunking_unittest)
add_gtest(media_chunking_unittest)

View File

@ -4,7 +4,7 @@
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
add_library(codecs STATIC
add_library(media_codecs STATIC
aac_audio_specific_config.cc
ac3_audio_util.cc
av1_codec_configuration_record.cc
@ -31,10 +31,10 @@ add_library(codecs STATIC
vp9_parser.cc
)
target_link_libraries(codecs
target_link_libraries(media_codecs
media_base)
add_executable(codecs_unittest
add_executable(media_codecs_unittest
aac_audio_specific_config_unittest.cc
ac3_audio_util_unittest.cc
av1_codec_configuration_record_unittest.cc
@ -59,11 +59,11 @@ add_executable(codecs_unittest
vp9_parser_unittest.cc
)
target_link_libraries(codecs_unittest
codecs
target_link_libraries(media_codecs_unittest
gmock
gtest
gtest_main
media_codecs
test_data_util)
add_gtest(codecs_unittest)
add_gtest(media_codecs_unittest)

View File

@ -10,8 +10,8 @@ add_library(media_crypto STATIC
sample_aes_ec3_cryptor.cc
subsample_generator.cc)
target_link_libraries(media_crypto
codecs
media_base
media_codecs
absl::base
glog)
@ -20,8 +20,8 @@ add_executable(media_crypto_unittest
sample_aes_ec3_cryptor_unittest.cc
subsample_generator_unittest.cc)
target_link_libraries(media_crypto_unittest
codecs
media_base
media_codecs
media_crypto
media_handler_test_base
status

View File

@ -17,7 +17,7 @@ target_link_libraries(media_event
file
mpd_media_info_proto
media_base
codecs
media_codecs
)
add_library(mock_muxer_listener STATIC

View File

@ -30,7 +30,7 @@ target_link_libraries(formats_webm
webm
file
media_base
codecs
media_codecs
)
add_executable(webm_unittest