From b17d24006031aad9d79ad82850bcd52109a23e9a Mon Sep 17 00:00:00 2001 From: KongQun Yang Date: Tue, 29 Dec 2015 17:09:12 -0800 Subject: [PATCH] Fix a memory leak due to thread object tracking ThreadedIoFile spawns a new thread for every new file. Thread information is stored for tracking purpose by base::tracked_objects. The tracking object remains even if the thread itself is destroyed. This results in memory usage increased by a couple of bytes for every new segment created in live mode (new segments spawns new threads). Use WorkerPool instead to avoid spawning new threads. Fixes Issue #61. Change-Id: Id93283903c3ba8ebf172a0d58e19b082a72c6cf0 --- packager/app/mpd_generator.cc | 2 ++ packager/app/packager_main.cc | 2 ++ packager/media/event/media_event.gyp | 2 +- packager/media/file/file.gyp | 2 +- packager/media/file/threaded_io_file.cc | 47 ++++++++++++------------- packager/media/file/threaded_io_file.h | 11 +++--- 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/packager/app/mpd_generator.cc b/packager/app/mpd_generator.cc index 86b718c517..b11ae38368 100644 --- a/packager/app/mpd_generator.cc +++ b/packager/app/mpd_generator.cc @@ -6,6 +6,7 @@ #include "packager/app/mpd_generator_flags.h" #include "packager/app/vlog_flags.h" +#include "packager/base/at_exit.h" #include "packager/base/command_line.h" #include "packager/base/logging.h" #include "packager/base/strings/string_split.h" @@ -80,6 +81,7 @@ ExitStatus RunMpdGenerator() { } int MpdMain(int argc, char** argv) { + base::AtExitManager exit; // Needed to enable VLOG/DVLOG through --vmodule or --v. base::CommandLine::Init(argc, argv); CHECK(logging::InitLogging(logging::LoggingSettings())); diff --git a/packager/app/packager_main.cc b/packager/app/packager_main.cc index eadbafb611..4de1e6b8d0 100644 --- a/packager/app/packager_main.cc +++ b/packager/app/packager_main.cc @@ -15,6 +15,7 @@ #include "packager/app/stream_descriptor.h" #include "packager/app/vlog_flags.h" #include "packager/app/widevine_encryption_flags.h" +#include "packager/base/at_exit.h" #include "packager/base/command_line.h" #include "packager/base/logging.h" #include "packager/base/stl_util.h" @@ -434,6 +435,7 @@ bool RunPackager(const StreamDescriptorList& stream_descriptors) { } int PackagerMain(int argc, char** argv) { + base::AtExitManager exit; // Needed to enable VLOG/DVLOG through --vmodule or --v. base::CommandLine::Init(argc, argv); CHECK(logging::InitLogging(logging::LoggingSettings())); diff --git a/packager/media/event/media_event.gyp b/packager/media/event/media_event.gyp index 5f5334ed02..af4119eee2 100644 --- a/packager/media/event/media_event.gyp +++ b/packager/media/event/media_event.gyp @@ -44,9 +44,9 @@ '../../mpd/mpd.gyp:mpd_mocks', '../../testing/gmock.gyp:gmock', '../../testing/gtest.gyp:gtest', - '../../testing/gtest.gyp:gtest_main', # Depends on full protobuf to read/write with TextFormat. '../../third_party/protobuf/protobuf.gyp:protobuf_full_do_not_use', + '../test/media_test.gyp:run_all_unittests', 'media_event', ], }, diff --git a/packager/media/file/file.gyp b/packager/media/file/file.gyp index 54f922f7a0..8164ce3845 100644 --- a/packager/media/file/file.gyp +++ b/packager/media/file/file.gyp @@ -53,8 +53,8 @@ ], 'dependencies': [ '../../testing/gtest.gyp:gtest', - '../../testing/gtest.gyp:gtest_main', '../../third_party/gflags/gflags.gyp:gflags', + '../test/media_test.gyp:run_all_unittests', 'file', ], }, diff --git a/packager/media/file/threaded_io_file.cc b/packager/media/file/threaded_io_file.cc index 8c361e97e4..b71acb9abe 100644 --- a/packager/media/file/threaded_io_file.cc +++ b/packager/media/file/threaded_io_file.cc @@ -8,8 +8,8 @@ #include "packager/base/bind.h" #include "packager/base/bind_helpers.h" -#include "packager/base/threading/platform_thread.h" -#include "packager/media/base/closure_thread.h" +#include "packager/base/location.h" +#include "packager/base/threading/worker_pool.h" namespace edash_packager { namespace media { @@ -31,7 +31,8 @@ ThreadedIoFile::ThreadedIoFile(scoped_ptr internal_file, eof_(false), flushing_(false), flush_complete_event_(false, false), - internal_file_error_(0){ + internal_file_error_(0), + task_exit_event_(false, false) { DCHECK(internal_file_); } @@ -46,24 +47,20 @@ bool ThreadedIoFile::Open() { position_ = 0; size_ = internal_file_->Size(); - thread_.reset(new ClosureThread("ThreadedIoFile", - base::Bind(mode_ == kInputMode ? - &ThreadedIoFile::RunInInputMode : - &ThreadedIoFile::RunInOutputMode, - base::Unretained(this)))); - thread_->Start(); + base::WorkerPool::PostTask(FROM_HERE, base::Bind(&ThreadedIoFile::TaskHandler, + base::Unretained(this)), + true /* task_is_slow */); return true; } bool ThreadedIoFile::Close() { DCHECK(internal_file_); - DCHECK(thread_); if (mode_ == kOutputMode) Flush(); cache_.Close(); - thread_->Join(); + task_exit_event_.Wait(); bool result = internal_file_.release()->Close(); delete this; @@ -72,7 +69,6 @@ bool ThreadedIoFile::Close() { int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) { DCHECK(internal_file_); - DCHECK(thread_); DCHECK_EQ(kInputMode, mode_); if (NoBarrier_Load(&eof_) && !cache_.BytesCached()) @@ -90,7 +86,6 @@ int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) { int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) { DCHECK(internal_file_); - DCHECK(thread_); DCHECK_EQ(kOutputMode, mode_); if (NoBarrier_Load(&internal_file_error_)) @@ -106,14 +101,12 @@ int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) { int64_t ThreadedIoFile::Size() { DCHECK(internal_file_); - DCHECK(thread_); return size_; } bool ThreadedIoFile::Flush() { DCHECK(internal_file_); - DCHECK(thread_); DCHECK_EQ(kOutputMode, mode_); flushing_ = true; @@ -128,10 +121,10 @@ bool ThreadedIoFile::Seek(uint64_t position) { if (!Flush()) return false; if (!internal_file_->Seek(position)) return false; } else { - // Reading. Close cache, wait for I/O thread to exit, seek, and restart - // I/O thread. + // Reading. Close cache, wait for thread task to exit, seek, and re-post + // the task. cache_.Close(); - thread_->Join(); + task_exit_event_.Wait(); bool result = internal_file_->Seek(position); if (!result) { // Seek failed. Seek to logical position instead. @@ -141,10 +134,10 @@ bool ThreadedIoFile::Seek(uint64_t position) { } cache_.Reopen(); eof_ = false; - thread_.reset(new ClosureThread("ThreadedIoFile", - base::Bind(&ThreadedIoFile::RunInInputMode, - base::Unretained(this)))); - thread_->Start(); + base::WorkerPool::PostTask( + FROM_HERE, + base::Bind(&ThreadedIoFile::TaskHandler, base::Unretained(this)), + true /* task_is_slow */); if (!result) return false; } position_ = position; @@ -158,9 +151,16 @@ bool ThreadedIoFile::Tell(uint64_t* position) { return true; } +void ThreadedIoFile::TaskHandler() { + if (mode_ == kInputMode) + RunInInputMode(); + else + RunInOutputMode(); + task_exit_event_.Signal(); +} + void ThreadedIoFile::RunInInputMode() { DCHECK(internal_file_); - DCHECK(thread_); DCHECK_EQ(kInputMode, mode_); while (true) { @@ -180,7 +180,6 @@ void ThreadedIoFile::RunInInputMode() { void ThreadedIoFile::RunInOutputMode() { DCHECK(internal_file_); - DCHECK(thread_); DCHECK_EQ(kOutputMode, mode_); while (true) { diff --git a/packager/media/file/threaded_io_file.h b/packager/media/file/threaded_io_file.h index 40f7e4923f..1fc2ee4c99 100644 --- a/packager/media/file/threaded_io_file.h +++ b/packager/media/file/threaded_io_file.h @@ -9,7 +9,6 @@ #include "packager/base/atomicops.h" #include "packager/base/memory/scoped_ptr.h" -#include "packager/base/synchronization/lock.h" #include "packager/base/synchronization/waitable_event.h" #include "packager/media/file/file.h" #include "packager/media/file/file_closer.h" @@ -18,8 +17,6 @@ namespace edash_packager { namespace media { -class ClosureThread; - /// Declaration of class which implements a thread-safe circular buffer. class ThreadedIoFile : public File { public: @@ -49,21 +46,25 @@ class ThreadedIoFile : public File { bool Open() override; + private: + // Internal task handler implementation. Will dispatch to either + // |RunInInputMode| or |RunInOutputMode| depending on |mode_|. + void TaskHandler(); void RunInInputMode(); void RunInOutputMode(); - private: scoped_ptr internal_file_; const Mode mode_; IoCache cache_; std::vector io_buffer_; - scoped_ptr thread_; uint64_t position_; uint64_t size_; base::subtle::Atomic32 eof_; bool flushing_; base::WaitableEvent flush_complete_event_; base::subtle::Atomic32 internal_file_error_; + // Signalled when thread task exits. + base::WaitableEvent task_exit_event_; DISALLOW_COPY_AND_ASSIGN(ThreadedIoFile); };