Fix for race condition when flushing ThreadedIoFile.

Change-Id: I46f26fa9fddf53ca5231c31d6442053ab5202ade
This commit is contained in:
Thomas Inskip 2015-10-16 13:10:42 -07:00 committed by Gerrit Code Review
parent e9437068cf
commit 05f80f961c
7 changed files with 117 additions and 33 deletions

View File

@ -124,5 +124,26 @@ TEST_F(LocalFileTest, WriteRead) {
EXPECT_EQ(data_, read_data); EXPECT_EQ(data_, read_data);
} }
TEST_F(LocalFileTest, WriteFlushCheckSize) {
const uint32 kNumCycles(10);
const uint32 kNumWrites(10);
for (uint32 cycle_idx = 0; cycle_idx < kNumCycles; ++cycle_idx) {
// Write file using File API, using file name directly (without prefix).
File* file = File::Open(local_file_name_no_prefix_.c_str(), "w");
ASSERT_TRUE(file != NULL);
for (uint32 write_idx = 0; write_idx < kNumWrites; ++write_idx)
EXPECT_EQ(kDataSize, file->Write(data_.data(), kDataSize));
ASSERT_NO_FATAL_FAILURE(file->Flush());
EXPECT_TRUE(file->Close());
file = File::Open(local_file_name_.c_str(), "r");
ASSERT_TRUE(file != NULL);
EXPECT_EQ(static_cast<int64_t>(data_.size() * kNumWrites), file->Size());
EXPECT_TRUE(file->Close());
}
}
} // namespace media } // namespace media
} // namespace edash_packager } // namespace edash_packager

View File

