Shaka Packager SDK
io_cache.cc
// Copyright 2015 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

7 #include "packager/file/io_cache.h"
8 
9 #include <string.h>
10 
11 #include <algorithm>
12 
13 #include "packager/base/logging.h"
14 
15 namespace shaka {
16 
17 using base::AutoLock;
18 using base::AutoUnlock;
19 
20 IoCache::IoCache(uint64_t cache_size)
21  : cache_size_(cache_size),
22  read_event_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
23  base::WaitableEvent::InitialState::NOT_SIGNALED),
24  write_event_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
25  base::WaitableEvent::InitialState::NOT_SIGNALED),
26  // Make the buffer one byte larger than the cache so that when the
27  // condition r_ptr == w_ptr is unambiguous (buffer empty).
28  circular_buffer_(cache_size + 1),
29  end_ptr_(&circular_buffer_[0] + cache_size + 1),
30  r_ptr_(circular_buffer_.data()),
31  w_ptr_(circular_buffer_.data()),
32  closed_(false) {}
33 
34 IoCache::~IoCache() {
35  Close();
36 }
37 
38 uint64_t IoCache::Read(void* buffer, uint64_t size) {
39  DCHECK(buffer);
40 
41  AutoLock lock(lock_);
42  while (!closed_ && (BytesCachedInternal() == 0)) {
43  AutoUnlock unlock(lock_);
44  write_event_.Wait();
45  }
46 
47  size = std::min(size, BytesCachedInternal());
48  uint64_t first_chunk_size(
49  std::min(size, static_cast<uint64_t>(end_ptr_ - r_ptr_)));
50  memcpy(buffer, r_ptr_, first_chunk_size);
51  r_ptr_ += first_chunk_size;
52  DCHECK_GE(end_ptr_, r_ptr_);
53  if (r_ptr_ == end_ptr_)
54  r_ptr_ = &circular_buffer_[0];
55  uint64_t second_chunk_size(size - first_chunk_size);
56  if (second_chunk_size) {
57  memcpy(static_cast<uint8_t*>(buffer) + first_chunk_size, r_ptr_,
58  second_chunk_size);
59  r_ptr_ += second_chunk_size;
60  DCHECK_GT(end_ptr_, r_ptr_);
61  }
62  read_event_.Signal();
63  return size;
64 }
65 
66 uint64_t IoCache::Write(const void* buffer, uint64_t size) {
67  DCHECK(buffer);
68 
69  const uint8_t* r_ptr(static_cast<const uint8_t*>(buffer));
70  uint64_t bytes_left(size);
71  while (bytes_left) {
72  AutoLock lock(lock_);
73  while (!closed_ && (BytesFreeInternal() == 0)) {
74  AutoUnlock unlock(lock_);
75  VLOG(1) << "Circular buffer is full, which can happen if data arrives "
76  "faster than being consumed by packager. Ignore if it is not "
77  "live packaging. Otherwise, try increasing --io_cache_size.";
78  read_event_.Wait();
79  }
80  if (closed_)
81  return 0;
82 
83  uint64_t write_size(std::min(bytes_left, BytesFreeInternal()));
84  uint64_t first_chunk_size(
85  std::min(write_size, static_cast<uint64_t>(end_ptr_ - w_ptr_)));
86  memcpy(w_ptr_, r_ptr, first_chunk_size);
87  w_ptr_ += first_chunk_size;
88  DCHECK_GE(end_ptr_, w_ptr_);
89  if (w_ptr_ == end_ptr_)
90  w_ptr_ = &circular_buffer_[0];
91  r_ptr += first_chunk_size;
92  uint64_t second_chunk_size(write_size - first_chunk_size);
93  if (second_chunk_size) {
94  memcpy(w_ptr_, r_ptr, second_chunk_size);
95  w_ptr_ += second_chunk_size;
96  DCHECK_GT(end_ptr_, w_ptr_);
97  r_ptr += second_chunk_size;
98  }
99  bytes_left -= write_size;
100  write_event_.Signal();
101  }
102  return size;
103 }
104 
106  AutoLock lock(lock_);
107  r_ptr_ = w_ptr_ = circular_buffer_.data();
108  // Let any writers know that there is room in the cache.
109  read_event_.Signal();
110 }
111 
113  AutoLock lock(lock_);
114  closed_ = true;
115  read_event_.Signal();
116  write_event_.Signal();
117 }
118 
120  AutoLock lock(lock_);
121  CHECK(closed_);
122  r_ptr_ = w_ptr_ = circular_buffer_.data();
123  closed_ = false;
124  read_event_.Reset();
125  write_event_.Reset();
126 }
127 
129  AutoLock lock(lock_);
130  return BytesCachedInternal();
131 }
132 
133 uint64_t IoCache::BytesFree() {
134  AutoLock lock(lock_);
135  return BytesFreeInternal();
136 }
137 
138 uint64_t IoCache::BytesCachedInternal() {
139  return (r_ptr_ <= w_ptr_)
140  ? w_ptr_ - r_ptr_
141  : (end_ptr_ - r_ptr_) + (w_ptr_ - circular_buffer_.data());
142 }
143 
144 uint64_t IoCache::BytesFreeInternal() {
145  return cache_size_ - BytesCachedInternal();
146 }
147 
149  AutoLock lock(lock_);
150  while (!closed_ && BytesCachedInternal()) {
151  AutoUnlock unlock(lock_);
152  read_event_.Wait();
153  }
154 }
155 
156 } // namespace shaka
void Reopen()
Reopens the cache. Any data still in the cache will be lost.
Definition: io_cache.cc:119
void Clear()
Empties the cache.
Definition: io_cache.cc:105
void Close()
Definition: io_cache.cc:112
All the methods that are virtual are virtual for mocking.
uint64_t Write(const void *buffer, uint64_t size)
Definition: io_cache.cc:66
void WaitUntilEmptyOrClosed()
Waits until the cache is empty or has been closed.
Definition: io_cache.cc:148
uint64_t Read(void *buffer, uint64_t size)
Definition: io_cache.cc:38
uint64_t BytesFree()
Definition: io_cache.cc:133
uint64_t BytesCached()
Definition: io_cache.cc:128