Shaka Packager SDK
producer_consumer_queue.h
1 // Copyright 2014 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #ifndef PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
8 #define PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
9 
10 #include <deque>
11 
12 #include "packager/base/strings/stringprintf.h"
13 #include "packager/base/synchronization/condition_variable.h"
14 #include "packager/base/synchronization/lock.h"
15 #include "packager/base/timer/elapsed_timer.h"
16 #include "packager/status.h"
17 
18 namespace shaka {
19 namespace media {
20 
21 static const size_t kUnlimitedCapacity = 0u;
22 static const int64_t kInfiniteTimeout = -1;
23 
27 template <class T>
29  public:
33  explicit ProducerConsumerQueue(size_t capacity);
34 
39  ProducerConsumerQueue(size_t capacity, size_t starting_pos);
40 
42 
51  Status Push(const T& element, int64_t timeout_ms);
52 
60  Status Pop(T* element, int64_t timeout_ms);
61 
74  Status Peek(size_t pos, T* element, int64_t timeout_ms);
75 
79  void Stop() {
80  base::AutoLock l(lock_);
81  stop_requested_ = true;
82  not_empty_cv_.Broadcast();
83  not_full_cv_.Broadcast();
84  new_element_cv_.Broadcast();
85  }
86 
88  bool Empty() const {
89  base::AutoLock l(lock_);
90  return q_.empty();
91  }
92 
94  size_t Size() const {
95  base::AutoLock l(lock_);
96  return q_.size();
97  }
98 
101  size_t HeadPos() const {
102  base::AutoLock l(lock_);
103  return head_pos_;
104  }
105 
108  size_t TailPos() const {
109  base::AutoLock l(lock_);
110  return head_pos_ + q_.size() - 1;
111  }
112 
115  bool Stopped() const {
116  base::AutoLock l(lock_);
117  return stop_requested_;
118  }
119 
120  private:
121  // Move head_pos_ to center on pos.
122  void SlideHeadOnCenter(size_t pos);
123 
124  const size_t capacity_; // Maximum number of elements; zero means unlimited.
125  mutable base::Lock lock_; // Lock protecting all other variables below.
126  size_t head_pos_; // Head position.
127  std::deque<T> q_; // Internal queue holding the elements.
128  base::ConditionVariable not_empty_cv_;
129  base::ConditionVariable not_full_cv_;
130  base::ConditionVariable new_element_cv_;
131  bool stop_requested_; // True after Stop has been called.
132 
133  DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue);
134 };
135 
136 // Implementations of non-inline functions.
137 template <class T>
139  : capacity_(capacity),
140  head_pos_(0),
141  not_empty_cv_(&lock_),
142  not_full_cv_(&lock_),
143  new_element_cv_(&lock_),
144  stop_requested_(false) {}
145 
146 template <class T>
148  size_t starting_pos)
149  : capacity_(capacity),
150  head_pos_(starting_pos),
151  not_empty_cv_(&lock_),
152  not_full_cv_(&lock_),
153  new_element_cv_(&lock_),
154  stop_requested_(false) {
155 }
156 
157 template <class T>
159 
160 template <class T>
161 Status ProducerConsumerQueue<T>::Push(const T& element, int64_t timeout_ms) {
162  base::AutoLock l(lock_);
163  bool woken = false;
164 
165  // Check for queue shutdown.
166  if (stop_requested_)
167  return Status(error::STOPPED, "");
168 
169  base::ElapsedTimer timer;
170  base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
171 
172  if (capacity_) {
173  while (q_.size() == capacity_) {
174  if (timeout_ms < 0) {
175  // Wait forever, or until Stop.
176  not_full_cv_.Wait();
177  } else {
178  base::TimeDelta elapsed = timer.Elapsed();
179  if (elapsed < timeout_delta) {
180  // Wait with timeout, or until Stop.
181  not_full_cv_.TimedWait(timeout_delta - elapsed);
182  } else {
183  // We're through waiting.
184  return Status(error::TIME_OUT, "Time out on pushing.");
185  }
186  }
187  // Re-check for queue shutdown after waking from Wait.
188  if (stop_requested_)
189  return Status(error::STOPPED, "");
190 
191  woken = true;
192  }
193  DCHECK_LT(q_.size(), capacity_);
194  }
195 
196  // Signal consumer to proceed if we are going to create some elements.
197  if (q_.empty())
198  not_empty_cv_.Signal();
199  new_element_cv_.Signal();
200 
201  q_.push_back(element);
202 
203  // Signal other producers if we just acquired more capacity.
204  if (woken && q_.size() != capacity_)
205  not_full_cv_.Signal();
206  return Status::OK;
207 }
208 
209 template <class T>
210 Status ProducerConsumerQueue<T>::Pop(T* element, int64_t timeout_ms) {
211  base::AutoLock l(lock_);
212  bool woken = false;
213 
214  base::ElapsedTimer timer;
215  base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
216 
217  while (q_.empty()) {
218  if (stop_requested_)
219  return Status(error::STOPPED, "");
220 
221  if (timeout_ms < 0) {
222  // Wait forever, or until Stop.
223  not_empty_cv_.Wait();
224  } else {
225  base::TimeDelta elapsed = timer.Elapsed();
226  if (elapsed < timeout_delta) {
227  // Wait with timeout, or until Stop.
228  not_empty_cv_.TimedWait(timeout_delta - elapsed);
229  } else {
230  // We're through waiting.
231  return Status(error::TIME_OUT, "Time out on popping.");
232  }
233  }
234  woken = true;
235  }
236 
237  // Signal producer to proceed if we are going to create some capacity.
238  if (q_.size() == capacity_)
239  not_full_cv_.Signal();
240 
241  *element = q_.front();
242  q_.pop_front();
243  ++head_pos_;
244 
245  // Signal other consumers if we have more elements.
246  if (woken && !q_.empty())
247  not_empty_cv_.Signal();
248  return Status::OK;
249 }
250 
251 template <class T>
253  T* element,
254  int64_t timeout_ms) {
255  base::AutoLock l(lock_);
256  if (pos < head_pos_) {
257  return Status(
258  error::INVALID_ARGUMENT,
259  base::StringPrintf(
260  "pos (%zu) is too small; head is at %zu.", pos, head_pos_));
261  }
262 
263  bool woken = false;
264 
265  base::ElapsedTimer timer;
266  base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
267 
268  // Move head to create some space (move the sliding window centered @ pos).
269  SlideHeadOnCenter(pos);
270 
271  while (pos >= head_pos_ + q_.size()) {
272  if (stop_requested_)
273  return Status(error::STOPPED, "");
274 
275  if (timeout_ms < 0) {
276  // Wait forever, or until Stop.
277  new_element_cv_.Wait();
278  } else {
279  base::TimeDelta elapsed = timer.Elapsed();
280  if (elapsed < timeout_delta) {
281  // Wait with timeout, or until Stop.
282  new_element_cv_.TimedWait(timeout_delta - elapsed);
283  } else {
284  // We're through waiting.
285  return Status(error::TIME_OUT, "Time out on peeking.");
286  }
287  }
288  // Move head to create some space (move the sliding window centered @ pos).
289  SlideHeadOnCenter(pos);
290  woken = true;
291  }
292 
293  *element = q_[pos - head_pos_];
294 
295  // Signal other consumers if we have more elements.
296  if (woken && !q_.empty())
297  new_element_cv_.Signal();
298  return Status::OK;
299 }
300 
301 template <class T>
303  lock_.AssertAcquired();
304 
305  if (capacity_) {
306  // Signal producer to proceed if we are going to create some capacity.
307  if (q_.size() == capacity_ && pos > head_pos_ + capacity_ / 2)
308  not_full_cv_.Signal();
309 
310  while (!q_.empty() && pos > head_pos_ + capacity_ / 2) {
311  ++head_pos_;
312  q_.pop_front();
313  }
314  }
315 }
316 
317 } // namespace media
318 } // namespace shaka
319 
320 #endif // PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
Status Push(const T &element, int64_t timeout_ms)
Status Peek(size_t pos, T *element, int64_t timeout_ms)
Status Pop(T *element, int64_t timeout_ms)
All the methods that are virtual are virtual for mocking.