@ -11,6 +11,7 @@
#include <algorithm> #include <algorithm>
#include "packager/base/logging.h" #include "packager/base/logging.h"
#include "packager/base/stl_util.h"
namespace edash_packager { namespace edash_packager {
@ -27,8 +28,8 @@ IoCache::IoCache(uint64_t cache_size)
// condition r_ptr == w_ptr is unambiguous (buffer empty). // condition r_ptr == w_ptr is unambiguous (buffer empty).
circular_buffer_(cache_size + 1), circular_buffer_(cache_size + 1),
end_ptr_(&circular_buffer_[0] + cache_size + 1), end_ptr_(&circular_buffer_[0] + cache_size + 1),
r_ptr_(&circular_buffer_[0]), r_ptr_(vector_as_array(&circular_buffer_)),
w_ptr_(&circular_buffer_[0]), w_ptr_(vector_as_array(&circular_buffer_)),
closed_(false) {} closed_(false) {}
IoCache::~IoCache() { IoCache::~IoCache() {
@ -101,7 +102,7 @@ uint64_t IoCache::Write(const void* buffer, uint64_t size) {
void IoCache::Clear() { void IoCache::Clear() {
AutoLock lock(lock_); AutoLock lock(lock_);
r_ptr_ = w_ptr_ = &circular_buffer_[0]; r_ptr_ = w_ptr_ = vector_as_array(&circular_buffer_);
// Let any writers know that there is room in the cache. // Let any writers know that there is room in the cache.
read_event_.Signal(); read_event_.Signal();
} }
@ -113,6 +114,15 @@ void IoCache::Close() {
write_event_.Signal(); write_event_.Signal();
} }
void IoCache::Reopen() {
AutoLock lock(lock_);
CHECK(closed_);
r_ptr_ = w_ptr_ = vector_as_array(&circular_buffer_);
closed_ = false;
read_event_.Reset();
write_event_.Reset();
}
uint64_t IoCache::BytesCached() { uint64_t IoCache::BytesCached() {
AutoLock lock(lock_); AutoLock lock(lock_);
return BytesCachedInternal(); return BytesCachedInternal();
@ -126,7 +136,7 @@ uint64_t IoCache::BytesFree() {
uint64_t IoCache::BytesCachedInternal() { uint64_t IoCache::BytesCachedInternal() {
return (r_ptr_ <= w_ptr_) ? return (r_ptr_ <= w_ptr_) ?
w_ptr_ - r_ptr_ : w_ptr_ - r_ptr_ :
(end_ptr_ - r_ptr_) + (w_ptr_ - &circular_buffer_[0]); (end_ptr_ - r_ptr_) + (w_ptr_ - vector_as_array(&circular_buffer_));
} }
uint64_t IoCache::BytesFreeInternal() { uint64_t IoCache::BytesFreeInternal() {

View File

@ -43,9 +43,15 @@ class IoCache {
void Clear(); void Clear();
/// Close the cache. This will call any blocking calls to unblock, and the /// Close the cache. This will call any blocking calls to unblock, and the
/// cache won't be usable thereafter. /// cache won't be usable until Reopened.
void Close(); void Close();
/// @return true if the cache is closed, false otherwise.
bool closed() { return closed_; }
/// Reopens the cache. Any data still in the cache will be lost.
void Reopen();
/// Returns the number of bytes in the cache. /// Returns the number of bytes in the cache.
/// @return the number of bytes in the cache. /// @return the number of bytes in the cache.
uint64_t BytesCached(); uint64_t BytesCached();

View File

@ -9,6 +9,7 @@
#include <algorithm> #include <algorithm>
#include "packager/base/bind.h" #include "packager/base/bind.h"
#include "packager/base/bind_helpers.h" #include "packager/base/bind_helpers.h"
#include "packager/base/stl_util.h"
#include "packager/base/threading/platform_thread.h" #include "packager/base/threading/platform_thread.h"
#include "packager/media/base/closure_thread.h" #include "packager/media/base/closure_thread.h"
#include "packager/media/file/io_cache.h" #include "packager/media/file/io_cache.h"
@ -28,7 +29,7 @@ class IoCacheTest : public testing::Test {
int sleep_between_writes, int sleep_between_writes,
bool close_when_done) { bool close_when_done) {
for (uint64_t write_idx = 0; write_idx < num_writes; ++write_idx) { for (uint64_t write_idx = 0; write_idx < num_writes; ++write_idx) {
uint64_t write_result = cache_->Write(&test_buffer[0], uint64_t write_result = cache_->Write(vector_as_array(&test_buffer),
test_buffer.size()); test_buffer.size());
if (!write_result) { if (!write_result) {
// Cache was closed. // Cache was closed.
@ -59,7 +60,7 @@ class IoCacheTest : public testing::Test {
void GenerateTestBuffer(uint64_t size, std::vector<uint8_t>* test_buffer) { void GenerateTestBuffer(uint64_t size, std::vector<uint8_t>* test_buffer) {
test_buffer->resize(size); test_buffer->resize(size);
uint8_t* w_ptr(&(*test_buffer)[0]); uint8_t* w_ptr(vector_as_array(test_buffer));
while (size) { while (size) {
uint64_t copy_size(std::min(size, kBlockSize)); uint64_t copy_size(std::min(size, kBlockSize));
memcpy(w_ptr, reference_block_, copy_size); memcpy(w_ptr, reference_block_, copy_size);
@ -105,7 +106,8 @@ TEST_F(IoCacheTest, VerySmallWrite) {
WriteToCacheThreaded(write_buffer, 1, 0, false); WriteToCacheThreaded(write_buffer, 1, 0, false);
std::vector<uint8_t> read_buffer(kTestBytes); std::vector<uint8_t> read_buffer(kTestBytes);
EXPECT_EQ(kTestBytes, cache_->Read(&read_buffer[0], kTestBytes)); EXPECT_EQ(kTestBytes, cache_->Read(vector_as_array(&read_buffer),
kTestBytes));
EXPECT_EQ(write_buffer, read_buffer); EXPECT_EQ(write_buffer, read_buffer);
} }
@ -117,7 +119,8 @@ TEST_F(IoCacheTest, LotsOfAlignedBlocks) {
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false); WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) { for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize); std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize)); EXPECT_EQ(kBlockSize, cache_->Read(vector_as_array(&read_buffer),
kBlockSize));
EXPECT_EQ(write_buffer, read_buffer); EXPECT_EQ(write_buffer, read_buffer);
} }
} }
@ -135,7 +138,7 @@ TEST_F(IoCacheTest, LotsOfUnalignedBlocks) {
WriteToCacheThreaded(write_buffer2, kNumWrites, 0, false); WriteToCacheThreaded(write_buffer2, kNumWrites, 0, false);
std::vector<uint8_t> read_buffer1(kUnalignBlockSize); std::vector<uint8_t> read_buffer1(kUnalignBlockSize);
EXPECT_EQ(kUnalignBlockSize, cache_->Read(&read_buffer1[0], EXPECT_EQ(kUnalignBlockSize, cache_->Read(vector_as_array(&read_buffer1),
kUnalignBlockSize)); kUnalignBlockSize));
EXPECT_EQ(write_buffer1, read_buffer1); EXPECT_EQ(write_buffer1, read_buffer1);
std::vector<uint8> verify_buffer; std::vector<uint8> verify_buffer;
@ -146,10 +149,11 @@ TEST_F(IoCacheTest, LotsOfUnalignedBlocks) {
uint64_t verify_index(0); uint64_t verify_index(0);
while (verify_index < verify_buffer.size()) { while (verify_index < verify_buffer.size()) {
std::vector<uint8_t> read_buffer2(kBlockSize); std::vector<uint8_t> read_buffer2(kBlockSize);
uint64_t bytes_read = cache_->Read(&read_buffer2[0], kBlockSize); uint64_t bytes_read = cache_->Read(vector_as_array(&read_buffer2),
kBlockSize);
EXPECT_NE(0U, bytes_read); EXPECT_NE(0U, bytes_read);
EXPECT_FALSE(memcmp(&verify_buffer[verify_index], EXPECT_FALSE(memcmp(&verify_buffer[verify_index],
&read_buffer2[0], vector_as_array(&read_buffer2),
bytes_read)); bytes_read));
verify_index += bytes_read; verify_index += bytes_read;
} }
@ -164,7 +168,8 @@ TEST_F(IoCacheTest, SlowWrite) {
WriteToCacheThreaded(write_buffer, kNumWrites, kWriteDelayMs, false); WriteToCacheThreaded(write_buffer, kNumWrites, kWriteDelayMs, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) { for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize); std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize)); EXPECT_EQ(kBlockSize, cache_->Read(vector_as_array(&read_buffer),
kBlockSize));
EXPECT_EQ(write_buffer, read_buffer); EXPECT_EQ(write_buffer, read_buffer);
} }
} }
@ -178,7 +183,8 @@ TEST_F(IoCacheTest, SlowRead) {
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false); WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) { for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize); std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize)); EXPECT_EQ(kBlockSize, cache_->Read(vector_as_array(&read_buffer),
kBlockSize));
EXPECT_EQ(write_buffer, read_buffer); EXPECT_EQ(write_buffer, read_buffer);
base::PlatformThread::Sleep( base::PlatformThread::Sleep(
base::TimeDelta::FromMilliseconds(kReadDelayMs)); base::TimeDelta::FromMilliseconds(kReadDelayMs));
@ -208,6 +214,32 @@ TEST_F(IoCacheTest, CloseByWriter) {
WaitForWriterThread(); WaitForWriterThread();
} }
TEST_F(IoCacheTest, Reopen) {
const uint64_t kTestBytes1(5);
const uint64_t kTestBytes2(10);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kTestBytes1, &write_buffer);
WriteToCacheThreaded(write_buffer, 1, 0, true);
std::vector<uint8_t> read_buffer(kTestBytes1);
EXPECT_EQ(kTestBytes1, cache_->Read(vector_as_array(&read_buffer),
kTestBytes1));
EXPECT_EQ(write_buffer, read_buffer);
WaitForWriterThread();
ASSERT_TRUE(cache_->closed());
cache_->Reopen();
ASSERT_FALSE(cache_->closed());
GenerateTestBuffer(kTestBytes2, &write_buffer);
WriteToCacheThreaded(write_buffer, 1, 0, false);
read_buffer.resize(kTestBytes2);
EXPECT_EQ(kTestBytes2, cache_->Read(vector_as_array(&read_buffer),
kTestBytes2));
EXPECT_EQ(write_buffer, read_buffer);
}
TEST_F(IoCacheTest, SingleLargeWrite) { TEST_F(IoCacheTest, SingleLargeWrite) {
const uint64_t kTestBytes(kCacheSize * 10); const uint64_t kTestBytes(kCacheSize * 10);
@ -240,7 +272,8 @@ TEST_F(IoCacheTest, LargeRead) {
base::TimeDelta::FromMilliseconds(10)); base::TimeDelta::FromMilliseconds(10));
} }
std::vector<uint8_t> read_buffer(kCacheSize); std::vector<uint8_t> read_buffer(kCacheSize);
EXPECT_EQ(kCacheSize, cache_->Read(&read_buffer[0], kCacheSize)); EXPECT_EQ(kCacheSize, cache_->Read(vector_as_array(&read_buffer),
kCacheSize));
EXPECT_EQ(verify_buffer, read_buffer); EXPECT_EQ(verify_buffer, read_buffer);
cache_->Close(); cache_->Close();
} }

View File

@ -25,7 +25,9 @@ ThreadedIoFile::ThreadedIoFile(scoped_ptr<File, FileCloser> internal_file,
io_buffer_(io_block_size), io_buffer_(io_block_size),
size_(0), size_(0),
eof_(false), eof_(false),
internal_file_error_(0) { flushing_(false),
flush_complete_event_(false, false),
internal_file_error_(0){
DCHECK(internal_file_); DCHECK(internal_file_);
} }
@ -101,7 +103,9 @@ bool ThreadedIoFile::Flush() {
DCHECK(thread_); DCHECK(thread_);
DCHECK_EQ(kOutputMode, mode_); DCHECK_EQ(kOutputMode, mode_);
cache_.WaitUntilEmptyOrClosed(); flushing_ = true;
cache_.Close();
flush_complete_event_.Wait();
return internal_file_->Flush(); return internal_file_->Flush();
} }
@ -140,9 +144,15 @@ void ThreadedIoFile::RunInOutputMode() {
while (true) { while (true) {
uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size()); uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size());
if (write_bytes == 0) if (write_bytes == 0) {
if (flushing_) {
cache_.Reopen();
flushing_ = false;
flush_complete_event_.Signal();
} else {
return; return;
}
} else {
int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes); int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes);
if (write_result < 0) { if (write_result < 0) {
internal_file_error_ = write_result; internal_file_error_ = write_result;
@ -151,6 +161,7 @@ void ThreadedIoFile::RunInOutputMode() {
} }
CHECK_EQ(write_result, static_cast<int64_t>(write_bytes)); CHECK_EQ(write_result, static_cast<int64_t>(write_bytes));
} }
}
} }
} // namespace media } // namespace media

