From 055ccde14fd1c686e817615f01ee1be9d3cabec0 Mon Sep 17 00:00:00 2001 From: Thomas Inskip Date: Thu, 12 Mar 2015 17:54:12 -0700 Subject: [PATCH] Added implementation and tests of IoCache (thread-safe circular memory buffer). Change-Id: Id057b641e4eb6f61d18ec3e0565580b23d1e4297 --- packager/media/file/file.gyp | 15 ++ packager/media/file/io_cache.cc | 139 +++++++++++++ packager/media/file/io_cache.h | 77 +++++++ packager/media/file/io_cache_unittest.cc | 249 +++++++++++++++++++++++ 4 files changed, 480 insertions(+) create mode 100644 packager/media/file/io_cache.cc create mode 100644 packager/media/file/io_cache.h create mode 100644 packager/media/file/io_cache_unittest.cc diff --git a/packager/media/file/file.gyp b/packager/media/file/file.gyp index 8e46c4104a..37e4efec16 100644 --- a/packager/media/file/file.gyp +++ b/packager/media/file/file.gyp @@ -16,6 +16,8 @@ 'file.cc', 'file.h', 'file_closer.h', + 'io_cache.cc', + 'io_cache.h', 'local_file.cc', 'local_file.h', 'udp_file.cc', @@ -38,5 +40,18 @@ 'file', ], }, + { + 'target_name': 'io_cache_unittest', + 'type': '<(gtest_target_type)', + 'sources': [ + 'io_cache_unittest.cc', + ], + 'dependencies': [ + '../../testing/gtest.gyp:gtest', + '../../testing/gtest.gyp:gtest_main', + '../base/media_base.gyp:base', + 'file', + ], + }, ], } diff --git a/packager/media/file/io_cache.cc b/packager/media/file/io_cache.cc new file mode 100644 index 0000000000..136ba09080 --- /dev/null +++ b/packager/media/file/io_cache.cc @@ -0,0 +1,139 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "media/file/io_cache.h" + +#include + +#include + +#include "base/logging.h" + +namespace edash_packager { + +using base::AutoLock; +using base::AutoUnlock; + +namespace media { + +IoCache::IoCache(uint64_t cache_size) + : cache_size_(cache_size), + read_event_(false, false), + write_event_(false, false), + // Make the buffer one byte larger than the cache so that when the + // condition r_ptr == w_ptr is unambiguous (buffer empty). + circular_buffer_(cache_size + 1), + end_ptr_(&circular_buffer_[0] + cache_size + 1), + r_ptr_(&circular_buffer_[0]), + w_ptr_(&circular_buffer_[0]), + closed_(false) {} + +IoCache::~IoCache() { + Close(); +} + +int64_t IoCache::Read(void* buffer, uint64_t size) { + DCHECK(buffer); + + AutoLock lock(lock_); + while (!closed_ && (BytesCachedInternal() == 0)) { + AutoUnlock unlock(lock_); + write_event_.Wait(); + } + if (closed_) + return 0; + + size = std::min(size, BytesCachedInternal()); + uint64_t first_chunk_size(std::min(size, static_cast( + end_ptr_ - r_ptr_))); + memcpy(buffer, r_ptr_, first_chunk_size); + r_ptr_ += first_chunk_size; + DCHECK_GE(end_ptr_, r_ptr_); + if (r_ptr_ == end_ptr_) + r_ptr_ = &circular_buffer_[0]; + uint64_t second_chunk_size(size - first_chunk_size); + if (second_chunk_size) { + memcpy(static_cast(buffer) + first_chunk_size, r_ptr_, + second_chunk_size); + r_ptr_ += second_chunk_size; + DCHECK_GT(end_ptr_, r_ptr_); + } + read_event_.Signal(); + return size; +} + +int64_t IoCache::Write(const void* buffer, uint64_t size) { + DCHECK(buffer); + + const uint8_t* r_ptr(static_cast(buffer)); + uint64_t bytes_left(size); + while (bytes_left) { + AutoLock lock(lock_); + while (!closed_ && (BytesFreeInternal() == 0)) { + AutoUnlock unlock(lock_); + read_event_.Wait(); + } + if (closed_) + return 0; + + uint64_t write_size(std::min(bytes_left, BytesFreeInternal())); + uint64_t first_chunk_size(std::min(write_size, static_cast( + end_ptr_ - w_ptr_))); + memcpy(w_ptr_, r_ptr, first_chunk_size); + w_ptr_ += first_chunk_size; + DCHECK_GE(end_ptr_, w_ptr_); + if (w_ptr_ == end_ptr_) + w_ptr_ = &circular_buffer_[0]; + r_ptr += first_chunk_size; + uint64_t second_chunk_size(write_size - first_chunk_size); + if (second_chunk_size) { + memcpy(w_ptr_, r_ptr, second_chunk_size); + w_ptr_ += second_chunk_size; + DCHECK_GT(end_ptr_, w_ptr_); + r_ptr += second_chunk_size; + } + bytes_left -= write_size; + write_event_.Signal(); + } + return size; +} + +void IoCache::Clear() { + AutoLock lock(lock_); + r_ptr_ = w_ptr_ = &circular_buffer_[0]; + // Let any writers know that there is room in the cache. + read_event_.Signal(); +} + +void IoCache::Close() { + AutoLock lock(lock_); + closed_ = true; + read_event_.Signal(); + write_event_.Signal(); +} + +uint64_t IoCache::BytesCached() { + AutoLock lock(lock_); + return BytesCachedInternal(); +} + +uint64_t IoCache::BytesFree() { + AutoLock lock(lock_); + return BytesFreeInternal(); +} + +uint64 IoCache::BytesCachedInternal() { + return (r_ptr_ <= w_ptr_) ? + w_ptr_ - r_ptr_ : + (end_ptr_ - r_ptr_) + (w_ptr_ - &circular_buffer_[0]); +} + +uint64 IoCache::BytesFreeInternal() { + return cache_size_ - BytesCachedInternal(); +} + +} // namespace media +} // namespace edash_packager diff --git a/packager/media/file/io_cache.h b/packager/media/file/io_cache.h new file mode 100644 index 0000000000..b6cc6ae9c5 --- /dev/null +++ b/packager/media/file/io_cache.h @@ -0,0 +1,77 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef PACKAGER_FILE_IO_CACHE_H_ +#define PACKAGER_FILE_IO_CACHE_H_ + +#include +#include +#include "packager/base/macros.h" +#include "packager/base/synchronization/lock.h" +#include "packager/base/synchronization/waitable_event.h" + +namespace edash_packager { +namespace media { + +/// Declaration of class which implements a thread-safe circular buffer. +class IoCache { + public: + explicit IoCache(uint64_t cache_size); + virtual ~IoCache(); + + /// Read data from the cache. This function may block until there is data in + /// the cache. + /// @param buffer is a buffer into which to read the data from the cache. + /// @param size is the size of @a buffer. + /// @return the number of bytes read into @a buffer, or 0 if the call + /// unblocked because the cache has been closed. + int64_t Read(void* buffer, uint64_t size); + + /// Write data to the cache. This function may block until there is enough + /// room in the cache. + /// @param buffer is a buffer containing the data to be written to the cache. + /// @param size is the size of the data to be written to the cache. + /// @return the amount of data written to the buffer (which will equal + /// @a data), or 0 if the call unblocked because the cache has been + /// closed. + int64_t Write(const void* buffer, uint64_t size); + + /// Empties the cache. + void Clear(); + + /// Close the cache. This will call any blocking calls to unblock, and the + /// cache won't be usable thereafter. + void Close(); + + /// Returns the number of bytes in the cache. + /// @return the number of bytes in the cache. + uint64_t BytesCached(); + + /// Returns the number of free bytes in the cache. + /// @return the number of free bytes in the cache. + uint64_t BytesFree(); + + private: + uint64_t BytesCachedInternal(); + uint64_t BytesFreeInternal(); + + const uint64_t cache_size_; + base::Lock lock_; + base::WaitableEvent read_event_; + base::WaitableEvent write_event_; + std::vector circular_buffer_; + const uint8_t* end_ptr_; + uint8_t* r_ptr_; + uint8_t* w_ptr_; + bool closed_; + + DISALLOW_COPY_AND_ASSIGN(IoCache); +}; + +} // namespace media +} // namespace edash_packager + +#endif // PACKAGER_FILE_IO_CACHE_H diff --git a/packager/media/file/io_cache_unittest.cc b/packager/media/file/io_cache_unittest.cc new file mode 100644 index 0000000000..a377775d21 --- /dev/null +++ b/packager/media/file/io_cache_unittest.cc @@ -0,0 +1,249 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include +#include +#include +#include "packager/base/bind.h" +#include "packager/base/bind_helpers.h" +#include "packager/base/threading/platform_thread.h" +#include "packager/media/base/closure_thread.h" +#include "packager/media/file/io_cache.h" + +namespace { +const uint64_t kBlockSize = 256; +const uint64_t kCacheSize = 16 * kBlockSize; +} + +namespace edash_packager { +namespace media { + +class IoCacheTest : public testing::Test { + public: + void WriteToCache(const std::vector& test_buffer, + uint64_t num_writes, + int sleep_between_writes, + bool close_when_done) { + for (uint64_t write_idx = 0; write_idx < num_writes; ++write_idx) { + uint64_t write_result = cache_->Write(&test_buffer[0], + test_buffer.size()); + if (!write_result) { + // Cache was closed. + cache_closed_ = true; + break; + } + EXPECT_EQ(test_buffer.size(), write_result); + if (sleep_between_writes) { + base::PlatformThread::Sleep( + base::TimeDelta::FromMilliseconds(sleep_between_writes)); + } + } + if (close_when_done) + cache_->Close(); + } + + protected: + virtual void SetUp() OVERRIDE { + for (unsigned int idx = 0; idx < kBlockSize; ++idx) + reference_block_[idx] = idx; + cache_.reset(new IoCache(kCacheSize)); + cache_closed_ = false; + } + + virtual void TearDown() OVERRIDE { + WaitForWriterThread(); + } + + void GenerateTestBuffer(uint64_t size, std::vector* test_buffer) { + test_buffer->resize(size); + uint8_t* w_ptr(&(*test_buffer)[0]); + while (size) { + uint64_t copy_size(std::min(size, kBlockSize)); + memcpy(w_ptr, reference_block_, copy_size); + w_ptr += copy_size; + size -= copy_size; + } + } + + void WriteToCacheThreaded(const std::vector& test_buffer, + uint64_t num_writes, + int sleep_between_writes, + bool close_when_done) { + writer_thread_.reset(new ClosureThread("WriterThread", + base::Bind( + &IoCacheTest::WriteToCache, + base::Unretained(this), + test_buffer, + num_writes, + sleep_between_writes, + close_when_done))); + writer_thread_->Start(); + } + + + void WaitForWriterThread() { + if (writer_thread_) { + writer_thread_->Join(); + writer_thread_.reset(); + } + } + + scoped_ptr cache_; + scoped_ptr writer_thread_; + uint8_t reference_block_[kBlockSize]; + bool cache_closed_; +}; + +TEST_F(IoCacheTest, VerySmallWrite) { + const uint64_t kTestBytes(5); + + std::vector write_buffer; + GenerateTestBuffer(kTestBytes, &write_buffer); + WriteToCacheThreaded(write_buffer, 1, 0, false); + + std::vector read_buffer(kTestBytes); + EXPECT_EQ(kTestBytes, cache_->Read(&read_buffer[0], kTestBytes)); + EXPECT_EQ(write_buffer, read_buffer); +} + +TEST_F(IoCacheTest, LotsOfAlignedBlocks) { + const uint64_t kNumWrites(kCacheSize * 1000 / kBlockSize); + + std::vector write_buffer; + GenerateTestBuffer(kBlockSize, &write_buffer); + WriteToCacheThreaded(write_buffer, kNumWrites, 0, false); + for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) { + std::vector read_buffer(kBlockSize); + EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize)); + EXPECT_EQ(write_buffer, read_buffer); + } +} + +TEST_F(IoCacheTest, LotsOfUnalignedBlocks) { + const uint64_t kNumWrites(kCacheSize * 1000 / kBlockSize); + const uint64_t kUnalignBlockSize(55); + + std::vector write_buffer1; + GenerateTestBuffer(kUnalignBlockSize, &write_buffer1); + WriteToCacheThreaded(write_buffer1, 1, 0, false); + WaitForWriterThread(); + std::vector write_buffer2; + GenerateTestBuffer(kBlockSize, &write_buffer2); + WriteToCacheThreaded(write_buffer2, kNumWrites, 0, false); + + std::vector read_buffer1(kUnalignBlockSize); + EXPECT_EQ(kUnalignBlockSize, cache_->Read(&read_buffer1[0], + kUnalignBlockSize)); + EXPECT_EQ(write_buffer1, read_buffer1); + std::vector verify_buffer; + for (uint64_t idx = 0; idx < kNumWrites; ++idx) + verify_buffer.insert(verify_buffer.end(), + write_buffer2.begin(), + write_buffer2.end()); + uint64_t verify_index(0); + while (verify_index < verify_buffer.size()) { + std::vector read_buffer2(kBlockSize); + uint64_t bytes_read = cache_->Read(&read_buffer2[0], kBlockSize); + EXPECT_NE(0, bytes_read); + EXPECT_FALSE(memcmp(&verify_buffer[verify_index], + &read_buffer2[0], + bytes_read)); + verify_index += bytes_read; + } +} + +TEST_F(IoCacheTest, SlowWrite) { + const int kWriteDelayMs(50); + const uint64_t kNumWrites(kCacheSize * 5 / kBlockSize); + + std::vector write_buffer; + GenerateTestBuffer(kBlockSize, &write_buffer); + WriteToCacheThreaded(write_buffer, kNumWrites, kWriteDelayMs, false); + for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) { + std::vector read_buffer(kBlockSize); + EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize)); + EXPECT_EQ(write_buffer, read_buffer); + } +} + +TEST_F(IoCacheTest, SlowRead) { + const int kReadDelayMs(50); + const uint64_t kNumWrites(kCacheSize * 5 / kBlockSize); + + std::vector write_buffer; + GenerateTestBuffer(kBlockSize, &write_buffer); + WriteToCacheThreaded(write_buffer, kNumWrites, 0, false); + for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) { + std::vector read_buffer(kBlockSize); + EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize)); + EXPECT_EQ(write_buffer, read_buffer); + base::PlatformThread::Sleep( + base::TimeDelta::FromMilliseconds(kReadDelayMs)); + } +} + +TEST_F(IoCacheTest, CloseByReader) { + const uint64_t kNumWrites(kCacheSize * 1000 / kBlockSize); + + std::vector write_buffer; + GenerateTestBuffer(kBlockSize, &write_buffer); + WriteToCacheThreaded(write_buffer, kNumWrites, 0, false); + while (cache_->BytesCached() < kCacheSize) { + base::PlatformThread::Sleep( + base::TimeDelta::FromMilliseconds(10)); + } + cache_->Close(); + WaitForWriterThread(); + EXPECT_TRUE(cache_closed_); +} + +TEST_F(IoCacheTest, CloseByWriter) { + uint8_t test_buffer[kBlockSize]; + std::vector write_buffer; + WriteToCacheThreaded(write_buffer, 0, 0, true); + EXPECT_EQ(0, cache_->Read(test_buffer, kBlockSize)); + WaitForWriterThread(); +} + +TEST_F(IoCacheTest, SingleLargeWrite) { + const uint64_t kTestBytes(kCacheSize * 10); + + std::vector write_buffer; + GenerateTestBuffer(kTestBytes, &write_buffer); + WriteToCacheThreaded(write_buffer, 1, 0, false); + uint64_t bytes_read(0); + std::vector read_buffer(kTestBytes); + while (bytes_read < kTestBytes) { + EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[bytes_read], kBlockSize)); + bytes_read += kBlockSize; + } + EXPECT_EQ(write_buffer, read_buffer); +} + +TEST_F(IoCacheTest, LargeRead) { + const uint64_t kNumWrites(kCacheSize * 10 / kBlockSize); + + std::vector write_buffer; + GenerateTestBuffer(kBlockSize, &write_buffer); + WriteToCacheThreaded(write_buffer, kNumWrites, 0, false); + std::vector verify_buffer; + while (verify_buffer.size() < kCacheSize) { + verify_buffer.insert(verify_buffer.end(), + write_buffer.begin(), + write_buffer.end()); + } + while (cache_->BytesCached() < kCacheSize) { + base::PlatformThread::Sleep( + base::TimeDelta::FromMilliseconds(10)); + } + std::vector read_buffer(kCacheSize); + EXPECT_EQ(kCacheSize, cache_->Read(&read_buffer[0], kCacheSize)); + EXPECT_EQ(verify_buffer, read_buffer); + cache_->Close(); +} + +} // namespace media +} // namespace edash_packager