DASH Media Packaging SDK
 All Classes Namespaces Functions Variables Typedefs Enumerator
threaded_io_file.cc
1 // Copyright 2015 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/file/threaded_io_file.h"
8 
9 #include "packager/base/bind.h"
10 #include "packager/base/bind_helpers.h"
11 #include "packager/base/threading/platform_thread.h"
12 #include "packager/media/base/closure_thread.h"
13 
14 namespace edash_packager {
15 namespace media {
16 
17 ThreadedIoFile::ThreadedIoFile(scoped_ptr<File, FileCloser> internal_file,
18  Mode mode,
19  uint64_t io_cache_size,
20  uint64_t io_block_size)
21  : File(internal_file->file_name()),
22  internal_file_(internal_file.Pass()),
23  mode_(mode),
24  cache_(io_cache_size),
25  io_buffer_(io_block_size),
26  size_(0),
27  eof_(false),
28  flushing_(false),
29  flush_complete_event_(false, false),
30  internal_file_error_(0){
31  DCHECK(internal_file_);
32 }
33 
34 ThreadedIoFile::~ThreadedIoFile() {}
35 
37  DCHECK(internal_file_);
38 
39  if (!internal_file_->Open())
40  return false;
41 
42  size_ = internal_file_->Size();
43 
44  thread_.reset(new ClosureThread("ThreadedIoFile",
45  base::Bind(mode_ == kInputMode ?
46  &ThreadedIoFile::RunInInputMode :
47  &ThreadedIoFile::RunInOutputMode,
48  base::Unretained(this))));
49  thread_->Start();
50  return true;
51 }
52 
54  DCHECK(internal_file_);
55  DCHECK(thread_);
56 
57  if (mode_ == kOutputMode)
58  Flush();
59 
60  cache_.Close();
61  thread_->Join();
62 
63  bool result = internal_file_.release()->Close();
64  delete this;
65  return result;
66 }
67 
68 int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) {
69  DCHECK(internal_file_);
70  DCHECK(thread_);
71  DCHECK_EQ(kInputMode, mode_);
72 
73  if (internal_file_error_)
74  return internal_file_error_;
75 
76  if (eof_ && !cache_.BytesCached())
77  return 0;
78 
79  return cache_.Read(buffer, length);
80 }
81 
82 int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) {
83  DCHECK(internal_file_);
84  DCHECK(thread_);
85  DCHECK_EQ(kOutputMode, mode_);
86 
87  if (internal_file_error_)
88  return internal_file_error_;
89 
90  size_ += length;
91  return cache_.Write(buffer, length);
92 }
93 
95  DCHECK(internal_file_);
96  DCHECK(thread_);
97 
98  return size_;
99 }
100 
102  DCHECK(internal_file_);
103  DCHECK(thread_);
104  DCHECK_EQ(kOutputMode, mode_);
105 
106  flushing_ = true;
107  cache_.Close();
108  flush_complete_event_.Wait();
109  return internal_file_->Flush();
110 }
111 
112 void ThreadedIoFile::RunInInputMode() {
113  DCHECK(internal_file_);
114  DCHECK(thread_);
115  DCHECK_EQ(kInputMode, mode_);
116 
117  while (true) {
118  int64_t read_result = internal_file_->Read(&io_buffer_[0],
119  io_buffer_.size());
120  if (read_result <= 0) {
121  eof_ = read_result == 0;
122  internal_file_error_ = read_result;
123  cache_.Close();
124  return;
125  }
126  cache_.Write(&io_buffer_[0], read_result);
127  }
128 }
129 
130 bool ThreadedIoFile::Seek(uint64_t position) {
131  NOTIMPLEMENTED();
132  return false;
133 }
134 
135 bool ThreadedIoFile::Tell(uint64_t* position) {
136  NOTIMPLEMENTED();
137  return false;
138 }
139 
140 void ThreadedIoFile::RunInOutputMode() {
141  DCHECK(internal_file_);
142  DCHECK(thread_);
143  DCHECK_EQ(kOutputMode, mode_);
144 
145  while (true) {
146  uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size());
147  if (write_bytes == 0) {
148  if (flushing_) {
149  cache_.Reopen();
150  flushing_ = false;
151  flush_complete_event_.Signal();
152  } else {
153  return;
154  }
155  } else {
156  int64_t write_result = internal_file_->Write(&io_buffer_[0], write_bytes);
157  if (write_result < 0) {
158  internal_file_error_ = write_result;
159  cache_.Close();
160  return;
161  }
162  CHECK_EQ(write_result, static_cast<int64_t>(write_bytes));
163  }
164  }
165 }
166 
167 } // namespace media
168 } // namespace edash_packager
void Reopen()
Reopens the cache. Any data still in the cache will be lost.
Definition: io_cache.cc:117
int64_t Write(const void *buffer, uint64_t length) override
bool Seek(uint64_t position) override
int64_t Read(void *buffer, uint64_t length) override
bool Open() override
Internal open. Should not be used directly.
uint64_t Write(const void *buffer, uint64_t size)
Definition: io_cache.cc:67
uint64_t Read(void *buffer, uint64_t size)
Definition: io_cache.cc:39
bool Tell(uint64_t *position) override