From 7e9b8aa5a48642e7e55df3c571000ba150ca46a8 Mon Sep 17 00:00:00 2001 From: Kongqun Yang Date: Mon, 7 Apr 2014 12:40:52 -0700 Subject: [PATCH] A thread safe producer consumer queue implementation Change-Id: I827d8959d9c7c398178d083d01fafdb1779805da --- media/base/media_base.gyp | 1 + media/base/producer_consumer_queue.h | 296 +++++++++++++++++++++++++++ media/base/status.cc | 4 + media/base/status.h | 6 + 4 files changed, 307 insertions(+) create mode 100644 media/base/producer_consumer_queue.h diff --git a/media/base/media_base.gyp b/media/base/media_base.gyp index 2a21b3f172..472de8e69f 100644 --- a/media/base/media_base.gyp +++ b/media/base/media_base.gyp @@ -86,6 +86,7 @@ 'muxer.h', 'muxer_options.cc', 'muxer_options.h', + 'producer_consumer_queue.h', 'request_signer.cc', 'request_signer.h', 'rsa_key.cc', diff --git a/media/base/producer_consumer_queue.h b/media/base/producer_consumer_queue.h new file mode 100644 index 0000000000..b8eda996b7 --- /dev/null +++ b/media/base/producer_consumer_queue.h @@ -0,0 +1,296 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_ +#define MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_ + +#include + +#include "base/strings/stringprintf.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/timer/elapsed_timer.h" +#include "media/base/status.h" + +namespace media { + +/// A thread safe producer consumer queue implementation. It allows the standard +/// push and pop operations. It also maintains a monotonically-increasing +/// element position and allows peeking at the element at certain position. +template +class ProducerConsumerQueue { + public: + /// Create a ProducerConsumerQueue. + /// @param capacity is the maximum number of elements that the queue can hold + /// at once. A value of zero means unlimited capacity. + explicit ProducerConsumerQueue(size_t capacity); + ~ProducerConsumerQueue(); + + /// Push an element to the back of the queue. If the queue has reached its + /// capacity limit, block until spare capacity is available or time out or + /// stopped. + /// @param element refers the element to be pushed. + /// @param timeout_ms indicates timeout in milliseconds. A value of zero means + /// return immediately. A negative value means waiting indefinitely. + /// @return OK if the element was pushed successfully, STOPPED if Stop has + /// has been called, TIME_OUT if times out. + Status Push(const T& element, int64 timeout_ms); + + /// Pop an element from the front of the queue. If the queue is empty, block + /// for an element to be available to be consumed or time out or stopped. + /// @param[out] element receives the popped element. + /// @param timeout_ms indicates timeout in milliseconds. A value of zero means + /// return immediately. A negative value means waiting indefinitely. + /// @return STOPPED if Stop has been called and the queue is completely empty, + /// TIME_OUT if times out, OK otherwise. + Status Pop(T* element, int64 timeout_ms); + + /// Peek at the element at the specified position from the queue. If the + /// element is not available yet, block until it to be available or time out + /// or stopped. + /// NOTE: Elements before (pos - capacity/2) will be removed from the queue + /// after Peek operation. + /// @param pos refers to the element position. + /// @param[out] element receives the peeked element. + /// @param timeout_ms indicates timeout in milliseconds. A value of zero means + /// return immediately. A negative value means waiting indefinitely. + /// @return STOPPED if Stop has been called and @a pos is out of range, + /// INVALID_ARGUMENT if the pos < Head(), TIME_OUT if times out, + /// OK otherwise. + Status Peek(size_t pos, T* element, int64 timeout_ms); + + /// Terminate Pop and Peek requests once the queue drains entirely. + /// Also terminate all waiting and future Push requests immediately. + /// Stop cannot stall. + void Stop() { + base::AutoLock l(lock_); + stop_requested_ = true; + not_empty_cv_.Broadcast(); + not_full_cv_.Broadcast(); + new_element_cv_.Broadcast(); + } + + /// @return true if there are no elements in the queue. + bool Empty() const { + base::AutoLock l(lock_); + return q_.empty(); + } + + /// @return The number of elements in the queue. + size_t Size() const { + base::AutoLock l(lock_); + return q_.size(); + } + + /// @return The position of the head element in the queue. Note that the + /// returned value may be meaningless if the queue is empty. + size_t HeadPos() const { + base::AutoLock l(lock_); + return head_; + } + + /// @return The position of the tail element in the queue. Note that the + /// returned value may be meaningless if the queue is empty. + size_t TailPos() const { + base::AutoLock l(lock_); + return head_ + q_.size() - 1; + } + + /// @return true if the queue has been stopped using Stop(). This allows + /// producers to check if they can add new elements to the queue. + bool Stopped() const { + base::AutoLock l(lock_); + return stop_requested_; + } + + private: + // Move head_ to center on pos. + void SlideHeadOnCenter(size_t pos); + + const size_t capacity_; // Maximum number of elements; zero means unlimited. + mutable base::Lock lock_; // Lock protecting all other variables below. + size_t head_; // Head position. + std::deque q_; // Internal queue holding the elements. + base::ConditionVariable not_empty_cv_; + base::ConditionVariable not_full_cv_; + base::ConditionVariable new_element_cv_; + bool stop_requested_; // True after Stop has been called. + + DISALLOW_COPY_AND_ASSIGN(ProducerConsumerQueue); +}; + +// Implementations of non-inline functions. +template +ProducerConsumerQueue::ProducerConsumerQueue(size_t capacity) + : capacity_(capacity), + head_(0), + not_empty_cv_(&lock_), + not_full_cv_(&lock_), + new_element_cv_(&lock_), + stop_requested_(false) {} + +template +ProducerConsumerQueue::~ProducerConsumerQueue() {} + +template +Status ProducerConsumerQueue::Push(const T& element, int64 timeout_ms) { + base::AutoLock l(lock_); + bool woken = false; + + // Check for queue shutdown. + if (stop_requested_) + return Status(error::STOPPED, ""); + + base::ElapsedTimer timer; + base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms); + + if (capacity_) { + while (q_.size() == capacity_) { + if (timeout_ms < 0) { + // Wait forever, or until Stop. + not_full_cv_.Wait(); + } else { + base::TimeDelta elapsed = timer.Elapsed(); + if (elapsed < timeout_delta) { + // Wait with timeout, or until Stop. + not_full_cv_.TimedWait(timeout_delta - elapsed); + } else { + // We're through waiting. + return Status(error::TIME_OUT, "Time out on pushing."); + } + } + // Re-check for queue shutdown after waking from Wait. + if (stop_requested_) + return Status(error::STOPPED, ""); + + woken = true; + } + DCHECK_LT(q_.size(), capacity_); + } + + // Signal consumer to proceed if we are going to create some elements. + if (q_.empty()) + not_empty_cv_.Signal(); + new_element_cv_.Signal(); + + q_.push_back(element); + + // Signal other producers if we just acquired more capacity. + if (woken && q_.size() != capacity_) + not_full_cv_.Signal(); + return Status::OK; +} + +template +Status ProducerConsumerQueue::Pop(T* element, int64 timeout_ms) { + base::AutoLock l(lock_); + bool woken = false; + + base::ElapsedTimer timer; + base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms); + + while (q_.empty()) { + if (stop_requested_) + return Status(error::STOPPED, ""); + + if (timeout_ms < 0) { + // Wait forever, or until Stop. + not_empty_cv_.Wait(); + } else { + base::TimeDelta elapsed = timer.Elapsed(); + if (elapsed < timeout_delta) { + // Wait with timeout, or until Stop. + not_empty_cv_.TimedWait(timeout_delta - elapsed); + } else { + // We're through waiting. + return Status(error::TIME_OUT, "Time out on popping."); + } + } + woken = true; + } + + // Signal producer to proceed if we are going to create some capacity. + if (q_.size() == capacity_) + not_full_cv_.Signal(); + + *element = q_.front(); + q_.pop_front(); + ++head_; + + // Signal other consumers if we have more elements. + if (woken && !q_.empty()) + not_empty_cv_.Signal(); + return Status::OK; +} + +template +Status ProducerConsumerQueue::Peek(size_t pos, + T* element, + int64 timeout_ms) { + base::AutoLock l(lock_); + if (pos < head_) { + return Status( + error::INVALID_ARGUMENT, + base::StringPrintf("pos (%zu) is too small; head is %zu.", pos, head_)); + } + + bool woken = false; + + base::ElapsedTimer timer; + base::TimeDelta timeout_delta = base::TimeDelta::FromMilliseconds(timeout_ms); + + // Move head_ to create some space (move the sliding window centered @ pos). + SlideHeadOnCenter(pos); + + while (pos >= head_ + q_.size()) { + if (stop_requested_) + return Status(error::STOPPED, ""); + + if (timeout_ms < 0) { + // Wait forever, or until Stop. + new_element_cv_.Wait(); + } else { + base::TimeDelta elapsed = timer.Elapsed(); + if (elapsed < timeout_delta) { + // Wait with timeout, or until Stop. + new_element_cv_.TimedWait(timeout_delta - elapsed); + } else { + // We're through waiting. + return Status(error::TIME_OUT, "Time out on peeking."); + } + } + // Move head_ to create some space (move the sliding window centered @ pos). + SlideHeadOnCenter(pos); + woken = true; + } + + *element = q_[pos - head_]; + + // Signal other consumers if we have more elements. + if (woken && !q_.empty()) + new_element_cv_.Signal(); + return Status::OK; +} + +template +void ProducerConsumerQueue::SlideHeadOnCenter(size_t pos) { + lock_.AssertAcquired(); + + if (capacity_) { + // Signal producer to proceed if we are going to create some capacity. + if (q_.size() == capacity_ && pos > head_ + capacity_ / 2) + not_full_cv_.Signal(); + + while (!q_.empty() && pos > head_ + capacity_ / 2) { + ++head_; + q_.pop_front(); + } + } +} + +} // namespace media + +#endif // MEDIA_BASE_PRODUCER_CONSUMER_QUEUE_H_ diff --git a/media/base/status.cc b/media/base/status.cc index d21059a779..b4c5f613b7 100644 --- a/media/base/status.cc +++ b/media/base/status.cc @@ -40,6 +40,10 @@ std::string ErrorCodeToString(Code error_code) { return "SERVER_ERROR"; case INTERNAL_ERROR: return "INTERNAL_ERROR"; + case STOPPED: + return "STOPPED"; + case TIME_OUT: + return "TIME_OUT"; default: NOTIMPLEMENTED() << "Unknown Status Code: " << error_code; return "UNKNOWN_STATUS"; diff --git a/media/base/status.h b/media/base/status.h index 6ed6189ec8..c227de6f39 100644 --- a/media/base/status.h +++ b/media/base/status.h @@ -59,6 +59,12 @@ enum Code { // Internal errors. Some invariants have been broken. INTERNAL_ERROR, + + // The operation was stopped. + STOPPED, + + // The operation timed out. + TIME_OUT, }; } // namespace error