Fix for race condition when flushing ThreadedIoFile.

Change-Id: I46f26fa9fddf53ca5231c31d6442053ab5202ade
This commit is contained in:
Thomas Inskip 2015-10-16 13:10:42 -07:00 committed by KongQun Yang
parent b48820392d
commit e276a9de2d
7 changed files with 117 additions and 33 deletions

View File

@ -124,5 +124,26 @@ TEST_F(LocalFileTest, WriteRead) {
EXPECT_EQ(data_, read_data);
}
TEST_F(LocalFileTest, WriteFlushCheckSize) {
const uint32 kNumCycles(10);
const uint32 kNumWrites(10);
for (uint32 cycle_idx = 0; cycle_idx < kNumCycles; ++cycle_idx) {
// Write file using File API, using file name directly (without prefix).
File* file = File::Open(local_file_name_no_prefix_.c_str(), "w");
ASSERT_TRUE(file != NULL);
for (uint32 write_idx = 0; write_idx < kNumWrites; ++write_idx)
EXPECT_EQ(kDataSize, file->Write(data_.data(), kDataSize));
ASSERT_NO_FATAL_FAILURE(file->Flush());
EXPECT_TRUE(file->Close());
file = File::Open(local_file_name_.c_str(), "r");
ASSERT_TRUE(file != NULL);
EXPECT_EQ(static_cast<int64_t>(data_.size() * kNumWrites), file->Size());
EXPECT_TRUE(file->Close());
}
}
} // namespace media
} // namespace edash_packager

View File

