Allow SyncPointQueue to be cancelled
Change-Id: Idbf6ee7a5d9721681811189fca4190e7a0286c83
This commit is contained in:
parent
a42200cfaf
commit
265fa9ba90
|
@ -7,6 +7,7 @@
|
||||||
#include "packager/app/job_manager.h"
|
#include "packager/app/job_manager.h"
|
||||||
|
|
||||||
#include "packager/app/libcrypto_threading.h"
|
#include "packager/app/libcrypto_threading.h"
|
||||||
|
#include "packager/media/chunking/sync_point_queue.h"
|
||||||
#include "packager/media/origin/origin_handler.h"
|
#include "packager/media/origin/origin_handler.h"
|
||||||
|
|
||||||
namespace shaka {
|
namespace shaka {
|
||||||
|
@ -29,6 +30,9 @@ void Job::Run() {
|
||||||
wait_.Signal();
|
wait_.Signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
|
||||||
|
: sync_points_(std::move(sync_points)) {}
|
||||||
|
|
||||||
void JobManager::Add(const std::string& name,
|
void JobManager::Add(const std::string& name,
|
||||||
std::shared_ptr<OriginHandler> handler) {
|
std::shared_ptr<OriginHandler> handler) {
|
||||||
// Stores Job entries for delayed construction of Job objects, to avoid
|
// Stores Job entries for delayed construction of Job objects, to avoid
|
||||||
|
@ -87,6 +91,8 @@ Status JobManager::RunJobs() {
|
||||||
|
|
||||||
// If the main loop has exited and there are still jobs running,
|
// If the main loop has exited and there are still jobs running,
|
||||||
// we need to cancel them and clean-up.
|
// we need to cancel them and clean-up.
|
||||||
|
if (sync_points_)
|
||||||
|
sync_points_->Cancel();
|
||||||
for (auto& job : active_jobs) {
|
for (auto& job : active_jobs) {
|
||||||
job->Cancel();
|
job->Cancel();
|
||||||
}
|
}
|
||||||
|
@ -99,6 +105,8 @@ Status JobManager::RunJobs() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void JobManager::CancelJobs() {
|
void JobManager::CancelJobs() {
|
||||||
|
if (sync_points_)
|
||||||
|
sync_points_->Cancel();
|
||||||
for (auto& job : jobs_) {
|
for (auto& job : jobs_) {
|
||||||
job->Cancel();
|
job->Cancel();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ namespace shaka {
|
||||||
namespace media {
|
namespace media {
|
||||||
|
|
||||||
class OriginHandler;
|
class OriginHandler;
|
||||||
|
class SyncPointQueue;
|
||||||
|
|
||||||
// A job is a single line of work that is expected to run in parallel with
|
// A job is a single line of work that is expected to run in parallel with
|
||||||
// other jobs.
|
// other jobs.
|
||||||
|
@ -54,7 +55,10 @@ class Job : public base::SimpleThread {
|
||||||
// jobs.
|
// jobs.
|
||||||
class JobManager {
|
class JobManager {
|
||||||
public:
|
public:
|
||||||
JobManager() = default;
|
// @param sync_points is an optional SyncPointQueue used to synchronize and
|
||||||
|
// align cue points. JobManager cancels @a sync_points when any job
|
||||||
|
// fails or is cancelled. It can be NULL.
|
||||||
|
explicit JobManager(std::unique_ptr<SyncPointQueue> sync_points);
|
||||||
|
|
||||||
// Create a new job entry by specifying the origin handler at the top of the
|
// Create a new job entry by specifying the origin handler at the top of the
|
||||||
// chain and a name for the thread. This will only register the job. To start
|
// chain and a name for the thread. This will only register the job. To start
|
||||||
|
@ -75,6 +79,8 @@ class JobManager {
|
||||||
// unblock a call to |RunJobs|.
|
// unblock a call to |RunJobs|.
|
||||||
void CancelJobs();
|
void CancelJobs();
|
||||||
|
|
||||||
|
SyncPointQueue* sync_points() { return sync_points_.get(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
JobManager(const JobManager&) = delete;
|
JobManager(const JobManager&) = delete;
|
||||||
JobManager& operator=(const JobManager&) = delete;
|
JobManager& operator=(const JobManager&) = delete;
|
||||||
|
@ -86,6 +92,9 @@ class JobManager {
|
||||||
// Stores Job entries for delayed construction of Job object.
|
// Stores Job entries for delayed construction of Job object.
|
||||||
std::vector<JobEntry> job_entries_;
|
std::vector<JobEntry> job_entries_;
|
||||||
std::vector<std::unique_ptr<Job>> jobs_;
|
std::vector<std::unique_ptr<Job>> jobs_;
|
||||||
|
// Stored in JobManager so JobManager can cancel |sync_points| when any job
|
||||||
|
// fails or is cancelled.
|
||||||
|
std::unique_ptr<SyncPointQueue> sync_points_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace media
|
} // namespace media
|
||||||
|
|
|
@ -170,7 +170,10 @@ Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
|
||||||
|
|
||||||
std::shared_ptr<const CueEvent> next_sync =
|
std::shared_ptr<const CueEvent> next_sync =
|
||||||
sync_points_->GetNext(next_cue_hint);
|
sync_points_->GetNext(next_cue_hint);
|
||||||
DCHECK(next_sync);
|
if (!next_sync) {
|
||||||
|
// This happens only if the job is cancelled.
|
||||||
|
return Status(error::CANCELLED, "SyncPointQueue is cancelled.");
|
||||||
|
}
|
||||||
|
|
||||||
Status status = UseNewSyncPoint(next_sync);
|
Status status = UseNewSyncPoint(next_sync);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
|
|
|
@ -27,6 +27,14 @@ void SyncPointQueue::AddThread() {
|
||||||
thread_count_++;
|
thread_count_++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SyncPointQueue::Cancel() {
|
||||||
|
{
|
||||||
|
base::AutoLock auto_lock(lock_);
|
||||||
|
cancelled_ = true;
|
||||||
|
}
|
||||||
|
sync_condition_.Broadcast();
|
||||||
|
}
|
||||||
|
|
||||||
double SyncPointQueue::GetHint(double time_in_seconds) {
|
double SyncPointQueue::GetHint(double time_in_seconds) {
|
||||||
base::AutoLock auto_lock(lock_);
|
base::AutoLock auto_lock(lock_);
|
||||||
|
|
||||||
|
@ -46,7 +54,7 @@ double SyncPointQueue::GetHint(double time_in_seconds) {
|
||||||
std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
|
std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
|
||||||
double hint_in_seconds) {
|
double hint_in_seconds) {
|
||||||
base::AutoLock auto_lock(lock_);
|
base::AutoLock auto_lock(lock_);
|
||||||
while (true) {
|
while (!cancelled_) {
|
||||||
// Find the promoted cue that would line up with our hint, which is the
|
// Find the promoted cue that would line up with our hint, which is the
|
||||||
// first cue that is not less than |hint_in_seconds|.
|
// first cue that is not less than |hint_in_seconds|.
|
||||||
auto iter = promoted_.lower_bound(hint_in_seconds);
|
auto iter = promoted_.lower_bound(hint_in_seconds);
|
||||||
|
@ -70,6 +78,7 @@ std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
|
||||||
sync_condition_.Wait();
|
sync_condition_.Wait();
|
||||||
waiting_thread_count_--;
|
waiting_thread_count_--;
|
||||||
}
|
}
|
||||||
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAt(
|
std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAt(
|
||||||
|
|
|
@ -26,6 +26,9 @@ class SyncPointQueue {
|
||||||
/// order to keep track of its clients.
|
/// order to keep track of its clients.
|
||||||
void AddThread();
|
void AddThread();
|
||||||
|
|
||||||
|
/// Cancel the queue and unblock all threads.
|
||||||
|
void Cancel();
|
||||||
|
|
||||||
/// @return A hint for when the next cue event would be. The returned hint is
|
/// @return A hint for when the next cue event would be. The returned hint is
|
||||||
/// not less than @a time_in_seconds. The actual time for the next cue
|
/// not less than @a time_in_seconds. The actual time for the next cue
|
||||||
/// event will not be less than the returned hint, with the exact
|
/// event will not be less than the returned hint, with the exact
|
||||||
|
@ -37,7 +40,7 @@ class SyncPointQueue {
|
||||||
/// after @a hint_in_seconds has been promoted, this will block until
|
/// after @a hint_in_seconds has been promoted, this will block until
|
||||||
/// either a cue is promoted or all threads are blocked (in which
|
/// either a cue is promoted or all threads are blocked (in which
|
||||||
/// case, the unpromoted cue at @a hint_in_seconds will be
|
/// case, the unpromoted cue at @a hint_in_seconds will be
|
||||||
/// self-promoted and returned).
|
/// self-promoted and returned) or Cancel() is called.
|
||||||
std::shared_ptr<const CueEvent> GetNext(double hint_in_seconds);
|
std::shared_ptr<const CueEvent> GetNext(double hint_in_seconds);
|
||||||
|
|
||||||
/// Promote the first cue that is not greater than @a time_in_seconds. All
|
/// Promote the first cue that is not greater than @a time_in_seconds. All
|
||||||
|
@ -56,6 +59,7 @@ class SyncPointQueue {
|
||||||
base::ConditionVariable sync_condition_;
|
base::ConditionVariable sync_condition_;
|
||||||
size_t thread_count_ = 0;
|
size_t thread_count_ = 0;
|
||||||
size_t waiting_thread_count_ = 0;
|
size_t waiting_thread_count_ = 0;
|
||||||
|
bool cancelled_ = false;
|
||||||
|
|
||||||
std::map<double, std::shared_ptr<CueEvent>> unpromoted_;
|
std::map<double, std::shared_ptr<CueEvent>> unpromoted_;
|
||||||
std::map<double, std::shared_ptr<CueEvent>> promoted_;
|
std::map<double, std::shared_ptr<CueEvent>> promoted_;
|
||||||
|
|
|
@ -54,6 +54,7 @@ namespace shaka {
|
||||||
|
|
||||||
// TODO(kqyang): Clean up namespaces.
|
// TODO(kqyang): Clean up namespaces.
|
||||||
using media::Demuxer;
|
using media::Demuxer;
|
||||||
|
using media::JobManager;
|
||||||
using media::KeySource;
|
using media::KeySource;
|
||||||
using media::MuxerOptions;
|
using media::MuxerOptions;
|
||||||
using media::SyncPointQueue;
|
using media::SyncPointQueue;
|
||||||
|
@ -785,9 +786,8 @@ struct Packager::PackagerInternal {
|
||||||
std::unique_ptr<KeySource> encryption_key_source;
|
std::unique_ptr<KeySource> encryption_key_source;
|
||||||
std::unique_ptr<MpdNotifier> mpd_notifier;
|
std::unique_ptr<MpdNotifier> mpd_notifier;
|
||||||
std::unique_ptr<hls::HlsNotifier> hls_notifier;
|
std::unique_ptr<hls::HlsNotifier> hls_notifier;
|
||||||
std::unique_ptr<SyncPointQueue> sync_points;
|
|
||||||
BufferCallbackParams buffer_callback_params;
|
BufferCallbackParams buffer_callback_params;
|
||||||
media::JobManager job_manager;
|
std::unique_ptr<media::JobManager> job_manager;
|
||||||
};
|
};
|
||||||
|
|
||||||
Packager::Packager() {}
|
Packager::Packager() {}
|
||||||
|
@ -857,10 +857,12 @@ Status Packager::Initialize(
|
||||||
internal->hls_notifier.reset(new hls::SimpleHlsNotifier(hls_params));
|
internal->hls_notifier.reset(new hls::SimpleHlsNotifier(hls_params));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<SyncPointQueue> sync_points;
|
||||||
if (!packaging_params.ad_cue_generator_params.cue_points.empty()) {
|
if (!packaging_params.ad_cue_generator_params.cue_points.empty()) {
|
||||||
internal->sync_points.reset(
|
sync_points.reset(
|
||||||
new SyncPointQueue(packaging_params.ad_cue_generator_params));
|
new SyncPointQueue(packaging_params.ad_cue_generator_params));
|
||||||
}
|
}
|
||||||
|
internal->job_manager.reset(new JobManager(std::move(sync_points)));
|
||||||
|
|
||||||
std::vector<StreamDescriptor> streams_for_jobs;
|
std::vector<StreamDescriptor> streams_for_jobs;
|
||||||
|
|
||||||
|
@ -904,8 +906,9 @@ Status Packager::Initialize(
|
||||||
|
|
||||||
Status status = media::CreateAllJobs(
|
Status status = media::CreateAllJobs(
|
||||||
streams_for_jobs, packaging_params, internal->mpd_notifier.get(),
|
streams_for_jobs, packaging_params, internal->mpd_notifier.get(),
|
||||||
internal->encryption_key_source.get(), internal->sync_points.get(),
|
internal->encryption_key_source.get(),
|
||||||
&muxer_listener_factory, &muxer_factory, &internal->job_manager);
|
internal->job_manager->sync_points(), &muxer_listener_factory,
|
||||||
|
&muxer_factory, internal->job_manager.get());
|
||||||
|
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
return status;
|
return status;
|
||||||
|
@ -919,7 +922,7 @@ Status Packager::Run() {
|
||||||
if (!internal_)
|
if (!internal_)
|
||||||
return Status(error::INVALID_ARGUMENT, "Not yet initialized.");
|
return Status(error::INVALID_ARGUMENT, "Not yet initialized.");
|
||||||
|
|
||||||
Status status = internal_->job_manager.RunJobs();
|
Status status = internal_->job_manager->RunJobs();
|
||||||
if (!status.ok())
|
if (!status.ok())
|
||||||
return status;
|
return status;
|
||||||
|
|
||||||
|
@ -939,7 +942,7 @@ void Packager::Cancel() {
|
||||||
LOG(INFO) << "Not yet initialized. Return directly.";
|
LOG(INFO) << "Not yet initialized. Return directly.";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
internal_->job_manager.CancelJobs();
|
internal_->job_manager->CancelJobs();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string Packager::GetLibraryVersion() {
|
std::string Packager::GetLibraryVersion() {
|
||||||
|
|
Loading…
Reference in New Issue