Added implementation and tests of IoCache (thread-safe circular memory buffer).

Change-Id: Id057b641e4eb6f61d18ec3e0565580b23d1e4297
This commit is contained in:
Thomas Inskip 2015-03-12 17:54:12 -07:00 committed by Gerrit Code Review
parent 6a7c6a3f7a
commit 055ccde14f
4 changed files with 480 additions and 0 deletions

View File

@ -16,6 +16,8 @@
'file.cc',
'file.h',
'file_closer.h',
'io_cache.cc',
'io_cache.h',
'local_file.cc',
'local_file.h',
'udp_file.cc',
@ -38,5 +40,18 @@
'file',
],
},
{
'target_name': 'io_cache_unittest',
'type': '<(gtest_target_type)',
'sources': [
'io_cache_unittest.cc',
],
'dependencies': [
'../../testing/gtest.gyp:gtest',
'../../testing/gtest.gyp:gtest_main',
'../base/media_base.gyp:base',
'file',
],
},
],
}

View File

@ -0,0 +1,139 @@
// Copyright 2015 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "media/file/io_cache.h"
#include <string.h>
#include <algorithm>
#include "base/logging.h"
namespace edash_packager {
using base::AutoLock;
using base::AutoUnlock;
namespace media {
IoCache::IoCache(uint64_t cache_size)
: cache_size_(cache_size),
read_event_(false, false),
write_event_(false, false),
// Make the buffer one byte larger than the cache so that when the
// condition r_ptr == w_ptr is unambiguous (buffer empty).
circular_buffer_(cache_size + 1),
end_ptr_(&circular_buffer_[0] + cache_size + 1),
r_ptr_(&circular_buffer_[0]),
w_ptr_(&circular_buffer_[0]),
closed_(false) {}
IoCache::~IoCache() {
Close();
}
int64_t IoCache::Read(void* buffer, uint64_t size) {
DCHECK(buffer);
AutoLock lock(lock_);
while (!closed_ && (BytesCachedInternal() == 0)) {
AutoUnlock unlock(lock_);
write_event_.Wait();
}
if (closed_)
return 0;
size = std::min(size, BytesCachedInternal());
uint64_t first_chunk_size(std::min(size, static_cast<uint64_t>(
end_ptr_ - r_ptr_)));
memcpy(buffer, r_ptr_, first_chunk_size);
r_ptr_ += first_chunk_size;
DCHECK_GE(end_ptr_, r_ptr_);
if (r_ptr_ == end_ptr_)
r_ptr_ = &circular_buffer_[0];
uint64_t second_chunk_size(size - first_chunk_size);
if (second_chunk_size) {
memcpy(static_cast<uint8_t*>(buffer) + first_chunk_size, r_ptr_,
second_chunk_size);
r_ptr_ += second_chunk_size;
DCHECK_GT(end_ptr_, r_ptr_);
}
read_event_.Signal();
return size;
}
int64_t IoCache::Write(const void* buffer, uint64_t size) {
DCHECK(buffer);
const uint8_t* r_ptr(static_cast<const uint8_t*>(buffer));
uint64_t bytes_left(size);
while (bytes_left) {
AutoLock lock(lock_);
while (!closed_ && (BytesFreeInternal() == 0)) {
AutoUnlock unlock(lock_);
read_event_.Wait();
}
if (closed_)
return 0;
uint64_t write_size(std::min(bytes_left, BytesFreeInternal()));
uint64_t first_chunk_size(std::min(write_size, static_cast<uint64_t>(
end_ptr_ - w_ptr_)));
memcpy(w_ptr_, r_ptr, first_chunk_size);
w_ptr_ += first_chunk_size;
DCHECK_GE(end_ptr_, w_ptr_);
if (w_ptr_ == end_ptr_)
w_ptr_ = &circular_buffer_[0];
r_ptr += first_chunk_size;
uint64_t second_chunk_size(write_size - first_chunk_size);
if (second_chunk_size) {
memcpy(w_ptr_, r_ptr, second_chunk_size);
w_ptr_ += second_chunk_size;
DCHECK_GT(end_ptr_, w_ptr_);
r_ptr += second_chunk_size;
}
bytes_left -= write_size;
write_event_.Signal();
}
return size;
}
void IoCache::Clear() {
AutoLock lock(lock_);
r_ptr_ = w_ptr_ = &circular_buffer_[0];
// Let any writers know that there is room in the cache.
read_event_.Signal();
}
void IoCache::Close() {
AutoLock lock(lock_);
closed_ = true;
read_event_.Signal();
write_event_.Signal();
}
uint64_t IoCache::BytesCached() {
AutoLock lock(lock_);
return BytesCachedInternal();
}
uint64_t IoCache::BytesFree() {
AutoLock lock(lock_);
return BytesFreeInternal();
}
uint64 IoCache::BytesCachedInternal() {
return (r_ptr_ <= w_ptr_) ?
w_ptr_ - r_ptr_ :
(end_ptr_ - r_ptr_) + (w_ptr_ - &circular_buffer_[0]);
}
uint64 IoCache::BytesFreeInternal() {
return cache_size_ - BytesCachedInternal();
}
} // namespace media
} // namespace edash_packager

