A thread safe producer consumer queue implementation
Change-Id: I827d8959d9c7c398178d083d01fafdb1779805da
This commit is contained in:
parent
3f8b37a377
commit
7e9b8aa5a4
|
@ -86,6 +86,7 @@
|
|||
'muxer.h',
|
||||
'muxer_options.cc',
|
||||
'muxer_options.h',
|
||||
'producer_consumer_queue.h',
|
||||
'request_signer.cc',
|
||||
'request_signer.h',
|
||||
'rsa_key.cc',
|
||||
|
|
|
@ -0,0 +1,296 @@
|
|||
// Copyright 2014 Google Inc. All rights reserved.
|
||||
//
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file or at
|
||||
// https://developers.google.com/open-source/licenses/bsd
|
||||
|
||||
#ifndef MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
|
||||
#define MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
|
||||
|
||||
#include <deque>
|
||||
|
||||
#include "base/strings/stringprintf.h"
|
||||
#include "base/synchronization/condition_variable.h"
|
||||
#include "base/synchronization/lock.h"
|
||||
#include "base/timer/elapsed_timer.h"
|
||||
#include "media/base/status.h"
|
||||
|
||||
namespace media {
|
||||
|
||||
/// A thread safe producer consumer queue implementation. It allows the standard
|
||||
/// push and pop operations. It also maintains a monotonically-increasing
|
||||
/// element position and allows peeking at the element at certain position.
|
||||
template <class T>
|
||||
class ProducerConsumerQueue {
|
||||
public:
|
||||
/// Create a ProducerConsumerQueue.
|
||||
/// @param capacity is the maximum number of elements that the queue can hold
|
||||
/// at once. A value of zero means unlimited capacity.
|
||||
explicit ProducerConsumerQueue(size_t capacity);
|
||||
~ProducerConsumerQueue();
|
||||
|
||||
/// Push an element to the back of the queue. If the queue has reached its
|
||||
/// capacity limit, block until spare capacity is available or time out or
|
||||
/// stopped.
|
||||
/// @param element refers the element to be pushed.
|
||||
/// @param timeout_ms indicates timeout in milliseconds. A value of zero means
|
||||
/// return immediately. A negative value means waiting indefinitely.
|
||||
/// @return OK if the element was pushed successfully, STOPPED if Stop has
|
||||
/// has been called, TIME_OUT if times out.
|
||||
Status Push(const T& element, int64 timeout_ms);
|
||||
|
||||
/// Pop an element from the front of the queue. If the queue is empty, block
|
||||
/// for an element to be available to be consumed or time out or stopped.
|
||||
/// @param[out] element receives the popped element.
|
||||
/// @param timeout_ms indicates timeout in milliseconds. A value of zero means
|
||||
/// return immediately. A negative value means waiting indefinitely.
|
||||
/// @return STOPPED if Stop has been called and the queue is completely empty,
|
||||
/// TIME_OUT if times out, OK otherwise.
|
||||
Status Pop(T* element, int64 timeout_ms);
|
||||
|
||||
/// Peek at the element at the specified position from the queue. If the
|
||||
/// element is not available yet, block until it to be available or time out
|
||||
/// or stopped.
|
||||
/// NOTE: Elements before (pos - capacity/2) will be removed from the queue
|
||||
/// after Peek operation.
|
||||
/// @param pos refers to the element position.
|
||||
/// @param[out] element receives the peeked element.
|
||||
/// @param timeout_ms indicates timeout in milliseconds. A value of zero means
|
||||
/// return immediately. A negative value means waiting indefinitely.
|
||||
/// @return STOPPED if Stop has been called and @a pos is out of range,
|
||||
/// INVALID_ARGUMENT if the pos < Head(), TIME_OUT if times out,
|
||||
/// OK otherwise.
|
||||
Status Peek(size_t pos, T* element, int64 timeout_ms);
|
||||
|
||||
/// Terminate Pop and Peek requests once the queue drains entirely.
|
||||
/// Also terminate all waiting and future Push requests immediately.
|
||||
/// Stop cannot stall.
|
||||
void Stop() {
|
||||
base::AutoLock l(lock_);
|
||||
stop_requested_ = true;
|
||||
not_empty_cv_.Broadcast();
|
||||
not_full_cv_.Broadcast();
|
||||
new_element_cv_.Broadcast();
|
||||
}
|
||||
|
||||
/// @return true if there are no elements in the queue.
|
||||
bool Empty() const {
|
||||
base::AutoLock l(lock_);
|
||||
return q_.empty();
|
||||
}
|
||||
|
||||
/// @return The number of elements in the queue.
|
||||
size_t Size() const {
|
||||
base::AutoLock l(lock_);
|
||||
return q_.size();
|
||||
}
|
||||
|
||||
/// @return The position of the head element in the queue. Note that the
|
||||
/// returned value may be meaningless if the queue is empty.
|
||||
size_t HeadPos() const {
|
||||
base::AutoLock l(lock_);
|
||||
return head_;
|
||||
}
|
||||
|
||||
/// @return The position of the tail element in the queue. Note that the
|
||||
/// returned value may be meaningless if the queue is empty.
|
||||
size_t TailPos() const {
|
||||
base::AutoLock l(lock_);
|
||||
return head_ + q_.size() - 1;
|
||||
}
|
||||
|
||||
/// @return true if the queue has been stopped using Stop(). This allows
|
||||
/// producers to check if they can add new elements to the queue.
|
||||
bool Stopped() const {
|
||||
base::AutoLock l(lock_);
|
||||
return stop_requested_;
|
||||
}
|
||||
|
||||
private:
|
||||
// Move head_ to center on pos.
|
||||
void SlideHeadOnCenter(size_t pos);
|
||||
|
||||
const size_t capacity_; // Maximum number of elements; zero means unlimited.
|
||||
mutable base::Lock lock_; // Lock protecting all other variables below.
|
||||
size_t head_; // Head position.
|
||||
std::deque<T> q_; // Internal queue holding the elements.
|
||||
base::ConditionVariable not_empty_cv_;
|
||||
base::ConditionVariable not_full_cv_;
|
||||
base::ConditionVariable new_element_cv_;
|
||||
bool stop_requested_; // True after Stop has been called.
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue);
|
||||
};
|
||||
|
||||
// Implementations of non-inline functions.
|
||||
template <class T>
|
||||
ProducerConsumerQueue<T>::ProducerConsumerQueue(size_t capacity)
|
||||
: capacity_(capacity),
|
||||
head_(0),
|
||||
not_empty_cv_(&lock_),
|
||||
not_full_cv_(&lock_),
|
||||
new_element_cv_(&lock_),
|
||||
stop_requested_(false) {}
|
||||
|
||||
template <class T>
|
||||
ProducerConsumerQueue<T>::~ProducerConsumerQueue() {}
|
||||
|
||||
template <class T>
|
||||
Status ProducerConsumerQueue<T>::Push(const T& element, int64 timeout_ms) {
|
||||
base::AutoLock l(lock_);
|
||||
bool woken = false;
|
||||
|
||||
// Check for queue shutdown.
|
||||
if (stop_requested_)
|
||||
return Status(error::STOPPED, "");
|
||||
|
||||
base::ElapsedTimer timer;
|
||||
base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
|
||||
|
||||
if (capacity_) {
|
||||
while (q_.size() == capacity_) {
|
||||
if (timeout_ms < 0) {
|
||||
// Wait forever, or until Stop.
|
||||
not_full_cv_.Wait();
|
||||
} else {
|
||||
base::TimeDelta elapsed = timer.Elapsed();
|
||||
if (elapsed < timeout_delta) {
|
||||
// Wait with timeout, or until Stop.
|
||||
not_full_cv_.TimedWait(timeout_delta - elapsed);
|
||||
} else {
|
||||
// We're through waiting.
|
||||
return Status(error::TIME_OUT, "Time out on pushing.");
|
||||
}
|
||||
}
|
||||
// Re-check for queue shutdown after waking from Wait.
|
||||
if (stop_requested_)
|
||||
return Status(error::STOPPED, "");
|
||||
|
||||
woken = true;
|
||||
}
|
||||
DCHECK_LT(q_.size(), capacity_);
|
||||
}
|
||||
|
||||
// Signal consumer to proceed if we are going to create some elements.
|
||||
if (q_.empty())
|
||||
not_empty_cv_.Signal();
|
||||
new_element_cv_.Signal();
|
||||
|
||||
q_.push_back(element);
|
||||
|
||||
// Signal other producers if we just acquired more capacity.
|
||||
if (woken && q_.size() != capacity_)
|
||||
not_full_cv_.Signal();
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
Status ProducerConsumerQueue<T>::Pop(T* element, int64 timeout_ms) {
|
||||
base::AutoLock l(lock_);
|
||||
bool woken = false;
|
||||
|
||||
base::ElapsedTimer timer;
|
||||
base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
|
||||
|
||||
while (q_.empty()) {
|
||||
if (stop_requested_)
|
||||
return Status(error::STOPPED, "");
|
||||
|
||||
if (timeout_ms < 0) {
|
||||
// Wait forever, or until Stop.
|
||||
not_empty_cv_.Wait();
|
||||
} else {
|
||||
base::TimeDelta elapsed = timer.Elapsed();
|
||||
if (elapsed < timeout_delta) {
|
||||
// Wait with timeout, or until Stop.
|
||||
not_empty_cv_.TimedWait(timeout_delta - elapsed);
|
||||
} else {
|
||||
// We're through waiting.
|
||||
return Status(error::TIME_OUT, "Time out on popping.");
|
||||
}
|
||||
}
|
||||
woken = true;
|
||||
}
|
||||
|
||||
// Signal producer to proceed if we are going to create some capacity.
|
||||
if (q_.size() == capacity_)
|
||||
not_full_cv_.Signal();
|
||||
|
||||
*element = q_.front();
|
||||
q_.pop_front();
|
||||
++head_;
|
||||
|
||||
// Signal other consumers if we have more elements.
|
||||
if (woken && !q_.empty())
|
||||
not_empty_cv_.Signal();
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
Status ProducerConsumerQueue<T>::Peek(size_t pos,
|
||||
T* element,
|
||||
int64 timeout_ms) {
|
||||
base::AutoLock l(lock_);
|
||||
if (pos < head_) {
|
||||
return Status(
|
||||
error::INVALID_ARGUMENT,
|
||||
base::StringPrintf("pos (%zu) is too small; head is %zu.", pos, head_));
|
||||
}
|
||||
|
||||
bool woken = false;
|
||||
|
||||
base::ElapsedTimer timer;
|
||||
base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms);
|
||||
|
||||
// Move head_ to create some space (move the sliding window centered @ pos).
|
||||
SlideHeadOnCenter(pos);
|
||||
|
||||
while (pos >= head_ + q_.size()) {
|
||||
if (stop_requested_)
|
||||
return Status(error::STOPPED, "");
|
||||
|
||||
if (timeout_ms < 0) {
|
||||
// Wait forever, or until Stop.
|
||||
new_element_cv_.Wait();
|
||||
} else {
|
||||
base::TimeDelta elapsed = timer.Elapsed();
|
||||
if (elapsed < timeout_delta) {
|
||||
// Wait with timeout, or until Stop.
|
||||
new_element_cv_.TimedWait(timeout_delta - elapsed);
|
||||
} else {
|
||||
// We're through waiting.
|
||||
return Status(error::TIME_OUT, "Time out on peeking.");
|
||||
}
|
||||
}
|
||||
// Move head_ to create some space (move the sliding window centered @ pos).
|
||||
SlideHeadOnCenter(pos);
|
||||
woken = true;
|
||||
}
|
||||
|
||||
*element = q_[pos - head_];
|
||||
|
||||
// Signal other consumers if we have more elements.
|
||||
if (woken && !q_.empty())
|
||||
new_element_cv_.Signal();
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
void ProducerConsumerQueue<T>::SlideHeadOnCenter(size_t pos) {
|
||||
lock_.AssertAcquired();
|
||||
|
||||
if (capacity_) {
|
||||
// Signal producer to proceed if we are going to create some capacity.
|
||||
if (q_.size() == capacity_ && pos > head_ + capacity_ / 2)
|
||||
not_full_cv_.Signal();
|
||||
|
||||
while (!q_.empty() && pos > head_ + capacity_ / 2) {
|
||||
++head_;
|
||||
q_.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace media
|
||||
|
||||
#endif // MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_
|
|
@ -40,6 +40,10 @@ std::string ErrorCodeToString(Code error_code) {
|
|||
return "SERVER_ERROR";
|
||||
case INTERNAL_ERROR:
|
||||
return "INTERNAL_ERROR";
|
||||
case STOPPED:
|
||||
return "STOPPED";
|
||||
case TIME_OUT:
|
||||
return "TIME_OUT";
|
||||
default:
|
||||
NOTIMPLEMENTED() << "Unknown Status Code: " << error_code;
|
||||
return "UNKNOWN_STATUS";
|
||||
|
|
|
@ -59,6 +59,12 @@ enum Code {
|
|||
|
||||
// Internal errors. Some invariants have been broken.
|
||||
INTERNAL_ERROR,
|
||||
|
||||
// The operation was stopped.
|
||||
STOPPED,
|
||||
|
||||
// The operation timed out.
|
||||
TIME_OUT,
|
||||
};
|
||||
|
||||
} // namespace error
|
||||
|
|
Loading…
Reference in New Issue