@ -11,6 +11,7 @@
#include <algorithm>
#include "packager/base/logging.h"
#include "packager/base/stl_util.h"
namespace edash_packager {
@ -27,8 +28,8 @@ IoCache::IoCache(uint64_t cache_size)
// condition r_ptr == w_ptr is unambiguous (buffer empty).
circular_buffer_(cache_size + 1),
end_ptr_(&circular_buffer_[0] + cache_size + 1),
r_ptr_(&circular_buffer_[0]),
w_ptr_(&circular_buffer_[0]),
r_ptr_(vector_as_array(&circular_buffer_)),
w_ptr_(vector_as_array(&circular_buffer_)),
closed_(false) {}
IoCache::~IoCache() {
@ -101,7 +102,7 @@ uint64_t IoCache::Write(const void* buffer, uint64_t size) {
void IoCache::Clear() {
AutoLock lock(lock_);
r_ptr_ = w_ptr_ = &circular_buffer_[0];
r_ptr_ = w_ptr_ = vector_as_array(&circular_buffer_);
// Let any writers know that there is room in the cache.
read_event_.Signal();
}
@ -113,6 +114,15 @@ void IoCache::Close() {
write_event_.Signal();
}
void IoCache::Reopen() {
AutoLock lock(lock_);
CHECK(closed_);
r_ptr_ = w_ptr_ = vector_as_array(&circular_buffer_);
closed_ = false;
read_event_.Reset();
write_event_.Reset();
}
uint64_t IoCache::BytesCached() {
AutoLock lock(lock_);
return BytesCachedInternal();
@ -126,7 +136,7 @@ uint64_t IoCache::BytesFree() {
uint64_t IoCache::BytesCachedInternal() {
return (r_ptr_ <= w_ptr_) ?
w_ptr_ - r_ptr_ :
(end_ptr_ - r_ptr_) + (w_ptr_ - &circular_buffer_[0]);
(end_ptr_ - r_ptr_) + (w_ptr_ - vector_as_array(&circular_buffer_));
}
uint64_t IoCache::BytesFreeInternal() {

View File

@ -43,9 +43,15 @@ class IoCache {
void Clear();
/// Close the cache. This will call any blocking calls to unblock, and the
/// cache won't be usable thereafter.
/// cache won't be usable until Reopened.
void Close();
/// @return true if the cache is closed, false otherwise.
bool closed() { return closed_; }
/// Reopens the cache. Any data still in the cache will be lost.
void Reopen();
/// Returns the number of bytes in the cache.
/// @return the number of bytes in the cache.
uint64_t BytesCached();

View File

@ -9,6 +9,7 @@
#include <algorithm>
#include "packager/base/bind.h"
#include "packager/base/bind_helpers.h"
#include "packager/base/stl_util.h"
#include "packager/base/threading/platform_thread.h"
#include "packager/media/base/closure_thread.h"
#include "packager/media/file/io_cache.h"
@ -28,7 +29,7 @@ class IoCacheTest : public testing::Test {
int sleep_between_writes,
bool close_when_done) {
for (uint64_t write_idx = 0; write_idx < num_writes; ++write_idx) {
uint64_t write_result = cache_->Write(&test_buffer[0],
uint64_t write_result = cache_->Write(vector_as_array(&test_buffer),
test_buffer.size());
if (!write_result) {
// Cache was closed.
@ -59,7 +60,7 @@ class IoCacheTest : public testing::Test {
void GenerateTestBuffer(uint64_t size, std::vector<uint8_t>* test_buffer) {
test_buffer->resize(size);
uint8_t* w_ptr(&(*test_buffer)[0]);
uint8_t* w_ptr(vector_as_array(test_buffer));
while (size) {
uint64_t copy_size(std::min(size, kBlockSize));
memcpy(w_ptr, reference_block_, copy_size);
@ -105,7 +106,8 @@ TEST_F(IoCacheTest, VerySmallWrite) {
WriteToCacheThreaded(write_buffer, 1, 0, false);
std::vector<uint8_t> read_buffer(kTestBytes);
EXPECT_EQ(kTestBytes, cache_->Read(&read_buffer[0], kTestBytes));
EXPECT_EQ(kTestBytes, cache_->Read(vector_as_array(&read_buffer),
kTestBytes));
EXPECT_EQ(write_buffer, read_buffer);
}
@ -117,7 +119,8 @@ TEST_F(IoCacheTest, LotsOfAlignedBlocks) {
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize));
EXPECT_EQ(kBlockSize, cache_->Read(vector_as_array(&read_buffer),
kBlockSize));
EXPECT_EQ(write_buffer, read_buffer);
}
}
@ -135,7 +138,7 @@ TEST_F(IoCacheTest, LotsOfUnalignedBlocks) {
WriteToCacheThreaded(write_buffer2, kNumWrites, 0, false);
std::vector<uint8_t> read_buffer1(kUnalignBlockSize);
EXPECT_EQ(kUnalignBlockSize, cache_->Read(&read_buffer1[0],
EXPECT_EQ(kUnalignBlockSize, cache_->Read(vector_as_array(&read_buffer1),
kUnalignBlockSize));
EXPECT_EQ(write_buffer1, read_buffer1);
std::vector<uint8> verify_buffer;
@ -146,10 +149,11 @@ TEST_F(IoCacheTest, LotsOfUnalignedBlocks) {
uint64_t verify_index(0);
while (verify_index < verify_buffer.size()) {
std::vector<uint8_t> read_buffer2(kBlockSize);
uint64_t bytes_read = cache_->Read(&read_buffer2[0], kBlockSize);
uint64_t bytes_read = cache_->Read(vector_as_array(&read_buffer2),
kBlockSize);
EXPECT_NE(0U, bytes_read);
EXPECT_FALSE(memcmp(&verify_buffer[verify_index],
&read_buffer2[0],
vector_as_array(&read_buffer2),
bytes_read));
verify_index += bytes_read;
}
@ -164,7 +168,8 @@ TEST_F(IoCacheTest, SlowWrite) {
WriteToCacheThreaded(write_buffer, kNumWrites, kWriteDelayMs, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize));
EXPECT_EQ(kBlockSize, cache_->Read(vector_as_array(&read_buffer),
kBlockSize));
EXPECT_EQ(write_buffer, read_buffer);
}
}
@ -178,7 +183,8 @@ TEST_F(IoCacheTest, SlowRead) {
WriteToCacheThreaded(write_buffer, kNumWrites, 0, false);
for (uint64_t num_reads = 0; num_reads < kNumWrites; ++num_reads) {
std::vector<uint8_t> read_buffer(kBlockSize);
EXPECT_EQ(kBlockSize, cache_->Read(&read_buffer[0], kBlockSize));
EXPECT_EQ(kBlockSize, cache_->Read(vector_as_array(&read_buffer),
kBlockSize));
EXPECT_EQ(write_buffer, read_buffer);
base::PlatformThread::Sleep(
base::TimeDelta::FromMilliseconds(kReadDelayMs));
@ -208,6 +214,32 @@ TEST_F(IoCacheTest, CloseByWriter) {
WaitForWriterThread();
}
TEST_F(IoCacheTest, Reopen) {
const uint64_t kTestBytes1(5);
const uint64_t kTestBytes2(10);
std::vector<uint8_t> write_buffer;
GenerateTestBuffer(kTestBytes1, &write_buffer);
WriteToCacheThreaded(write_buffer, 1, 0, true);
std::vector<uint8_t> read_buffer(kTestBytes1);
EXPECT_EQ(kTestBytes1, cache_->Read(vector_as_array(&read_buffer),
kTestBytes1));
EXPECT_EQ(write_buffer, read_buffer);
WaitForWriterThread();
ASSERT_TRUE(cache_->closed());
cache_->Reopen();
ASSERT_FALSE(cache_->closed());
GenerateTestBuffer(kTestBytes2, &write_buffer);
WriteToCacheThreaded(write_buffer, 1, 0, false);
read_buffer.resize(kTestBytes2);
EXPECT_EQ(kTestBytes2, cache_->Read(vector_as_array(&read_buffer),
kTestBytes2));
EXPECT_EQ(write_buffer, read_buffer);
}
TEST_F(IoCacheTest, SingleLargeWrite) {
const uint64_t kTestBytes(kCacheSize * 10);
@ -240,7 +272,8 @@ TEST_F(IoCacheTest, LargeRead) {
base::TimeDelta::FromMilliseconds(10));
}
std::vector<uint8_t> read_buffer(kCacheSize);
EXPECT_EQ(kCacheSize, cache_->Read(&read_buffer[0], kCacheSize));
EXPECT_EQ(kCacheSize, cache_->Read(vector_as_array(&read_buffer),
kCacheSize));
EXPECT_EQ(verify_buffer, read_buffer);
cache_->Close();
}

View File

@ -25,7 +25,9 @@ ThreadedIoFile::ThreadedIoFile(scoped_ptr<File, FileCloser> internal_file,
io_buffer_(io_block_size),
size_(0),
eof_(false),
internal_file_error_(0) {
flushing_(false),
flush_complete_event_(false, false),
internal_file_error_(0){
DCHECK(internal_file_);
}
@ -101,7 +103,9 @@ bool ThreadedIoFile::Flush() {
DCHECK(thread_);
DCHECK_EQ(kOutputMode, mode_);
cache_.WaitUntilEmptyOrClosed();
flushing_ = true;
cache_.Close();
flush_complete_event_.Wait();
return internal_file_->Flush();
}
@ -140,16 +144,23 @@ void ThreadedIoFile::RunInOutputMode() {
while (true) {
uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size());
if (write_bytes == 0)
return;
int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes);
if (write_result < 0) {
internal_file_error_ = write_result;
cache_.Close();
return;
if (write_bytes == 0) {
if (flushing_) {
cache_.Reopen();
flushing_ = false;
flush_complete_event_.Signal();
} else {
return;
}
} else {
int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes);
if (write_result < 0) {
internal_file_error_ = write_result;
cache_.Close();
return;
}
CHECK_EQ(write_result, static_cast<int64_t>(write_bytes));
}
CHECK_EQ(write_result, static_cast<int64_t>(write_bytes));
}
}

View File

@ -9,6 +9,7 @@
#include "packager/base/memory/scoped_ptr.h"
#include "packager/base/synchronization/lock.h"
#include "packager/base/synchronization/waitable_event.h"
#include "packager/media/file/file.h"
#include "packager/media/file/file_closer.h"
#include "packager/media/file/io_cache.h"
@ -58,6 +59,8 @@ class ThreadedIoFile : public File {
scoped_ptr<ClosureThread> thread_;
uint64_t size_;
bool eof_;
bool flushing_;
base::WaitableEvent flush_complete_event_;
int64_t internal_file_error_;
DISALLOW_COPY_AND_ASSIGN(ThreadedIoFile);

View File

@ -52,7 +52,7 @@ const uint32_t kVersion4 = 4;
const int kAdtsHeaderMinSize = 7;
const uint8_t kAacSampleSizeBits = 16;
// Applies to all video streams.
const uint8_t kNaluLengthSize = 4; // unit is bytes.
const uint8_t kNaluLengthSize = 4; // unit is bytes.
// Placeholder sampling frequency for all audio streams, which
// will be overwritten after filter parsing.
const uint32_t kDefaultSamplingFrequency = 100;
@ -83,7 +83,7 @@ enum Type {
Type_string = 9,
Type_BinaryData = 10
};
} // namespace
} // namespace
namespace edash_packager {
namespace media {
@ -129,11 +129,11 @@ void WvmMediaParser::Init(const InitCB& init_cb,
bool WvmMediaParser::Parse(const uint8_t* buf, int size) {
uint32_t num_bytes, prev_size;
num_bytes = prev_size = 0;
uint8_t* read_ptr = (uint8_t*)(&buf[0]);
uint8_t* end = read_ptr + size;
const uint8_t* read_ptr = buf;
const uint8_t* end = read_ptr + size;
while (read_ptr < end) {
switch(parse_state_) {
switch (parse_state_) {
case StartCode1:
if (*read_ptr == kStartCode1) {
parse_state_ = StartCode2;
@ -260,7 +260,6 @@ bool WvmMediaParser::Parse(const uint8_t* buf, int size) {
case PesExtension1:
prev_pes_flags_1_ = pes_flags_1_;
pes_flags_1_ = *read_ptr;
*read_ptr &= ~kScramblingBitsMask;
--pes_packet_bytes_;
parse_state_ = PesExtension2;
break;
@ -861,7 +860,8 @@ bool WvmMediaParser::Output(bool output_encrypted_sample) {
if (!ExtractResolutionFromDecoderConfig(
vector_as_array(&decoder_config_record),
decoder_config_record.size(),
&coded_width, &coded_height, &pixel_width, &pixel_height)) {
&coded_width, &coded_height,
&pixel_width, &pixel_height)) {
LOG(ERROR) << "Failed to parse AVCDecoderConfigurationRecord.";
return false;
}