Shaka Packager SDK
cue_alignment_handler.cc
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd

#include "packager/media/chunking/cue_alignment_handler.h"

#include "packager/status_macros.h"

namespace shaka {
namespace media {
namespace {
// The max number of samples that are allowed to be buffered before we shut
// down, because at that point there is likely a problem with the content or
// how the pipeline was configured. This is about 20 seconds of buffer for
// 48 kHz audio.
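// (Assuming ~1024-sample AAC frames at 48 kHz, 1000 buffered frames is about
// 1000 * 1024 / 48000, i.e. roughly 21 seconds of audio.)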
const size_t kMaxBufferSize = 1000;

int64_t GetScaledTime(const StreamInfo& info, const StreamData& data) {
  DCHECK(data.text_sample || data.media_sample);

  if (data.text_sample) {
    return data.text_sample->start_time();
  }

  if (info.stream_type() == kStreamText) {
    // This class does not support splitting MediaSample at cue points, which
    // is required for text streams. This class expects MediaSample to be
    // converted to TextSample before being passed to this class.
    NOTREACHED()
        << "Text streams should use text samples, not media samples.";
  }

  if (info.stream_type() == kStreamAudio) {
    // Return the mid-point for audio so that the sample is placed after the
    // cue when the portion of the sample after the cue point is larger than
    // the portion before it.
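    // For example, a sample with pts 100 and duration 40 (in time scale
    // units) has a mid-point of 120: a cue at 115 is dispatched before the
    // sample, while a cue at 125 is dispatched after it.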
    return data.media_sample->pts() + data.media_sample->duration() / 2;
  }

  DCHECK_EQ(info.stream_type(), kStreamVideo);
  return data.media_sample->pts();
}

double TimeInSeconds(const StreamInfo& info, const StreamData& data) {
  const int64_t scaled_time = GetScaledTime(info, data);
  const uint32_t time_scale = info.time_scale();

  return static_cast<double>(scaled_time) / time_scale;
}

Status GetNextCue(double hint,
                  SyncPointQueue* sync_points,
                  std::shared_ptr<const CueEvent>* out_cue) {
  DCHECK(sync_points);
  DCHECK(out_cue);

  *out_cue = sync_points->GetNext(hint);

  // |*out_cue| will only be null if the job was cancelled.
  return *out_cue ? Status::OK
                  : Status(error::CANCELLED, "SyncPointQueue is cancelled.");
}
}  // namespace

CueAlignmentHandler::CueAlignmentHandler(SyncPointQueue* sync_points)
    : sync_points_(sync_points) {}

Status CueAlignmentHandler::InitializeInternal() {
  sync_points_->AddThread();
  stream_states_.resize(num_input_streams());

  // Get the first hint for the streams. Use a negative hint so that if there
  // is supposed to be a sync point at zero, we will still respect it.
  hint_ = sync_points_->GetHint(-1);

  return Status::OK;
}

Status CueAlignmentHandler::Process(std::unique_ptr<StreamData> data) {
  switch (data->stream_data_type) {
    case StreamDataType::kStreamInfo:
      return OnStreamInfo(std::move(data));
    case StreamDataType::kTextSample:
    case StreamDataType::kMediaSample:
      return OnSample(std::move(data));
    default:
      VLOG(3) << "Dropping unsupported data type "
              << static_cast<int>(data->stream_data_type);
      return Status::OK;
  }
}

Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
  stream_states_[stream_index].to_be_flushed = true;

  // We need to wait for all streams to flush before we can flush each stream.
  // This allows cached buffers to be cleared and cues to be properly
  // synchronized and set on all streams.
  for (const StreamState& stream_state : stream_states_) {
    if (!stream_state.to_be_flushed) {
      return Status::OK;
    }
  }

  // Do a once-over of all the streams to ensure that their states are as we
  // expect them. Video and non-video streams have different allowances here.
  // Video should have no buffered cues or samples at all, whereas non-video
  // streams may still have cues or samples.
  for (StreamState& stream : stream_states_) {
    DCHECK(stream.to_be_flushed);

    if (stream.info->stream_type() == kStreamVideo) {
      DCHECK_EQ(stream.samples.size(), 0u)
          << "Video streams should not store samples";
      DCHECK_EQ(stream.cues.size(), 0u)
          << "Video streams should not store cues";
    }
  }

  // It is possible that we did not get all the cues. |hint_| will get updated
  // when we call |UseNewSyncPoint|.
  while (sync_points_->HasMore(hint_)) {
    std::shared_ptr<const CueEvent> next_cue;
    RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_cue));
    RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_cue)));
  }

  // Now that there are new cues, it may be possible to dispatch some of the
  // samples that were left waiting.
  for (StreamState& stream : stream_states_) {
    RETURN_IF_ERROR(RunThroughSamples(&stream));
    DCHECK_EQ(stream.samples.size(), 0u);

    // It is possible for there to be cues that come after all the samples.
    // Make sure to send them out too.
    while (stream.cues.size()) {
      RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
      stream.cues.pop_front();
    }
  }

  return FlushAllDownstreams();
}

Status CueAlignmentHandler::OnStreamInfo(std::unique_ptr<StreamData> data) {
  StreamState& stream_state = stream_states_[data->stream_index];
  // Keep a copy of the stream info so that we can check the stream type and
  // the time scale later.
  stream_state.info = data->stream_info;

  return Dispatch(std::move(data));
}

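// A video key frame at or after the current hint confirms the next sync point
// (via SyncPointQueue::PromoteAt) and dispatches the resulting cue ahead of
// the key frame itself.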
Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
  DCHECK(sample);
  DCHECK(sample->media_sample);

  const size_t stream_index = sample->stream_index;
  StreamState& stream = stream_states_[stream_index];

  const double sample_time = TimeInSeconds(*stream.info, *sample);
  const bool is_key_frame = sample->media_sample->is_key_frame();

  if (is_key_frame && sample_time >= hint_) {
    auto next_sync = sync_points_->PromoteAt(sample_time);

    if (!next_sync) {
      LOG(ERROR) << "Failed to promote sync point at " << sample_time
                 << ". This happens only if video streams are not GOP-aligned.";
      return Status(error::INVALID_ARGUMENT,
                    "Streams are not properly GOP-aligned.");
    }

    RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
    DCHECK_EQ(stream.cues.size(), 1u);
    RETURN_IF_ERROR(Dispatch(std::move(stream.cues.front())));
    stream.cues.pop_front();
  }

  return Dispatch(std::move(sample));
}

Status CueAlignmentHandler::OnNonVideoSample(
    std::unique_ptr<StreamData> sample) {
  DCHECK(sample);
  DCHECK(sample->media_sample || sample->text_sample);

  const size_t stream_index = sample->stream_index;
  StreamState& stream_state = stream_states_[stream_index];

  // Accept the sample. This will output it if it comes before the hint point
  // or cache it if it comes after the hint point.
  RETURN_IF_ERROR(AcceptSample(std::move(sample), &stream_state));

  // If all the streams are waiting on a hint, it means that none of them has
  // the next sync point determined yet. It also means that there are no video
  // streams, so we need to wait for all streams to converge on a hint before
  // we can get the next sync point.
  if (EveryoneWaitingAtHint()) {
    std::shared_ptr<const CueEvent> next_sync;
    RETURN_IF_ERROR(GetNextCue(hint_, sync_points_, &next_sync));
    RETURN_IF_ERROR(UseNewSyncPoint(next_sync));
  }

  return Status::OK;
}

Status CueAlignmentHandler::OnSample(std::unique_ptr<StreamData> sample) {
  // There are two modes:
  //   1. There is a video input.
  //   2. There are no video inputs.
  //
  // When there is a video input, we rely on the video input to get the next
  // sync point and release all the samples.
  //
  // When there are no video inputs, we rely on the sync point queue to block
  // us until there is a sync point.

  const size_t stream_index = sample->stream_index;
  const StreamType stream_type =
      stream_states_[stream_index].info->stream_type();

  const bool is_video = stream_type == kStreamVideo;

  return is_video ? OnVideoSample(std::move(sample))
                  : OnNonVideoSample(std::move(sample));
}

Status CueAlignmentHandler::UseNewSyncPoint(
    std::shared_ptr<const CueEvent> new_sync) {
  hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
  DCHECK_GT(hint_, new_sync->time_in_seconds);

  for (size_t stream_index = 0; stream_index < stream_states_.size();
       stream_index++) {
    StreamState& stream = stream_states_[stream_index];
    stream.cues.push_back(StreamData::FromCueEvent(stream_index, new_sync));

    RETURN_IF_ERROR(RunThroughSamples(&stream));
  }

  return Status::OK;
}

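// A stream is considered to be waiting at the hint once it has buffered
// samples: AcceptSample() dispatches everything before the hint, so anything
// still cached starts at or after the hint point.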
bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
  for (const StreamState& stream_state : stream_states_) {
    if (stream_state.samples.empty()) {
      return false;
    }
  }
  return true;
}

Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
                                         StreamState* stream) {
  DCHECK(sample);
  DCHECK(sample->media_sample || sample->text_sample);
  DCHECK(stream);

  // Need to cache the stream index as we lose access to the sample once it is
  // moved into the queue.
  const size_t stream_index = sample->stream_index;

  stream->samples.push_back(std::move(sample));

  if (stream->samples.size() > kMaxBufferSize) {
    LOG(ERROR) << "Stream " << stream_index << " has buffered "
               << stream->samples.size() << " samples when the max is "
               << kMaxBufferSize;
    return Status(error::INVALID_ARGUMENT,
                  "Streams are not properly multiplexed.");
  }

  return RunThroughSamples(stream);
}

Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
  // Step through all our samples until we find where we can insert the cue.
  // Think of this as a merge sort.
  while (stream->cues.size() && stream->samples.size()) {
    const double cue_time = stream->cues.front()->cue_event->time_in_seconds;
    const double sample_time =
        TimeInSeconds(*stream->info, *stream->samples.front());

    if (sample_time < cue_time) {
      RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
      stream->samples.pop_front();
    } else {
      RETURN_IF_ERROR(Dispatch(std::move(stream->cues.front())));
      stream->cues.pop_front();
    }
  }

  // If we still have samples, it means that all pending cues have been sent
  // out and we can now work up to the hint. Send all samples that come before
  // the hint downstream.
  while (stream->samples.size() &&
         TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
    RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
    stream->samples.pop_front();
  }

  return Status::OK;
}
}  // namespace media
}  // namespace shaka
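The listing above reduces to one ordering rule, so here is a minimal,
self-contained sketch of it. This is not part of the packager source and the
helper names are hypothetical; it only restates what GetScaledTime(),
TimeInSeconds() and RunThroughSamples() do, with concrete numbers for the
audio mid-point case.

// sketch.cc - illustrative only; builds with any C++11 compiler.
#include <cstdint>
#include <iostream>

namespace {

// Mirrors GetScaledTime()/TimeInSeconds(): audio compares at its mid-point,
// video and text compare at the sample start time.
double ComparisonTimeInSeconds(int64_t pts,
                               int64_t duration,
                               uint32_t time_scale,
                               bool is_audio) {
  const int64_t scaled = is_audio ? pts + duration / 2 : pts;
  return static_cast<double>(scaled) / time_scale;
}

// Mirrors the branch in RunThroughSamples(): a sample goes ahead of a pending
// cue only when its comparison time is strictly less than the cue time.
bool SampleGoesBeforeCue(double sample_time_in_seconds,
                         double cue_time_in_seconds) {
  return sample_time_in_seconds < cue_time_in_seconds;
}

}  // namespace

int main() {
  // An audio sample with pts 10000 and duration 40 in a 1000-ticks-per-second
  // time scale spans 10.000 s to 10.040 s; its mid-point is 10.020 s.
  const double audio_time = ComparisonTimeInSeconds(10000, 40, 1000, true);

  // More than half of the sample lies after a cue at 10.010 s, so the cue is
  // dispatched first and the sample follows it.
  std::cout << SampleGoesBeforeCue(audio_time, 10.010) << "\n";  // prints 0

  // More than half of the sample lies before a cue at 10.030 s, so the sample
  // is dispatched ahead of the cue.
  std::cout << SampleGoesBeforeCue(audio_time, 10.030) << "\n";  // prints 1

  return 0;
}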