Shaka Packager SDK
cue_alignment_handler.cc
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

#include "packager/media/chunking/cue_alignment_handler.h"

#include "packager/status_macros.h"

namespace shaka {
namespace media {
namespace {
// The max number of samples that are allowed to be buffered before we shut
// down, because there is likely a problem with the content or how the
// pipeline was configured. This is about 20 seconds of buffer for 48 kHz
// audio.
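// For example, assuming ~1024-sample frames (as with AAC) at 48 kHz, 1000
// buffered samples is roughly 1000 * 1024 / 48000, or about 21 seconds of
// audio.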
const size_t kMaxBufferSize = 1000;

double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
  int64_t time_scale;
  int64_t scaled_time;
  switch (data.stream_data_type) {
    case StreamDataType::kMediaSample:
      time_scale = info.time_scale();
      if (info.stream_type() == kStreamAudio) {
        // Return the start time for video and the mid-point for audio, so
        // that, for an audio sample, if the portion of the sample after the
        // cue point is bigger than the portion before the cue point, the
        // sample is placed after the cue.
        // It does not matter for text samples, as text samples will be cut at
        // the cue point.
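        // For example, a 20 ms audio frame starting at 95 ms has its mid-point
        // at 105 ms, so a cue at 100 ms is dispatched before the frame and the
        // frame is placed after the cue.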
        scaled_time =
            data.media_sample->pts() + data.media_sample->duration() / 2;
      } else {
        scaled_time = data.media_sample->pts();
      }
      break;
    case StreamDataType::kTextSample:
      // Text is always in milliseconds, but the stream info's time scale is 0.
      time_scale = 1000;
      scaled_time = data.text_sample->start_time();
      break;
    default:
      time_scale = 0;
      scaled_time = 0;
      NOTREACHED() << "TimeInSeconds should only be called on media samples "
                      "and text samples.";
      break;
  }

  return static_cast<double>(scaled_time) / time_scale;
}

Status GetNextCue(double hint,
                  SyncPointQueue* sync_points,
                  std::shared_ptr<const CueEvent>* out_cue) {
  DCHECK(sync_points);
  DCHECK(out_cue);

  *out_cue = sync_points->GetNext(hint);

  // |*out_cue| will only be null if the job was cancelled.
  return *out_cue ? Status::OK
                  : Status(error::CANCELLED, "SyncPointQueue is cancelled.");
}
}  // namespace

CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
    : sync_points_(sync_points) {}

Status CueAlignmentHandler::InitializeInternal() {
  sync_points_->AddThread();
  stream_states_.resize(num_input_streams());

  // Get the first hint for the streams. Use a negative hint so that if there
  // is supposed to be a sync point at zero, we will still respect it.
  hint_ = sync_points_->GetHint(-1);

  return Status::OK;
}

Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
  switch (data->stream_data_type) {
    case StreamDataType::kStreamInfo:
      return OnStreamInfo(std::move(data));
    case StreamDataType::kTextSample:
    case StreamDataType::kMediaSample:
      return OnSample(std::move(data));
    default:
      VLOG(3) << "Dropping unsupported data type "
              << static_cast<int>(data->stream_data_type);
      return Status::OK;
  }
}

Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
  stream_states_[stream_index].to_be_flushed = true;

  // We need to wait for all streams to flush before we can flush each stream.
  // This allows cached buffers to be cleared and cues to be properly
  // synchronized and set on all streams.
  for (const StreamState& stream_state : stream_states_) {
    if (!stream_state.to_be_flushed) {
      return Status::OK;
    }
  }

  // Go over all the streams once to ensure that their states are as we expect
  // them. Video and non-video streams have different allowances here. Video
  // streams should have no stored cues or samples at all, whereas non-video
  // streams may still have cues or samples.
  for (StreamState& stream : stream_states_) {
    DCHECK(stream.to_be_flushed);

    if (stream.info->stream_type() == kStreamVideo) {
      DCHECK_EQ(stream.samples.size(), 0u)
          << "Video streams should not store samples";
      DCHECK_EQ(stream.cues.size(), 0u)
          << "Video streams should not store cues";
    }
  }

  // It is possible that we did not get all the cues. |hint_| will get updated
  // when we call |UseNewSyncPoint|.
  while (sync_points_->HasMore(hint_)) {
    std::shared_ptr<const CueEvent> next_cue;
    RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_cue));
    RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_cue)));
  }

  // Now that there are new cues, it may be possible to dispatch some of the
  // samples that were left waiting.
  for (StreamState& stream : stream_states_) {
    RETURN_IF_ERROR(RunThroughSamples(&stream));
    DCHECK_EQ(stream.samples.size(), 0u);

    // It is possible for there to be cues that come after all the samples.
    // Make sure to send them out too.
    while (stream.cues.size()) {
      RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
      stream.cues.pop_front();
    }
  }

  return FlushAllDownstreams();
}

Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
  StreamState& stream_state = stream_states_[data->stream_index];
  // Keep a copy of the stream info so that we can check the stream type and
  // the time scale later.
  stream_state.info = data->stream_info;

  return Dispatch(std::move(data));
}

Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
  DCHECK(sample);
  DCHECK(sample->media_sample);

  const size_t stream_index = sample->stream_index;
  StreamState& stream = stream_states_[stream_index];

  const double sample_time = TimeInSeconds(*stream.info, *sample);
  const bool is_key_frame = sample->media_sample->is_key_frame();

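  // A key frame at or after the hint is a usable cut point, so promote the
  // hint to a concrete sync point at this sample's time and dispatch the
  // resulting cue ahead of the sample.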
  if (is_key_frame && sample_time >= hint_) {
    auto next_sync = sync_points_->PromoteAt(sample_time);

    if (!next_sync) {
      LOG(ERROR) << "Failed to promote sync point at " << sample_time
                 << ". This happens only if video streams are not GOP-aligned.";
      return Status(error::INVALID_ARGUMENT,
                    "Streams are not properly GOP-aligned.");
    }

    RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
    DCHECK_EQ(stream.cues.size(), 1u);
    RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
    stream.cues.pop_front();
  }

  return Dispatch(std::move(sample));
}

Status CueAlignmentHandler::OnNonVideoSample(
    std::unique_ptr<StreamData> sample) {
  DCHECK(sample);
  DCHECK(sample->media_sample || sample->text_sample);

  const size_t stream_index = sample->stream_index;
  StreamState& stream_state = stream_states_[stream_index];

  // Accept the sample. This will output it if it comes before the hint point
  // or cache it if it comes after the hint point.
  RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));

  // If all the streams are waiting on a hint, it means that none of them has
  // the next sync point determined. It also means that there are no video
  // streams, and we need to wait for all streams to converge on a hint so
  // that we can get the next sync point.
  if (EveryoneWaitingAtHint()) {
    std::shared_ptr<const CueEvent> next_sync;
    RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_sync));
    RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
  }

  return Status::OK;
}

Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
  // There are two modes:
  // 1. There is a video input.
  // 2. There are no video inputs.
  //
  // When there is a video input, we rely on the video input to get the next
  // sync point and release all the samples.
  //
  // When there are no video inputs, we rely on the sync point queue to block
  // us until there is a sync point.

  const size_t stream_index = sample->stream_index;
  const StreamType stream_type =
      stream_states_[stream_index].info->stream_type();

  const bool is_video = stream_type == kStreamVideo;

  return is_video ? OnVideoSample(std::move(sample))
                  : OnNonVideoSample(std::move(sample));
}

Status CueAlignmentHandler::UseNewSyncPoint(
    std::shared_ptr<const CueEvent> new_sync) {
  hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
  DCHECK_GT(hint_, new_sync->time_in_seconds);

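  // Give every stream a copy of the new cue and dispatch any buffered samples
  // that can now be released because they come before the updated hint.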
  for (size_t stream_index = 0; stream_index < stream_states_.size();
       stream_index++) {
    StreamState& stream = stream_states_[stream_index];
    stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync));

    RETURN_IF_ERROR(RunThroughSamples(&stream));
  }

  return Status::OK;
}

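// A stream is considered to be waiting at the hint once it has samples
// buffered, i.e. it has already seen a sample at or past |hint_| that cannot
// be dispatched until the next sync point is known.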
bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
  for (const StreamState& stream_state : stream_states_) {
    if (stream_state.samples.empty()) {
      return false;
    }
  }
  return true;
}

Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
                                         StreamState* stream) {
  DCHECK(sample);
  DCHECK(sample->media_sample || sample->text_sample);
  DCHECK(stream);

  // Need to cache the stream index as we will lose the pointer when we add
  // the sample to the queue.
  const size_t stream_index = sample->stream_index;

  stream->samples.push_back(std::move(sample));

  if (stream->samples.size() > kMaxBufferSize) {
    LOG(ERROR) << "Stream " << stream_index << " has buffered "
               << stream->samples.size() << " when the max is "
               << kMaxBufferSize;
    return Status(error::INVALID_ARGUMENT,
                  "Streams are not properly multiplexed.");
  }

  return RunThroughSamples(stream);
}

Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
  // Step through all our samples until we find where we can insert the cue.
  // Think of this as a merge sort.
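  // For example, with buffered samples at 1.0 s and 2.0 s and a pending cue
  // at 1.5 s, the 1.0 s sample is dispatched first, then the cue, and then
  // the 2.0 s sample (provided it still falls before |hint_|).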
  while (stream->cues.size() && stream->samples.size()) {
    const double cue_time = stream->cues.front()->cue_event->time_in_seconds;
    const double sample_time =
        TimeInSeconds(*stream->info, *stream->samples.front());

    if (sample_time < cue_time) {
      RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
      stream->samples.pop_front();
    } else {
      RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front())));
      stream->cues.pop_front();
    }
  }

  // If we still have samples, then it means that we sent out the cue and can
  // now work up to the hint. So now send all samples that come before the hint
  // downstream.
  while (stream->samples.size() &&
         TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
    RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
    stream->samples.pop_front();
  }

  return Status::OK;
}
}  // namespace media
}  // namespace shaka