DASH Media Packaging SDK
 All Classes Namespaces Functions Variables Typedefs Enumerator
threaded_io_file.cc
1 // Copyright 2015 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/file/threaded_io_file.h"
8 
9 #include "packager/base/bind.h"
10 #include "packager/base/bind_helpers.h"
11 #include "packager/base/threading/platform_thread.h"
12 #include "packager/media/base/closure_thread.h"
13 
14 namespace edash_packager {
15 namespace media {
16 
17 using base::subtle::NoBarrier_Load;
18 using base::subtle::NoBarrier_Store;
19 
20 ThreadedIoFile::ThreadedIoFile(scoped_ptr<File, FileCloser> internal_file,
21  Mode mode,
22  uint64_t io_cache_size,
23  uint64_t io_block_size)
24  : File(internal_file->file_name()),
25  internal_file_(internal_file.Pass()),
26  mode_(mode),
27  cache_(io_cache_size),
28  io_buffer_(io_block_size),
29  size_(0),
30  eof_(false),
31  flushing_(false),
32  flush_complete_event_(false, false),
33  internal_file_error_(0){
34  DCHECK(internal_file_);
35 }
36 
37 ThreadedIoFile::~ThreadedIoFile() {}
38 
40  DCHECK(internal_file_);
41 
42  if (!internal_file_->Open())
43  return false;
44 
45  size_ = internal_file_->Size();
46 
47  thread_.reset(new ClosureThread("ThreadedIoFile",
48  base::Bind(mode_ == kInputMode ?
49  &ThreadedIoFile::RunInInputMode :
50  &ThreadedIoFile::RunInOutputMode,
51  base::Unretained(this))));
52  thread_->Start();
53  return true;
54 }
55 
57  DCHECK(internal_file_);
58  DCHECK(thread_);
59 
60  if (mode_ == kOutputMode)
61  Flush();
62 
63  cache_.Close();
64  thread_->Join();
65 
66  bool result = internal_file_.release()->Close();
67  delete this;
68  return result;
69 }
70 
71 int64_t ThreadedIoFile::Read(void* buffer, uint64_t length) {
72  DCHECK(internal_file_);
73  DCHECK(thread_);
74  DCHECK_EQ(kInputMode, mode_);
75 
76  if (NoBarrier_Load(&eof_) && !cache_.BytesCached())
77  return 0;
78 
79  if (NoBarrier_Load(&internal_file_error_))
80  return NoBarrier_Load(&internal_file_error_);
81 
82  return cache_.Read(buffer, length);
83 }
84 
85 int64_t ThreadedIoFile::Write(const void* buffer, uint64_t length) {
86  DCHECK(internal_file_);
87  DCHECK(thread_);
88  DCHECK_EQ(kOutputMode, mode_);
89 
90  if (NoBarrier_Load(&internal_file_error_))
91  return NoBarrier_Load(&internal_file_error_);
92 
93  size_ += length;
94  return cache_.Write(buffer, length);
95 }
96 
98  DCHECK(internal_file_);
99  DCHECK(thread_);
100 
101  return size_;
102 }
103 
105  DCHECK(internal_file_);
106  DCHECK(thread_);
107  DCHECK_EQ(kOutputMode, mode_);
108 
109  flushing_ = true;
110  cache_.Close();
111  flush_complete_event_.Wait();
112  return internal_file_->Flush();
113 }
114 
115 void ThreadedIoFile::RunInInputMode() {
116  DCHECK(internal_file_);
117  DCHECK(thread_);
118  DCHECK_EQ(kInputMode, mode_);
119 
120  while (true) {
121  int64_t read_result = internal_file_->Read(&io_buffer_[0],
122  io_buffer_.size());
123  if (read_result <= 0) {
124  NoBarrier_Store(&eof_, read_result == 0);
125  NoBarrier_Store(&internal_file_error_, read_result);
126  cache_.Close();
127  return;
128  }
129  cache_.Write(&io_buffer_[0], read_result);
130  }
131 }
132 
133 bool ThreadedIoFile::Seek(uint64_t position) {
134  NOTIMPLEMENTED();
135  return false;
136 }
137 
138 bool ThreadedIoFile::Tell(uint64_t* position) {
139  NOTIMPLEMENTED();
140  return false;
141 }
142 
143 void ThreadedIoFile::RunInOutputMode() {
144  DCHECK(internal_file_);
145  DCHECK(thread_);
146  DCHECK_EQ(kOutputMode, mode_);
147 
148  while (true) {
149  uint64_t write_bytes = cache_.Read(&io_buffer_[0], io_buffer_.size());
150  if (write_bytes == 0) {
151  if (flushing_) {
152  cache_.Reopen();
153  flushing_ = false;
154  flush_complete_event_.Signal();
155  } else {
156  return;
157  }
158  } else {
159  uint64_t bytes_written(0);
160  while (bytes_written < write_bytes) {
161  int64_t write_result = internal_file_->Write(
162  &io_buffer_[bytes_written], write_bytes - bytes_written);
163  if (write_result < 0) {
164  NoBarrier_Store(&internal_file_error_, write_result);
165  cache_.Close();
166  return;
167  }
168  bytes_written += write_result;
169  }
170  }
171  }
172 }
173 
174 } // namespace media
175 } // namespace edash_packager
void Reopen()
Reopens the cache. Any data still in the cache will be lost.
Definition: io_cache.cc:117
int64_t Write(const void *buffer, uint64_t length) override
bool Seek(uint64_t position) override
int64_t Read(void *buffer, uint64_t length) override
bool Open() override
Internal open. Should not be used directly.
uint64_t Write(const void *buffer, uint64_t size)
Definition: io_cache.cc:67
uint64_t Read(void *buffer, uint64_t size)
Definition: io_cache.cc:39
bool Tell(uint64_t *position) override