7 #ifndef PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
8 #define PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
12 #include "packager/base/strings/stringprintf.h"
13 #include "packager/base/synchronization/condition_variable.h"
14 #include "packager/base/synchronization/lock.h"
15 #include "packager/base/timer/elapsed_timer.h"
16 #include "packager/status.h"
21 static const size_t kUnlimitedCapacity = 0u;
22 static const int64_t kInfiniteTimeout = -1;
51 Status Push(
const T& element, int64_t timeout_ms);
60 Status Pop(T* element, int64_t timeout_ms);
74 Status Peek(
size_t pos, T* element, int64_t timeout_ms);
80 base::AutoLock l(lock_);
81 stop_requested_ =
true;
82 not_empty_cv_.Broadcast();
83 not_full_cv_.Broadcast();
84 new_element_cv_.Broadcast();
89 base::AutoLock l(lock_);
95 base::AutoLock l(lock_);
102 base::AutoLock l(lock_);
109 base::AutoLock l(lock_);
110 return head_pos_ + q_.size() - 1;
116 base::AutoLock l(lock_);
117 return stop_requested_;
122 void SlideHeadOnCenter(
size_t pos);
124 const size_t capacity_;
125 mutable base::Lock lock_;
128 base::ConditionVariable not_empty_cv_;
129 base::ConditionVariable not_full_cv_;
130 base::ConditionVariable new_element_cv_;
131 bool stop_requested_;
139 : capacity_(capacity),
141 not_empty_cv_(&lock_),
142 not_full_cv_(&lock_),
143 new_element_cv_(&lock_),
144 stop_requested_(false) {}
149 : capacity_(capacity),
150 head_pos_(starting_pos),
151 not_empty_cv_(&lock_),
152 not_full_cv_(&lock_),
153 new_element_cv_(&lock_),
154 stop_requested_(false) {
162 base::AutoLock l(lock_);
167 return Status(error::STOPPED,
"");
169 base::ElapsedTimer timer;
170 base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
173 while (q_.size() == capacity_) {
174 if (timeout_ms < 0) {
178 base::TimeDelta elapsed = timer.Elapsed();
179 if (elapsed < timeout_delta) {
181 not_full_cv_.TimedWait(timeout_delta - elapsed);
184 return Status(error::TIME_OUT,
"Time out on pushing.");
189 return Status(error::STOPPED,
"");
193 DCHECK_LT(q_.size(), capacity_);
198 not_empty_cv_.Signal();
199 new_element_cv_.Signal();
201 q_.push_back(element);
204 if (woken && q_.size() != capacity_)
205 not_full_cv_.Signal();
211 base::AutoLock l(lock_);
214 base::ElapsedTimer timer;
215 base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
219 return Status(error::STOPPED,
"");
221 if (timeout_ms < 0) {
223 not_empty_cv_.Wait();
225 base::TimeDelta elapsed = timer.Elapsed();
226 if (elapsed < timeout_delta) {
228 not_empty_cv_.TimedWait(timeout_delta - elapsed);
231 return Status(error::TIME_OUT,
"Time out on popping.");
238 if (q_.size() == capacity_)
239 not_full_cv_.Signal();
241 *element = q_.front();
246 if (woken && !q_.empty())
247 not_empty_cv_.Signal();
254 int64_t timeout_ms) {
255 base::AutoLock l(lock_);
256 if (pos < head_pos_) {
258 error::INVALID_ARGUMENT,
260 "pos (%zu) is too small; head is at %zu.", pos, head_pos_));
265 base::ElapsedTimer timer;
266 base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
269 SlideHeadOnCenter(pos);
271 while (pos >= head_pos_ + q_.size()) {
273 return Status(error::STOPPED,
"");
275 if (timeout_ms < 0) {
277 new_element_cv_.Wait();
279 base::TimeDelta elapsed = timer.Elapsed();
280 if (elapsed < timeout_delta) {
282 new_element_cv_.TimedWait(timeout_delta - elapsed);
285 return Status(error::TIME_OUT,
"Time out on peeking.");
289 SlideHeadOnCenter(pos);
293 *element = q_[pos - head_pos_];
296 if (woken && !q_.empty())
297 new_element_cv_.Signal();
303 lock_.AssertAcquired();
307 if (q_.size() == capacity_ && pos > head_pos_ + capacity_ / 2)
308 not_full_cv_.Signal();
310 while (!q_.empty() && pos > head_pos_ + capacity_ / 2) {
320 #endif // PACKAGER_MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_