Shaka Packager SDK
cue_alignment_handler.cc
1 // Copyright 2018 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/chunking/cue_alignment_handler.h"
8 
9 #include <algorithm>
10 
11 #include "packager/status_macros.h"
12 
13 namespace shaka {
14 namespace media {
15 namespace {
// Upper bound on how many samples one stream may keep queued. Going past it
// is treated as an error, because it most likely points at a problem with the
// content or with how the pipeline was configured. For 48kHz audio this is
// roughly 20 seconds worth of buffered samples.
constexpr size_t kMaxBufferSize = 1000;
20 
21 int64_t GetScaledTime(const StreamInfo& info, const StreamData& data) {
22  DCHECK(data.text_sample || data.media_sample);
23 
24  if (data.text_sample) {
25  return data.text_sample->start_time();
26  }
27 
28  if (info.stream_type() == kStreamText) {
29  // This class does not support splitting MediaSample at cue points, which is
30  // required for text stream. This class expects MediaSample to be converted
31  // to TextSample before passing to this class.
32  NOTREACHED()
33  << "A text streams should use text samples, not media samples.";
34  }
35 
36  if (info.stream_type() == kStreamAudio) {
37  // Return the mid-point for audio because if the portion of the sample
38  // after the cue point is bigger than the portion of the sample before
39  // the cue point, the sample is placed after the cue.
40  return data.media_sample->pts() + data.media_sample->duration() / 2;
41  }
42 
43  DCHECK_EQ(info.stream_type(), kStreamVideo);
44  return data.media_sample->pts();
45 }
46 
47 double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
48  const int64_t scaled_time = GetScaledTime(info, data);
49  const uint32_t time_scale = info.time_scale();
50 
51  return static_cast<double>(scaled_time) / time_scale;
52 }
53 
54 double TextEndTimeInSeconds(const StreamInfo& info, const StreamData& data) {
55  DCHECK(data.text_sample);
56 
57  const int64_t scaled_time = data.text_sample->EndTime();
58  const uint32_t time_scale = info.time_scale();
59 
60  return static_cast<double>(scaled_time) / time_scale;
61 }
62 
63 Status GetNextCue(double hint,
64  SyncPointQueue* sync_points,
65  std::shared_ptr<const CueEvent>* out_cue) {
66  DCHECK(sync_points);
67  DCHECK(out_cue);
68 
69  *out_cue = sync_points->GetNext(hint);
70 
71  // |*out_cue| will only be null if the job was cancelled.
72  return *out_cue ? Status::OK
73  : Status(error::CANCELLED, "SyncPointQueue is cancelled.");
74 }
75 } // namespace
76 
77 CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
78  : sync_points_(sync_points) {}
79 
80 Status CueAlignmentHandler::InitializeInternal() {
81  sync_points_->AddThread();
82  stream_states_.resize(num_input_streams());
83 
84  // Get the first hint for the stream. Use a negative hint so that if there is
85  // suppose to be a sync point at zero, we will still respect it.
86  hint_ = sync_points_->GetHint(-1);
87 
88  return Status::OK;
89 }
90 
91 Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
92  switch (data->stream_data_type) {
93  case StreamDataType::kStreamInfo:
94  return OnStreamInfo(std::move(data));
95  case StreamDataType::kTextSample:
96  case StreamDataType::kMediaSample:
97  return OnSample(std::move(data));
98  default:
99  VLOG(3) << "Dropping unsupported data type "
100  << static_cast<int>(data->stream_data_type);
101  return Status::OK;
102  }
103 }
104 
105 Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
106  stream_states_[stream_index].to_be_flushed = true;
107 
108  // We need to wait for all stream to flush before we can flush each stream.
109  // This allows cached buffers to be cleared and cues to be properly
110  // synchronized and set on all streams.
111  for (const StreamState& stream_state : stream_states_) {
112  if (!stream_state.to_be_flushed) {
113  return Status::OK;
114  }
115  }
116 
117  // Do a once over all the streams to ensure that their states are as we expect
118  // them. Video and non-video streams have different allowances here. Video
119  // should absolutely have no cues or samples where as non-video streams may
120  // have cues or samples.
121  for (StreamState& stream : stream_states_) {
122  DCHECK(stream.to_be_flushed);
123 
124  if (stream.info->stream_type() == kStreamVideo) {
125  DCHECK_EQ(stream.samples.size(), 0u)
126  << "Video streams should not store samples";
127  DCHECK_EQ(stream.cues.size(), 0u)
128  << "Video streams should not store cues";
129  }
130  }
131 
132  // It is possible that we did not get all the cues. |hint_| will get updated
133  // when we call |UseNextSyncPoint|.
134  while (sync_points_->HasMore(hint_)) {
135  std::shared_ptr<const CueEvent> next_cue;
136  RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_cue));
137  RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_cue)));
138  }
139 
140  // Now that there are new cues, it may be possible to dispatch some of the
141  // samples that may be left waiting.
142  for (StreamState& stream : stream_states_) {
143  RETURN_IF_ERROR(RunThroughSamples(&stream));
144  DCHECK_EQ(stream.samples.size(), 0u);
145 
146  // Ignore extra cues at the end, except for text, as they will result in
147  // empty DASH Representations, which is not spec compliant.
148  // For text, if the cue is before the max end time, it will still be
149  // dispatched as the text samples intercepted by the cue can be split into
150  // two at the cue point.
151  for (auto& cue : stream.cues) {
152  // |max_text_sample_end_time_seconds| is always 0 for non-text samples.
153  if (cue->cue_event->time_in_seconds <
154  stream.max_text_sample_end_time_seconds) {
155  RETURN_IF_ERROR(Dispatch(std::move(cue)));
156  } else {
157  VLOG(1) << "Ignore extra cue in stream " << cue->stream_index
158  << " with time " << cue->cue_event->time_in_seconds
159  << "s in the end.";
160  }
161  }
162  stream.cues.clear();
163  }
164 
165  return FlushAllDownstreams();
166 }
167 
168 Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
169  StreamState& stream_state = stream_states_[data->stream_index];
170  // Keep a copy of the stream info so that we can check type and check
171  // timescale.
172  stream_state.info = data->stream_info;
173 
174  return Dispatch(std::move(data));
175 }
176 
177 Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
178  DCHECK(sample);
179  DCHECK(sample->media_sample);
180 
181  const size_t stream_index = sample->stream_index;
182  StreamState& stream = stream_states_[stream_index];
183 
184  const double sample_time = TimeInSeconds(*stream.info, *sample);
185  const bool is_key_frame = sample->media_sample->is_key_frame();
186 
187  if (is_key_frame && sample_time >= hint_) {
188  auto next_sync = sync_points_->PromoteAt(sample_time);
189 
190  if (!next_sync) {
191  LOG(ERROR) << "Failed to promote sync point at " << sample_time
192  << ". This happens only if video streams are not GOP-aligned.";
193  return Status(error::INVALID_ARGUMENT,
194  "Streams are not properly GOP-aligned.");
195  }
196 
197  RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
198  DCHECK_EQ(stream.cues.size(), 1u);
199  RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
200  stream.cues.pop_front();
201  }
202 
203  return Dispatch(std::move(sample));
204 }
205 
206 Status CueAlignmentHandler::OnNonVideoSample(
207  std::unique_ptr<StreamData> sample) {
208  DCHECK(sample);
209  DCHECK(sample->media_sample || sample->text_sample);
210 
211  const size_t stream_index = sample->stream_index;
212  StreamState& stream_state = stream_states_[stream_index];
213 
214  // Accept the sample. This will output it if it comes before the hint point or
215  // will cache it if it comes after the hint point.
216  RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));
217 
218  // If all the streams are waiting on a hint, it means that none has next sync
219  // point determined. It also means that there are no video streams and we need
220  // to wait for all streams to converge on a hint so that we can get the next
221  // sync point.
222  if (EveryoneWaitingAtHint()) {
223  std::shared_ptr<const CueEvent> next_sync;
224  RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_sync));
225  RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
226  }
227 
228  return Status::OK;
229 }
230 
231 Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
232  // There are two modes:
233  // 1. There is a video input.
234  // 2. There are no video inputs.
235  //
236  // When there is a video input, we rely on the video input get the next sync
237  // point and release all the samples.
238  //
239  // When there are no video inputs, we rely on the sync point queue to block
240  // us until there is a sync point.
241 
242  const size_t stream_index = sample->stream_index;
243 
244  if (sample->text_sample) {
245  StreamState& stream = stream_states_[stream_index];
246  stream.max_text_sample_end_time_seconds =
247  std::max(stream.max_text_sample_end_time_seconds,
248  TextEndTimeInSeconds(*stream.info, *sample));
249  }
250 
251  const StreamType stream_type =
252  stream_states_[stream_index].info->stream_type();
253  const bool is_video = stream_type == kStreamVideo;
254 
255  return is_video ? OnVideoSample(std::move(sample))
256  : OnNonVideoSample(std::move(sample));
257 }
258 
259 Status CueAlignmentHandler::UseNewSyncPoint(
260  std::shared_ptr<const CueEvent> new_sync) {
261  hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
262  DCHECK_GT(hint_, new_sync->time_in_seconds);
263 
264  for (size_t stream_index = 0; stream_index < stream_states_.size();
265  stream_index++) {
266  StreamState& stream = stream_states_[stream_index];
267  stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync));
268 
269  RETURN_IF_ERROR(RunThroughSamples(&stream));
270  }
271 
272  return Status::OK;
273 }
274 
275 bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
276  for (const StreamState& stream_state : stream_states_) {
277  if (stream_state.samples.empty()) {
278  return false;
279  }
280  }
281  return true;
282 }
283 
284 Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
285  StreamState* stream) {
286  DCHECK(sample);
287  DCHECK(sample->media_sample || sample->text_sample);
288  DCHECK(stream);
289 
290  // Need to cache the stream index as we will lose the pointer when we add
291  // the sample to the queue.
292  const size_t stream_index = sample->stream_index;
293 
294  stream->samples.push_back(std::move(sample));
295 
296  if (stream->samples.size() > kMaxBufferSize) {
297  LOG(ERROR) << "Stream " << stream_index << " has buffered "
298  << stream->samples.size() << " when the max is "
299  << kMaxBufferSize;
300  return Status(error::INVALID_ARGUMENT,
301  "Streams are not properly multiplexed.");
302  }
303 
304  return RunThroughSamples(stream);
305 }
306 
307 Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
308  // Step through all our samples until we find where we can insert the cue.
309  // Think of this as a merge sort.
310  while (stream->cues.size() && stream->samples.size()) {
311  const double cue_time = stream->cues.front()->cue_event->time_in_seconds;
312  const double sample_time =
313  TimeInSeconds(*stream->info, *stream->samples.front());
314 
315  if (sample_time < cue_time) {
316  RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
317  stream->samples.pop_front();
318  } else {
319  RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front())));
320  stream->cues.pop_front();
321  }
322  }
323 
324  // If we still have samples, then it means that we sent out the cue and can
325  // now work up to the hint. So now send all samples that come before the hint
326  // downstream.
327  while (stream->samples.size() &&
328  TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
329  RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
330  stream->samples.pop_front();
331  }
332 
333  return Status::OK;
334 }
335 } // namespace media
336 } // namespace shaka
shaka
All the methods that are virtual are virtual for mocking.
Definition: gflags_hex_bytes.cc:11
shaka::media::SyncPointQueue::HasMore
bool HasMore(double hint_in_seconds) const
Definition: sync_point_queue.cc:90
shaka::media::MediaHandler::Dispatch
Status Dispatch(std::unique_ptr< StreamData > stream_data) const
Definition: media_handler.cc:94
shaka::media::MediaHandler::FlushAllDownstreams
Status FlushAllDownstreams()
Flush all connected downstream handlers.
Definition: media_handler.cc:114
shaka::media::SyncPointQueue::PromoteAt
std::shared_ptr< const CueEvent > PromoteAt(double time_in_seconds)
Definition: sync_point_queue.cc:84
shaka::media::SyncPointQueue::GetHint
double GetHint(double time_in_seconds)
Definition: sync_point_queue.cc:38
shaka::media::SyncPointQueue::AddThread
void AddThread()
Definition: sync_point_queue.cc:25