From 5ce7afeda43ab86138f5e2fa287ea2d4c1ea6bc9 Mon Sep 17 00:00:00 2001 From: Thomas Inskip Date: Thu, 19 Mar 2015 11:28:04 -0700 Subject: [PATCH] Added threaded I/O. Change-Id: I2528f5f48dafa1477f2d849b6b86cdda33e47f96 --- packager/media/file/file.cc | 37 +++- packager/media/file/file.gyp | 15 +- packager/media/file/file.h | 2 + packager/media/file/io_cache.cc | 10 +- packager/media/file/io_cache.h | 5 +- packager/media/file/threaded_io_file.cc | 159 ++++++++++++++++++ packager/media/file/threaded_io_file.h | 68 ++++++++ .../formats/mp4/multi_segment_segmenter.cc | 2 +- 8 files changed, 279 insertions(+), 19 deletions(-) create mode 100644 packager/media/file/threaded_io_file.cc create mode 100644 packager/media/file/threaded_io_file.h diff --git a/packager/media/file/file.cc b/packager/media/file/file.cc index 878de3584c..cab663244d 100644 --- a/packager/media/file/file.cc +++ b/packager/media/file/file.cc @@ -6,12 +6,22 @@ #include "packager/media/file/file.h" +#include #include "packager/base/logging.h" #include "packager/base/memory/scoped_ptr.h" #include "packager/media/file/local_file.h" +#include "packager/media/file/threaded_io_file.h" #include "packager/media/file/udp_file.h" #include "packager/base/strings/string_util.h" +DEFINE_uint64(io_cache_size, + 32ULL << 20, + "Size of the threaded I/O cache, in bytes. Specify 0 to disable " + "threaded I/O."); +DEFINE_uint64(io_block_size, + 2ULL << 20, + "Size of the block size used for threaded I/O, in bytes."); + namespace edash_packager { namespace media { @@ -64,15 +74,36 @@ static const SupportedTypeInfo kSupportedTypeInfo[] = { } // namespace File* File::Create(const char* file_name, const char* mode) { + scoped_ptr internal_file; for (size_t i = 0; i < arraysize(kSupportedTypeInfo); ++i) { const SupportedTypeInfo& type_info = kSupportedTypeInfo[i]; if (strncmp(type_info.type, file_name, type_info.type_length) == 0) { - return type_info.factory_function(file_name + type_info.type_length, - mode); + internal_file.reset(type_info.factory_function( + file_name + type_info.type_length, mode)); } } // Otherwise we assume it is a local file - return CreateLocalFile(file_name, mode); + if (!internal_file) + internal_file.reset(CreateLocalFile(file_name, mode)); + + if (FLAGS_io_cache_size) { + // Enable threaded I/O for "r", "w", and "a" modes only. + if (!strcmp(mode, "r")) { + return new ThreadedIoFile(internal_file.Pass(), + ThreadedIoFile::kInputMode, + FLAGS_io_cache_size, + FLAGS_io_block_size); + } else if (!strcmp(mode, "w") || !strcmp(mode, "a")) { + return new ThreadedIoFile(internal_file.Pass(), + ThreadedIoFile::kOutputMode, + FLAGS_io_cache_size, + FLAGS_io_block_size); + } + } + + // Threaded I/O is disabled. + DLOG(WARNING) << "Threaded I/O is disabled. Performance may be decreased."; + return internal_file.release(); } File* File::Open(const char* file_name, const char* mode) { diff --git a/packager/media/file/file.gyp b/packager/media/file/file.gyp index 37e4efec16..44fe1b0398 100644 --- a/packager/media/file/file.gyp +++ b/packager/media/file/file.gyp @@ -20,12 +20,15 @@ 'io_cache.h', 'local_file.cc', 'local_file.h', + 'threaded_io_file.cc', + 'threaded_io_file.h', 'udp_file.cc', 'udp_file.h', ], 'dependencies': [ '../../base/base.gyp:base', '../../third_party/gflags/gflags.gyp:gflags', + '../base/media_base.gyp:base', ], }, { @@ -33,23 +36,11 @@ 'type': '<(gtest_target_type)', 'sources': [ 'file_unittest.cc', - ], - 'dependencies': [ - '../../testing/gtest.gyp:gtest', - '../../testing/gtest.gyp:gtest_main', - 'file', - ], - }, - { - 'target_name': 'io_cache_unittest', - 'type': '<(gtest_target_type)', - 'sources': [ 'io_cache_unittest.cc', ], 'dependencies': [ '../../testing/gtest.gyp:gtest', '../../testing/gtest.gyp:gtest_main', - '../base/media_base.gyp:base', 'file', ], }, diff --git a/packager/media/file/file.h b/packager/media/file/file.h index cd50bcfafe..6a240d147e 100644 --- a/packager/media/file/file.h +++ b/packager/media/file/file.h @@ -96,6 +96,8 @@ class File { virtual bool Open() = 0; private: + friend class ThreadedIoFile; + // This is a file factory method, it creates a proper file, e.g. // LocalFile, MemFile based on prefix. static File* Create(const char* file_name, const char* mode); diff --git a/packager/media/file/io_cache.cc b/packager/media/file/io_cache.cc index 3e04aaa4db..d612d7335c 100644 --- a/packager/media/file/io_cache.cc +++ b/packager/media/file/io_cache.cc @@ -43,8 +43,6 @@ uint64_t IoCache::Read(void* buffer, uint64_t size) { AutoUnlock unlock(lock_); write_event_.Wait(); } - if (closed_) - return 0; size = std::min(size, BytesCachedInternal()); uint64_t first_chunk_size(std::min(size, static_cast( @@ -135,5 +133,13 @@ uint64 IoCache::BytesFreeInternal() { return cache_size_ - BytesCachedInternal(); } +void IoCache::WaitUntilEmptyOrClosed() { + AutoLock lock(lock_); + while (!closed_ && BytesCachedInternal()) { + AutoUnlock unlock(lock_); + read_event_.Wait(); + } +} + } // namespace media } // namespace edash_packager diff --git a/packager/media/file/io_cache.h b/packager/media/file/io_cache.h index fb898aef4e..13be7376a8 100644 --- a/packager/media/file/io_cache.h +++ b/packager/media/file/io_cache.h @@ -27,7 +27,7 @@ class IoCache { /// @param buffer is a buffer into which to read the data from the cache. /// @param size is the size of @a buffer. /// @return the number of bytes read into @a buffer, or 0 if the call - /// unblocked because the cache has been closed. + /// unblocked because the cache has been closed and is empty. uint64_t Read(void* buffer, uint64_t size); /// Write data to the cache. This function may block until there is enough @@ -54,6 +54,9 @@ class IoCache { /// @return the number of free bytes in the cache. uint64_t BytesFree(); + /// Waits until the cache is empty or has been closed. + void WaitUntilEmptyOrClosed(); + private: uint64_t BytesCachedInternal(); uint64_t BytesFreeInternal(); diff --git a/packager/media/file/threaded_io_file.cc b/packager/media/file/threaded_io_file.cc new file mode 100644 index 0000000000..b6a2265efc --- /dev/null +++ b/packager/media/file/threaded_io_file.cc @@ -0,0 +1,159 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/file/threaded_io_file.h" + +#include "packager/base/bind.h" +#include "packager/base/bind_helpers.h" +#include "packager/base/threading/platform_thread.h" +#include "packager/media/base/closure_thread.h" + +namespace edash_packager { +namespace media { + +namespace { +const int kWaitDelayMs = 10; +} + +ThreadedIoFile::ThreadedIoFile(scoped_ptr internal_file, + Mode mode, + uint64_t io_cache_size, + uint64_t io_block_size) + : File(internal_file->file_name()), + internal_file_(internal_file.Pass()), + mode_(mode), + cache_(io_cache_size), + io_buffer_(io_block_size), + size_(0), + eof_(false), + internal_file_error_(0) { + DCHECK(internal_file_); +} + +ThreadedIoFile::~ThreadedIoFile() {} + +bool ThreadedIoFile::Open() { + DCHECK(internal_file_); + + if (!internal_file_->Open()) + return false; + + size_ = internal_file_->Size(); + + thread_.reset(new ClosureThread("ThreadedIoFile", + base::Bind(mode_ == kInputMode ? + &ThreadedIoFile::RunInInputMode : + &ThreadedIoFile::RunInOutputMode, + base::Unretained(this)))); + thread_->Start(); + return true; +} + +bool ThreadedIoFile::Close() { + DCHECK(internal_file_); + DCHECK(thread_); + + if (mode_ == kOutputMode) + Flush(); + + cache_.Close(); + thread_->Join(); + + bool result = internal_file_.release()->Close(); + delete this; + return result; +} + +int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) { + DCHECK(internal_file_); + DCHECK(thread_); + DCHECK_EQ(kInputMode, mode_); + + if (internal_file_error_) + return internal_file_error_; + + if (eof_ && !cache_.BytesCached()) + return 0; + + return cache_.Read(buffer, length); +} + +int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) { + DCHECK(internal_file_); + DCHECK(thread_); + DCHECK_EQ(kOutputMode, mode_); + + if (internal_file_error_) + return internal_file_error_; + + size_ += length; + return cache_.Write(buffer, length); +} + +int64_t ThreadedIoFile::Size() { + DCHECK(internal_file_); + DCHECK(thread_); + + return size_; +} + +bool ThreadedIoFile::Flush() { + DCHECK(internal_file_); + DCHECK(thread_); + DCHECK_EQ(kOutputMode, mode_); + + cache_.WaitUntilEmptyOrClosed(); + return internal_file_->Flush(); +} + +bool ThreadedIoFile::Eof() { + DCHECK(internal_file_); + DCHECK(thread_); + DCHECK_EQ(kInputMode, mode_); + + return eof_ && !cache_.BytesCached(); +} + +void ThreadedIoFile::RunInInputMode() { + DCHECK(internal_file_); + DCHECK(thread_); + DCHECK_EQ(kInputMode, mode_); + + while (true) { + int64_t read_result = internal_file_->Read(&io_buffer_[0], + io_buffer_.size()); + if (read_result <= 0) { + eof_ = read_result == 0; + internal_file_error_ = read_result; + cache_.Close(); + return; + } + cache_.Write(&io_buffer_[0], read_result); + } +} + +void ThreadedIoFile::RunInOutputMode() { + DCHECK(internal_file_); + DCHECK(thread_); + DCHECK_EQ(kOutputMode, mode_); + + while (true) { + uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size()); + if (write_bytes == 0) + return; + + int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes); + if (write_result < 0) { + internal_file_error_ = write_result; + cache_.Close(); + return; + } + CHECK_EQ(write_result, static_cast(write_bytes)); + } +} + +} // namespace media +} // namespace edash_packager diff --git a/packager/media/file/threaded_io_file.h b/packager/media/file/threaded_io_file.h new file mode 100644 index 0000000000..7d668bd214 --- /dev/null +++ b/packager/media/file/threaded_io_file.h @@ -0,0 +1,68 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef PACKAGER_FILE_THREADED_IO_FILE_H_ +#define PACKAGER_FILE_THREADED_IO_FILE_H_ + +#include "packager/base/memory/scoped_ptr.h" +#include "packager/base/synchronization/lock.h" +#include "packager/media/file/file.h" +#include "packager/media/file/file_closer.h" +#include "packager/media/file/io_cache.h" + +namespace edash_packager { +namespace media { + +class ClosureThread; + +/// Declaration of class which implements a thread-safe circular buffer. +class ThreadedIoFile : public File { + public: + enum Mode { + kInputMode, + kOutputMode + }; + + ThreadedIoFile(scoped_ptr internal_file, + Mode mode, + uint64_t io_cache_size, + uint64_t io_block_size); + + /// @name File implementation overrides. + /// @{ + virtual bool Close() OVERRIDE; + virtual int64_t Read(void* buffer, uint64_t length) OVERRIDE; + virtual int64_t Write(const void* buffer, uint64_t length) OVERRIDE; + virtual int64_t Size() OVERRIDE; + virtual bool Flush() OVERRIDE; + virtual bool Eof() OVERRIDE; + /// @} + + protected: + virtual ~ThreadedIoFile(); + + virtual bool Open() OVERRIDE; + + void RunInInputMode(); + void RunInOutputMode(); + + private: + scoped_ptr internal_file_; + const Mode mode_; + IoCache cache_; + std::vector io_buffer_; + scoped_ptr thread_; + uint64_t size_; + bool eof_; + int64_t internal_file_error_; + + DISALLOW_COPY_AND_ASSIGN(ThreadedIoFile); +}; + +} // namespace media +} // namespace edash_packager + +#endif // PACKAGER_FILE_THREADED_IO_FILE_H diff --git a/packager/media/formats/mp4/multi_segment_segmenter.cc b/packager/media/formats/mp4/multi_segment_segmenter.cc index 89a3e16f41..cc9311e7a2 100644 --- a/packager/media/formats/mp4/multi_segment_segmenter.cc +++ b/packager/media/formats/mp4/multi_segment_segmenter.cc @@ -140,7 +140,7 @@ Status MultiSegmentSegmenter::WriteSegment() { if (options().segment_template.empty()) { // Append the segment to output file if segment template is not specified. file_name = options().output_file_name.c_str(); - file = File::Open(file_name.c_str(), "a+"); + file = File::Open(file_name.c_str(), "a"); if (file == NULL) { return Status( error::FILE_FAILURE,