View File

@ -0,0 +1,77 @@
// Copyright 2015 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_FILE_IO_CACHE_H_
#define PACKAGER_FILE_IO_CACHE_H_
#include <stdint.h>
#include <vector>
#include "packager/base/macros.h"
#include "packager/base/synchronization/lock.h"
#include "packager/base/synchronization/waitable_event.h"
namespace edash_packager {
namespace media {
/// Declaration of class which implements a thread-safe circular buffer.
class IoCache {
public:
explicit IoCache(uint64_t cache_size);
virtual ~IoCache();
/// Read data from the cache. This function may block until there is data in
/// the cache.
/// @param buffer is a buffer into which to read the data from the cache.
/// @param size is the size of @a buffer.
/// @return the number of bytes read into @a buffer, or 0 if the call
/// unblocked because the cache has been closed.
int64_t Read(void* buffer, uint64_t size);
/// Write data to the cache. This function may block until there is enough
/// room in the cache.
/// @param buffer is a buffer containing the data to be written to the cache.
/// @param size is the size of the data to be written to the cache.
/// @return the amount of data written to the buffer (which will equal
/// @a data), or 0 if the call unblocked because the cache has been
/// closed.
int64_t Write(const void* buffer, uint64_t size);
/// Empties the cache.
void Clear();
/// Close the cache. This will call any blocking calls to unblock, and the
/// cache won't be usable thereafter.
void Close();
/// Returns the number of bytes in the cache.
/// @return the number of bytes in the cache.
uint64_t BytesCached();
/// Returns the number of free bytes in the cache.
/// @return the number of free bytes in the cache.
uint64_t BytesFree();
private:
uint64_t BytesCachedInternal();
uint64_t BytesFreeInternal();
const uint64_t cache_size_;
base::Lock lock_;
base::WaitableEvent read_event_;
base::WaitableEvent write_event_;
std::vector<uint8_t> circular_buffer_;
const uint8_t* end_ptr_;
uint8_t* r_ptr_;
uint8_t* w_ptr_;
bool closed_;
DISALLOW_COPY_AND_ASSIGN(IoCache);
};
} // namespace media
} // namespace edash_packager
#endif // PACKAGER_FILE_IO_CACHE_H

View File