View File

@ -9,6 +9,7 @@
#include "packager/base/memory/scoped_ptr.h" #include "packager/base/memory/scoped_ptr.h"
#include "packager/base/synchronization/lock.h" #include "packager/base/synchronization/lock.h"
#include "packager/base/synchronization/waitable_event.h"
#include "packager/media/file/file.h" #include "packager/media/file/file.h"
#include "packager/media/file/file_closer.h" #include "packager/media/file/file_closer.h"
#include "packager/media/file/io_cache.h" #include "packager/media/file/io_cache.h"
@ -58,6 +59,8 @@ class ThreadedIoFile : public File {
scoped_ptr<ClosureThread> thread_; scoped_ptr<ClosureThread> thread_;
uint64_t size_; uint64_t size_;
bool eof_; bool eof_;
bool flushing_;
base::WaitableEvent flush_complete_event_;
int64_t internal_file_error_; int64_t internal_file_error_;
DISALLOW_COPY_AND_ASSIGN(ThreadedIoFile); DISALLOW_COPY_AND_ASSIGN(ThreadedIoFile);

View File

@ -129,11 +129,11 @@ void WvmMediaParser::Init(const InitCB& init_cb,
bool WvmMediaParser::Parse(const uint8_t* buf, int size) { bool WvmMediaParser::Parse(const uint8_t* buf, int size) {
uint32_t num_bytes, prev_size; uint32_t num_bytes, prev_size;
num_bytes = prev_size = 0; num_bytes = prev_size = 0;
uint8_t* read_ptr = (uint8_t*)(&buf[0]); const uint8_t* read_ptr = buf;
uint8_t* end = read_ptr + size; const uint8_t* end = read_ptr + size;
while (read_ptr < end) { while (read_ptr < end) {
switch(parse_state_) { switch (parse_state_) {
case StartCode1: case StartCode1:
if (*read_ptr == kStartCode1) { if (*read_ptr == kStartCode1) {
parse_state_ = StartCode2; parse_state_ = StartCode2;
@ -260,7 +260,6 @@ bool WvmMediaParser::Parse(const uint8_t* buf, int size) {
case PesExtension1: case PesExtension1:
prev_pes_flags_1_ = pes_flags_1_; prev_pes_flags_1_ = pes_flags_1_;
pes_flags_1_ = *read_ptr; pes_flags_1_ = *read_ptr;
*read_ptr &= ~kScramblingBitsMask;
--pes_packet_bytes_; --pes_packet_bytes_;
parse_state_ = PesExtension2; parse_state_ = PesExtension2;
break; break;
@ -861,7 +860,8 @@ bool WvmMediaParser::Output(bool output_encrypted_sample) {
if (!ExtractResolutionFromDecoderConfig( if (!ExtractResolutionFromDecoderConfig(
vector_as_array(&decoder_config_record), vector_as_array(&decoder_config_record),
decoder_config_record.size(), decoder_config_record.size(),
&coded_width, &coded_height, &pixel_width, &pixel_height)) { &coded_width, &coded_height,
&pixel_width, &pixel_height)) {
LOG(ERROR) << "Failed to parse AVCDecoderConfigurationRecord."; LOG(ERROR) << "Failed to parse AVCDecoderConfigurationRecord.";
return false; return false;
} }