Added threaded I/O.

Change-Id: I2528f5f48dafa1477f2d849b6b86cdda33e47f96
This commit is contained in:
Thomas Inskip 2015-03-19 11:28:04 -07:00 committed by Gerrit Code Review
parent 299bb97490
commit 5ce7afeda4
8 changed files with 279 additions and 19 deletions

View File

@ -6,12 +6,22 @@
#include "packager/media/file/file.h" #include "packager/media/file/file.h"
#include <gflags/gflags.h>
#include "packager/base/logging.h" #include "packager/base/logging.h"
#include "packager/base/memory/scoped_ptr.h" #include "packager/base/memory/scoped_ptr.h"
#include "packager/media/file/local_file.h" #include "packager/media/file/local_file.h"
#include "packager/media/file/threaded_io_file.h"
#include "packager/media/file/udp_file.h" #include "packager/media/file/udp_file.h"
#include "packager/base/strings/string_util.h" #include "packager/base/strings/string_util.h"
DEFINE_uint64(io_cache_size,
32ULL << 20,
"Size of the threaded I/O cache, in bytes. Specify 0 to disable "
"threaded I/O.");
DEFINE_uint64(io_block_size,
2ULL << 20,
"Size of the block size used for threaded I/O, in bytes.");
namespace edash_packager { namespace edash_packager {
namespace media { namespace media {
@ -64,15 +74,36 @@ static const SupportedTypeInfo kSupportedTypeInfo[] = {
} // namespace } // namespace
File* File::Create(const char* file_name, const char* mode) { File* File::Create(const char* file_name, const char* mode) {
scoped_ptr<File, FileCloser> internal_file;
for (size_t i = 0; i < arraysize(kSupportedTypeInfo); ++i) { for (size_t i = 0; i < arraysize(kSupportedTypeInfo); ++i) {
const SupportedTypeInfo& type_info = kSupportedTypeInfo[i]; const SupportedTypeInfo& type_info = kSupportedTypeInfo[i];
if (strncmp(type_info.type, file_name, type_info.type_length) == 0) { if (strncmp(type_info.type, file_name, type_info.type_length) == 0) {
return type_info.factory_function(file_name + type_info.type_length, internal_file.reset(type_info.factory_function(
mode); file_name + type_info.type_length, mode));
} }
} }
// Otherwise we assume it is a local file // Otherwise we assume it is a local file
return CreateLocalFile(file_name, mode); if (!internal_file)
internal_file.reset(CreateLocalFile(file_name, mode));
if (FLAGS_io_cache_size) {
// Enable threaded I/O for "r", "w", and "a" modes only.
if (!strcmp(mode, "r")) {
return new ThreadedIoFile(internal_file.Pass(),
ThreadedIoFile::kInputMode,
FLAGS_io_cache_size,
FLAGS_io_block_size);
} else if (!strcmp(mode, "w") || !strcmp(mode, "a")) {
return new ThreadedIoFile(internal_file.Pass(),
ThreadedIoFile::kOutputMode,
FLAGS_io_cache_size,
FLAGS_io_block_size);
}
}
// Threaded I/O is disabled.
DLOG(WARNING) << "Threaded I/O is disabled. Performance may be decreased.";
return internal_file.release();
} }
File* File::Open(const char* file_name, const char* mode) { File* File::Open(const char* file_name, const char* mode) {

View File

@ -20,12 +20,15 @@
'io_cache.h', 'io_cache.h',
'local_file.cc', 'local_file.cc',
'local_file.h', 'local_file.h',
'threaded_io_file.cc',
'threaded_io_file.h',
'udp_file.cc', 'udp_file.cc',
'udp_file.h', 'udp_file.h',
], ],
'dependencies': [ 'dependencies': [
'../../base/base.gyp:base', '../../base/base.gyp:base',
'../../third_party/gflags/gflags.gyp:gflags', '../../third_party/gflags/gflags.gyp:gflags',
'../base/media_base.gyp:base',
], ],
}, },
{ {
@ -33,23 +36,11 @@
'type': '<(gtest_target_type)', 'type': '<(gtest_target_type)',
'sources': [ 'sources': [
'file_unittest.cc', 'file_unittest.cc',
],
'dependencies': [
'../../testing/gtest.gyp:gtest',
'../../testing/gtest.gyp:gtest_main',
'file',
],
},
{
'target_name': 'io_cache_unittest',
'type': '<(gtest_target_type)',
'sources': [
'io_cache_unittest.cc', 'io_cache_unittest.cc',
], ],
'dependencies': [ 'dependencies': [
'../../testing/gtest.gyp:gtest', '../../testing/gtest.gyp:gtest',
'../../testing/gtest.gyp:gtest_main', '../../testing/gtest.gyp:gtest_main',
'../base/media_base.gyp:base',
'file', 'file',
], ],
}, },

View File

@ -96,6 +96,8 @@ class File {
virtual bool Open() = 0; virtual bool Open() = 0;
private: private:
friend class ThreadedIoFile;
// This is a file factory method, it creates a proper file, e.g. // This is a file factory method, it creates a proper file, e.g.
// LocalFile, MemFile based on prefix. // LocalFile, MemFile based on prefix.
static File* Create(const char* file_name, const char* mode); static File* Create(const char* file_name, const char* mode);

View File

@ -43,8 +43,6 @@ uint64_t IoCache::Read(void* buffer, uint64_t size) {
AutoUnlock unlock(lock_); AutoUnlock unlock(lock_);
write_event_.Wait(); write_event_.Wait();
} }
if (closed_)
return 0;
size = std::min(size, BytesCachedInternal()); size = std::min(size, BytesCachedInternal());
uint64_t first_chunk_size(std::min(size, static_cast<uint64_t>( uint64_t first_chunk_size(std::min(size, static_cast<uint64_t>(
@ -135,5 +133,13 @@ uint64 IoCache::BytesFreeInternal() {
return cache_size_ - BytesCachedInternal(); return cache_size_ - BytesCachedInternal();
} }
void IoCache::WaitUntilEmptyOrClosed() {
AutoLock lock(lock_);
while (!closed_ && BytesCachedInternal()) {
AutoUnlock unlock(lock_);
read_event_.Wait();
}
}
} // namespace media } // namespace media
} // namespace edash_packager } // namespace edash_packager

View File

@ -27,7 +27,7 @@ class IoCache {
/// @param buffer is a buffer into which to read the data from the cache. /// @param buffer is a buffer into which to read the data from the cache.
/// @param size is the size of @a buffer. /// @param size is the size of @a buffer.
/// @return the number of bytes read into @a buffer, or 0 if the call /// @return the number of bytes read into @a buffer, or 0 if the call
/// unblocked because the cache has been closed. /// unblocked because the cache has been closed and is empty.
uint64_t Read(void* buffer, uint64_t size); uint64_t Read(void* buffer, uint64_t size);
/// Write data to the cache. This function may block until there is enough /// Write data to the cache. This function may block until there is enough
@ -54,6 +54,9 @@ class IoCache {
/// @return the number of free bytes in the cache. /// @return the number of free bytes in the cache.
uint64_t BytesFree(); uint64_t BytesFree();
/// Waits until the cache is empty or has been closed.
void WaitUntilEmptyOrClosed();
private: private:
uint64_t BytesCachedInternal(); uint64_t BytesCachedInternal();
uint64_t BytesFreeInternal(); uint64_t BytesFreeInternal();

View File

@ -0,0 +1,159 @@
// Copyright 2015 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/file/threaded_io_file.h"
#include "packager/base/bind.h"
#include "packager/base/bind_helpers.h"
#include "packager/base/threading/platform_thread.h"
#include "packager/media/base/closure_thread.h"
namespace edash_packager {
namespace media {
namespace {
const int kWaitDelayMs = 10;
}
ThreadedIoFile::ThreadedIoFile(scoped_ptr<File, FileCloser> internal_file,
Mode mode,
uint64_t io_cache_size,
uint64_t io_block_size)
: File(internal_file->file_name()),
internal_file_(internal_file.Pass()),
mode_(mode),
cache_(io_cache_size),
io_buffer_(io_block_size),
size_(0),
eof_(false),
internal_file_error_(0) {
DCHECK(internal_file_);
}
ThreadedIoFile::~ThreadedIoFile() {}
bool ThreadedIoFile::Open() {
DCHECK(internal_file_);
if (!internal_file_->Open())
return false;
size_ = internal_file_->Size();
thread_.reset(new ClosureThread("ThreadedIoFile",
base::Bind(mode_ == kInputMode ?
&ThreadedIoFile::RunInInputMode :
&ThreadedIoFile::RunInOutputMode,
base::Unretained(this))));
thread_->Start();
return true;
}
bool ThreadedIoFile::Close() {
DCHECK(internal_file_);
DCHECK(thread_);
if (mode_ == kOutputMode)
Flush();
cache_.Close();
thread_->Join();
bool result = internal_file_.release()->Close();
delete this;
return result;
}
int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) {
DCHECK(internal_file_);
DCHECK(thread_);
DCHECK_EQ(kInputMode, mode_);
if (internal_file_error_)
return internal_file_error_;
if (eof_ && !cache_.BytesCached())
return 0;
return cache_.Read(buffer, length);
}
int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) {
DCHECK(internal_file_);
DCHECK(thread_);
DCHECK_EQ(kOutputMode, mode_);
if (internal_file_error_)
return internal_file_error_;
size_ += length;
return cache_.Write(buffer, length);
}
int64_t ThreadedIoFile::Size() {
DCHECK(internal_file_);
DCHECK(thread_);
return size_;
}
bool ThreadedIoFile::Flush() {
DCHECK(internal_file_);
DCHECK(thread_);
DCHECK_EQ(kOutputMode, mode_);
cache_.WaitUntilEmptyOrClosed();
return internal_file_->Flush();
}
bool ThreadedIoFile::Eof() {
DCHECK(internal_file_);
DCHECK(thread_);
DCHECK_EQ(kInputMode, mode_);
return eof_ && !cache_.BytesCached();
}
void ThreadedIoFile::RunInInputMode() {
DCHECK(internal_file_);
DCHECK(thread_);
DCHECK_EQ(kInputMode, mode_);
while (true) {
int64_t read_result = internal_file_->Read(&io_buffer_[0],
io_buffer_.size());
if (read_result <= 0) {
eof_ = read_result == 0;
internal_file_error_ = read_result;
cache_.Close();
return;
}
cache_.Write(&io_buffer_[0], read_result);
}
}
void ThreadedIoFile::RunInOutputMode() {
DCHECK(internal_file_);
DCHECK(thread_);
DCHECK_EQ(kOutputMode, mode_);
while (true) {
uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size());
if (write_bytes == 0)
return;
int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes);
if (write_result < 0) {
internal_file_error_ = write_result;
cache_.Close();
return;
}
CHECK_EQ(write_result, static_cast<int64_t>(write_bytes));
}
}
} // namespace media
} // namespace edash_packager

View File

@ -0,0 +1,68 @@
// Copyright 2015 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_FILE_THREADED_IO_FILE_H_
#define PACKAGER_FILE_THREADED_IO_FILE_H_
#include "packager/base/memory/scoped_ptr.h"
#include "packager/base/synchronization/lock.h"
#include "packager/media/file/file.h"
#include "packager/media/file/file_closer.h"
#include "packager/media/file/io_cache.h"
namespace edash_packager {
namespace media {
class ClosureThread;
/// Declaration of class which implements a thread-safe circular buffer.
class ThreadedIoFile : public File {
public:
enum Mode {
kInputMode,
kOutputMode
};
ThreadedIoFile(scoped_ptr<File, FileCloser> internal_file,
Mode mode,
uint64_t io_cache_size,
uint64_t io_block_size);
/// @name File implementation overrides.
/// @{
virtual bool Close() OVERRIDE;
virtual int64_t Read(void* buffer, uint64_t length) OVERRIDE;
virtual int64_t Write(const void* buffer, uint64_t length) OVERRIDE;
virtual int64_t Size() OVERRIDE;
virtual bool Flush() OVERRIDE;
virtual bool Eof() OVERRIDE;
/// @}
protected:
virtual ~ThreadedIoFile();
virtual bool Open() OVERRIDE;
void RunInInputMode();
void RunInOutputMode();
private:
scoped_ptr<File, FileCloser> internal_file_;
const Mode mode_;
IoCache cache_;
std::vector<uint8_t> io_buffer_;
scoped_ptr<ClosureThread> thread_;
uint64_t size_;
bool eof_;
int64_t internal_file_error_;
DISALLOW_COPY_AND_ASSIGN(ThreadedIoFile);
};
} // namespace media
} // namespace edash_packager
#endif // PACKAGER_FILE_THREADED_IO_FILE_H

View File

@ -140,7 +140,7 @@ Status MultiSegmentSegmenter::WriteSegment() {
if (options().segment_template.empty()) { if (options().segment_template.empty()) {
// Append the segment to output file if segment template is not specified. // Append the segment to output file if segment template is not specified.
file_name = options().output_file_name.c_str(); file_name = options().output_file_name.c_str();
file = File::Open(file_name.c_str(), "a+"); file = File::Open(file_name.c_str(), "a");
if (file == NULL) { if (file == NULL) {
return Status( return Status(
error::FILE_FAILURE, error::FILE_FAILURE,