@ -0,0 +1,249 @@
// Copyright 2015 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include <gtest/gtest.h>
#include <string.h>
#include <algorithm>
#include "packager/base/bind.h"
#include "packager/base/bind_helpers.h"
#include "packager/base/threading/platform_thread.h"
#include "packager/media/base/closure_thread.h"
#include "packager/media/file/io_cache.h"
namespace {
const uint64_t kBlockSize = 256;
const uint64_t kCacheSize = 16 * kBlockSize;
}
namespace edash_packager {
namespace media {
class IoCacheTest : public testing::Test {
public:
void WriteToCache(const std::vector<uint8_t>& test_buffer,
uint64_t num_writes,
int sleep_between_writes,
bool close_when_done) {
for (uint64_t write_idx = 0; write_idx < num_writes; ++write_idx) {
uint64_t write_result = cache_->Write(&test_buffer[0],
test_buffer.size());
if (!write_result) {
// Cache was closed.
cache_closed_ = true;
break;
}
EXPECT_EQ(test_buffer.size(), write_result);
if (sleep_between_writes) {
base::PlatformThread::Sleep(
base::TimeDelta::FromMilliseconds(sleep_between_writes));
}
}
if (close_when_done)
cache_->Close();
}
protected:
virtual void SetUp() OVERRIDE {
for (unsigned int idx = 0; idx < kBlockSize; ++idx)
reference_block_[idx] = idx;
cache_.reset(new IoCache(kCacheSize));
cache_closed_ = false;
}
virtual void TearDown() OVERRIDE {
WaitForWriterThread();
}
void GenerateTestBuffer(uint64_t size, std::vector<uint8_t>* test_buffer) {
test_buffer->resize(size);
uint8_t* w_ptr(&(*test_buffer)[0]);
while (size) {
uint64_t copy_size(std::min(size, kBlockSize));
memcpy(w_ptr, reference_block_, copy_size);
w_ptr += copy_size;
size -= copy_size;
}
}
void WriteToCacheThreaded(const std::vector<uint8_t>& test_buffer,
uint64_t num_writes,
int sleep_between_writes,
bool close_when_done) {
writer_thread_.reset(new ClosureThread("WriterThread",
base::Bind(
&IoCacheTest::WriteToCache,
base::Unretained(this),
test_buffer,
num_writes,
sleep_between_writes,
close_when_done)));
writer_thread_->Start();
}
void WaitForWriterThread() {
if (writer_thread_) {
writer_thread_->Join();
writer_thread_.reset();
}
}
scoped_ptr<IoCache> cache_;
scoped_ptr<ClosureThread> writer_thread_;
uint8_t reference_block_[kBlockSize];
bool cache_closed_;
};
TEST_F(IoCacheTest, VerySmallWrite) {
const uint64_t kTestBytes(5);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kTestBytes, &write_buffer);
WriteToCacheThreaded(write_buffer, 1, 0, false);
std::vector<uint8_t> read_buffer(kTestBytes);
EXPECT_EQ(kTestBytes, cache_->Read(&read_buffer[0], kTestBytes));
EXPECT_EQ(write_buffer, read_buffer);
}
TEST_F(IoCacheTest, LotsOfAlignedBlocks) {
const uint64_t kNumWrites(kCacheSize * 1000 / kBlockSize);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kBlockSize, &write_buffer);
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize));
EXPECT_EQ(write_buffer, read_buffer);
}
}
TEST_F(IoCacheTest, LotsOfUnalignedBlocks) {
const uint64_t kNumWrites(kCacheSize * 1000 / kBlockSize);
const uint64_t kUnalignBlockSize(55);
std::vector<uint8_t> write_buffer1;
GenerateTestBuffer(kUnalignBlockSize, &write_buffer1);
WriteToCacheThreaded(write_buffer1, 1, 0, false);
WaitForWriterThread();
std::vector<uint8_t> write_buffer2;
GenerateTestBuffer(kBlockSize, &write_buffer2);
WriteToCacheThreaded(write_buffer2, kNumWrites, 0, false);
std::vector<uint8_t> read_buffer1(kUnalignBlockSize);
EXPECT_EQ(kUnalignBlockSize, cache_->Read(&read_buffer1[0],
kUnalignBlockSize));
EXPECT_EQ(write_buffer1, read_buffer1);
std::vector<uint8> verify_buffer;
for (uint64_t idx = 0; idx < kNumWrites; ++idx)
verify_buffer.insert(verify_buffer.end(),
write_buffer2.begin(),
write_buffer2.end());
uint64_t verify_index(0);
while (verify_index < verify_buffer.size()) {
std::vector<uint8_t> read_buffer2(kBlockSize);
uint64_t bytes_read = cache_->Read(&read_buffer2[0], kBlockSize);
EXPECT_NE(0, bytes_read);
EXPECT_FALSE(memcmp(&verify_buffer[verify_index],
&read_buffer2[0],
bytes_read));
verify_index += bytes_read;
}
}
TEST_F(IoCacheTest, SlowWrite) {
const int kWriteDelayMs(50);
const uint64_t kNumWrites(kCacheSize * 5 / kBlockSize);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kBlockSize, &write_buffer);
WriteToCacheThreaded(write_buffer, kNumWrites, kWriteDelayMs, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize));
EXPECT_EQ(write_buffer, read_buffer);
}
}
TEST_F(IoCacheTest, SlowRead) {
const int kReadDelayMs(50);
const uint64_t kNumWrites(kCacheSize * 5 / kBlockSize);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kBlockSize, &write_buffer);
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize));
EXPECT_EQ(write_buffer, read_buffer);
base::PlatformThread::Sleep(
base::TimeDelta::FromMilliseconds(kReadDelayMs));
}
}
TEST_F(IoCacheTest, CloseByReader) {
const uint64_t kNumWrites(kCacheSize * 1000 / kBlockSize);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kBlockSize, &write_buffer);
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
while (cache_->BytesCached() < kCacheSize) {
base::PlatformThread::Sleep(
base::TimeDelta::FromMilliseconds(10));
}
cache_->Close();
WaitForWriterThread();
EXPECT_TRUE(cache_closed_);
}
TEST_F(IoCacheTest, CloseByWriter) {
uint8_t test_buffer[kBlockSize];
std::vector<uint8_t> write_buffer;
WriteToCacheThreaded(write_buffer, 0, 0, true);
EXPECT_EQ(0, cache_->Read(test_buffer, kBlockSize));
WaitForWriterThread();
}
TEST_F(IoCacheTest, SingleLargeWrite) {
const uint64_t kTestBytes(kCacheSize * 10);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kTestBytes, &write_buffer);
WriteToCacheThreaded(write_buffer, 1, 0, false);
uint64_t bytes_read(0);
std::vector<uint8_t> read_buffer(kTestBytes);
while (bytes_read < kTestBytes) {
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[bytes_read], kBlockSize));
bytes_read += kBlockSize;
}
EXPECT_EQ(write_buffer, read_buffer);
}
TEST_F(IoCacheTest, LargeRead) {
const uint64_t kNumWrites(kCacheSize * 10 / kBlockSize);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kBlockSize, &write_buffer);
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
std::vector<uint8_t> verify_buffer;
while (verify_buffer.size() < kCacheSize) {
verify_buffer.insert(verify_buffer.end(),
write_buffer.begin(),
write_buffer.end());
}
while (cache_->BytesCached() < kCacheSize) {
base::PlatformThread::Sleep(
base::TimeDelta::FromMilliseconds(10));
}
std::vector<uint8_t> read_buffer(kCacheSize);
EXPECT_EQ(kCacheSize, cache_->Read(&read_buffer[0], kCacheSize));
EXPECT_EQ(verify_buffer, read_buffer);
cache_->Close();
}
} // namespace media
} // namespace edash_packager