Shaka Packager SDK
sync_point_queue.cc
1 // Copyright 2018 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/chunking/sync_point_queue.h"
8 #include "packager/media/base/media_handler.h"
9 
10 #include <algorithm>
11 #include <limits>
12 
13 namespace shaka {
14 namespace media {
15 
16 SyncPointQueue::SyncPointQueue(const AdCueGeneratorParams& params)
17  : sync_condition_(&lock_) {
18  for (const Cuepoint& point : params.cue_points) {
19  std::shared_ptr<CueEvent> event = std::make_shared<CueEvent>();
20  event->time_in_seconds = point.start_time_in_seconds;
21  unpromoted_[point.start_time_in_seconds] = std::move(event);
22  }
23 }
24 
26  base::AutoLock auto_lock(lock_);
27  thread_count_++;
28 }
29 
31  {
32  base::AutoLock auto_lock(lock_);
33  cancelled_ = true;
34  }
35  sync_condition_.Broadcast();
36 }
37 
38 double SyncPointQueue::GetHint(double time_in_seconds) {
39  base::AutoLock auto_lock(lock_);
40 
41  auto iter = promoted_.upper_bound(time_in_seconds);
42  if (iter != promoted_.end())
43  return iter->first;
44 
45  iter = unpromoted_.upper_bound(time_in_seconds);
46  if (iter != unpromoted_.end())
47  return iter->first;
48 
49  // Use MAX DOUBLE as the fall back so that we can force all streams to run
50  // out all their samples even when there are no cues.
51  return std::numeric_limits<double>::max();
52 }
53 
54 std::shared_ptr<const CueEvent> SyncPointQueue::GetNext(
55  double hint_in_seconds) {
56  base::AutoLock auto_lock(lock_);
57  while (!cancelled_) {
58  // Find the promoted cue that would line up with our hint, which is the
59  // first cue that is not less than |hint_in_seconds|.
60  auto iter = promoted_.lower_bound(hint_in_seconds);
61  if (iter != promoted_.end()) {
62  return iter->second;
63  }
64 
65  // Promote |hint_in_seconds| if everyone is waiting.
66  if (waiting_thread_count_ + 1 == thread_count_) {
67  std::shared_ptr<const CueEvent> cue = PromoteAtNoLocking(hint_in_seconds);
68  CHECK(cue);
69  return cue;
70  }
71 
72  waiting_thread_count_++;
73  // This blocks until either a cue is promoted or all threads are blocked
74  // (in which case, the unpromoted cue at the hint will be self-promoted
75  // and returned - see section above). Spurious signal events are possible
76  // with most condition variable implementations, so if it returns, we go
77  // back and check if a cue is actually promoted or not.
78  sync_condition_.Wait();
79  waiting_thread_count_--;
80  }
81  return nullptr;
82 }
83 
84 std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAt(
85  double time_in_seconds) {
86  base::AutoLock auto_lock(lock_);
87  return PromoteAtNoLocking(time_in_seconds);
88 }
89 
90 bool SyncPointQueue::HasMore(double hint_in_seconds) const {
91  return hint_in_seconds < std::numeric_limits<double>::max();
92 }
93 
94 std::shared_ptr<const CueEvent> SyncPointQueue::PromoteAtNoLocking(
95  double time_in_seconds) {
96  lock_.AssertAcquired();
97 
98  // It is possible that |time_in_seconds| has been promoted.
99  auto iter = promoted_.find(time_in_seconds);
100  if (iter != promoted_.end())
101  return iter->second;
102 
103  // Find the unpromoted cue that would work for the given time, which is the
104  // first cue that is not greater than |time_in_seconds|.
105  // So find the the first cue that is greater than |time_in_seconds| first and
106  // then get the previous one.
107  iter = unpromoted_.upper_bound(time_in_seconds);
108  // The first cue in |unpromoted_| should not be greater than
109  // |time_in_seconds|. It could happen only if it has been promoted at a
110  // different timestamp, which can only be the result of unaligned GOPs.
111  if (iter == unpromoted_.begin())
112  return nullptr;
113  auto prev_iter = std::prev(iter);
114  DCHECK(prev_iter != unpromoted_.end());
115 
116  std::shared_ptr<CueEvent> cue = prev_iter->second;
117  cue->time_in_seconds = time_in_seconds;
118 
119  promoted_[time_in_seconds] = cue;
120  // Remove all unpromoted cues up to the cue that was just promoted.
121  // User may provide multiple cue points at the same or similar timestamps. The
122  // extra unused cues are simply ignored.
123  unpromoted_.erase(unpromoted_.begin(), iter);
124 
125  // Wake up other threads that may be waiting.
126  sync_condition_.Broadcast();
127  return std::move(cue);
128 }
129 
130 } // namespace media
131 } // namespace shaka
shaka
All the methods that are virtual are virtual for mocking.
Definition: gflags_hex_bytes.cc:11
shaka::media::SyncPointQueue::HasMore
bool HasMore(double hint_in_seconds) const
Definition: sync_point_queue.cc:90
shaka::media::SyncPointQueue::GetNext
std::shared_ptr< const CueEvent > GetNext(double hint_in_seconds)
Definition: sync_point_queue.cc:54
shaka::media::SyncPointQueue::Cancel
void Cancel()
Cancel the queue and unblock all threads.
Definition: sync_point_queue.cc:30
shaka::media::SyncPointQueue::PromoteAt
std::shared_ptr< const CueEvent > PromoteAt(double time_in_seconds)
Definition: sync_point_queue.cc:84
shaka::media::SyncPointQueue::GetHint
double GetHint(double time_in_seconds)
Definition: sync_point_queue.cc:38
shaka::media::SyncPointQueue::AddThread
void AddThread()
Definition: sync_point_queue.cc:25