Shaka Packager SDK
chunking_handler.cc
1 // Copyright 2017 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/chunking/chunking_handler.h"
8 
9 #include <algorithm>
10 
11 #include "packager/base/logging.h"
12 #include "packager/base/threading/platform_thread.h"
13 #include "packager/media/base/media_sample.h"
14 
namespace {
// Sentinel stored in |thread_id_| until the first stream info arrives and
// records the calling thread. It is a compile-time constant so it cannot be
// modified by accident at runtime.
constexpr int64_t kThreadIdUnset = -1;
}  // namespace
18 
19 namespace shaka {
20 namespace media {
21 
22 ChunkingHandler::ChunkingHandler(const ChunkingParams& chunking_params)
23  : chunking_params_(chunking_params),
24  thread_id_(kThreadIdUnset),
25  media_sample_comparator_(this),
26  cached_media_sample_stream_data_(media_sample_comparator_) {
27  CHECK_NE(chunking_params.segment_duration_in_seconds, 0u);
28 }
29 
30 ChunkingHandler::~ChunkingHandler() {}
31 
33  segment_info_.resize(num_input_streams());
34  subsegment_info_.resize(num_input_streams());
35  time_scales_.resize(num_input_streams());
36  last_sample_end_timestamps_.resize(num_input_streams());
37  num_cached_samples_.resize(num_input_streams());
38  return Status::OK;
39 }
40 
41 Status ChunkingHandler::Process(std::unique_ptr<StreamData> stream_data) {
42  switch (stream_data->stream_data_type) {
43  case StreamDataType::kStreamInfo: {
44  // Make sure the inputs come from the same thread.
45  const int64_t thread_id =
46  static_cast<int64_t>(base::PlatformThread::CurrentId());
47  int64_t expected = kThreadIdUnset;
48  if (!thread_id_.compare_exchange_strong(expected, thread_id) &&
49  expected != thread_id) {
50  return Status(error::CHUNKING_ERROR,
51  "Inputs should come from the same thread.");
52  }
53 
54  const auto time_scale = stream_data->stream_info->time_scale();
55  // The video stream is treated as the main stream. If there is only one
56  // stream, it is the main stream.
57  const bool is_main_stream =
58  main_stream_index_ == kInvalidStreamIndex &&
59  (stream_data->stream_info->stream_type() == kStreamVideo ||
60  num_input_streams() == 1);
61  if (is_main_stream) {
62  main_stream_index_ = stream_data->stream_index;
63  segment_duration_ =
64  chunking_params_.segment_duration_in_seconds * time_scale;
65  subsegment_duration_ =
66  chunking_params_.subsegment_duration_in_seconds * time_scale;
67  } else if (stream_data->stream_info->stream_type() == kStreamVideo) {
68  return Status(error::CHUNKING_ERROR,
69  "Only one video stream is allowed per chunking handler.");
70  }
71  time_scales_[stream_data->stream_index] = time_scale;
72  break;
73  }
74  case StreamDataType::kScte35Event: {
75  if (stream_data->stream_index != main_stream_index_) {
76  VLOG(3) << "Dropping scte35 event from non main stream.";
77  return Status::OK;
78  }
79  scte35_events_.push(std::move(stream_data));
80  return Status::OK;
81  }
82  case StreamDataType::kSegmentInfo:
83  VLOG(3) << "Droppping existing segment info.";
84  return Status::OK;
85  case StreamDataType::kMediaSample: {
86  const size_t stream_index = stream_data->stream_index;
87  DCHECK_NE(time_scales_[stream_index], 0u)
88  << "kStreamInfo should arrive before kMediaSample";
89 
90  if (stream_index != main_stream_index_ &&
91  !stream_data->media_sample->is_key_frame()) {
92  return Status(error::CHUNKING_ERROR,
93  "All non video samples should be key frames.");
94  }
95  // The streams are expected to be roughly synchronized, so we don't expect
96  // to see a lot of samples from one stream but no samples from another
97  // stream.
98  // The value is kind of arbitrary here. For a 24fps video, it is ~40s.
99  const size_t kMaxCachedSamplesPerStream = 1000u;
100  if (num_cached_samples_[stream_index] >= kMaxCachedSamplesPerStream) {
101  LOG(ERROR) << "Streams are not synchronized:";
102  for (size_t i = 0; i < num_cached_samples_.size(); ++i)
103  LOG(ERROR) << " [Stream " << i << "] " << num_cached_samples_[i];
104  return Status(error::CHUNKING_ERROR, "Streams are not synchronized.");
105  }
106 
107  cached_media_sample_stream_data_.push(std::move(stream_data));
108  ++num_cached_samples_[stream_index];
109 
110  // If we have cached samples from every stream, the first sample in
111  // |cached_media_samples_stream_data_| is guaranteed to be the earliest
112  // sample. Extract and process that sample.
113  if (std::all_of(num_cached_samples_.begin(), num_cached_samples_.end(),
114  [](size_t num_samples) { return num_samples > 0; })) {
115  while (true) {
116  const size_t top_stream_index =
117  cached_media_sample_stream_data_.top()->stream_index;
118  Status status = ProcessMediaSampleStreamData(
119  *cached_media_sample_stream_data_.top());
120  if (!status.ok())
121  return status;
122  cached_media_sample_stream_data_.pop();
123  if (--num_cached_samples_[top_stream_index] == 0)
124  break;
125  }
126  }
127  return Status::OK;
128  }
129  default:
130  VLOG(3) << "Stream data type "
131  << static_cast<int>(stream_data->stream_data_type) << " ignored.";
132  break;
133  }
134  return Dispatch(std::move(stream_data));
135 }
136 
137 Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) {
138  // Process all cached samples.
139  while (!cached_media_sample_stream_data_.empty()) {
140  Status status =
141  ProcessMediaSampleStreamData(*cached_media_sample_stream_data_.top());
142  if (!status.ok())
143  return status;
144  --num_cached_samples_[cached_media_sample_stream_data_.top()->stream_index];
145  cached_media_sample_stream_data_.pop();
146  }
147  if (segment_info_[input_stream_index]) {
148  auto& segment_info = segment_info_[input_stream_index];
149  if (segment_info->start_timestamp != -1) {
150  segment_info->duration = last_sample_end_timestamps_[input_stream_index] -
151  segment_info->start_timestamp;
152  Status status =
153  DispatchSegmentInfo(input_stream_index, std::move(segment_info));
154  if (!status.ok())
155  return status;
156  }
157  }
158  const size_t output_stream_index = input_stream_index;
159  return FlushDownstream(output_stream_index);
160 }
161 
162 Status ChunkingHandler::ProcessMainMediaSample(const MediaSample* sample) {
163  const bool is_key_frame = sample->is_key_frame();
164  const int64_t timestamp = sample->dts();
165  // Check if we need to terminate the current (sub)segment.
166  bool new_segment = false;
167  bool new_subsegment = false;
168  std::shared_ptr<CueEvent> cue_event;
169  if (is_key_frame || !chunking_params_.segment_sap_aligned) {
170  const int64_t segment_index = timestamp / segment_duration_;
171  if (segment_index != current_segment_index_) {
172  current_segment_index_ = segment_index;
173  // Reset subsegment index.
174  current_subsegment_index_ = 0;
175  new_segment = true;
176  }
177  // We use 'while' instead of 'if' to make sure to pop off multiple SCTE35
178  // events that may be very close to each other.
179  while (!scte35_events_.empty() &&
180  (scte35_events_.top()->scte35_event->start_time <= timestamp)) {
181  // For simplicity, don't change |current_segment_index_|.
182  current_subsegment_index_ = 0;
183  new_segment = true;
184 
185  cue_event = std::make_shared<CueEvent>();
186  // Use PTS instead of DTS for cue event timestamp.
187  cue_event->timestamp = sample->pts();
188  cue_event->cue_data = scte35_events_.top()->scte35_event->cue_data;
189  LOG(INFO) << "Chunked at " << timestamp << " for Ad Cue.";
190 
191  scte35_events_.pop();
192  }
193  }
194  if (!new_segment && subsegment_duration_ > 0 &&
195  (is_key_frame || !chunking_params_.subsegment_sap_aligned)) {
196  const int64_t subsegment_index =
197  (timestamp - segment_info_[main_stream_index_]->start_timestamp) /
198  subsegment_duration_;
199  if (subsegment_index != current_subsegment_index_) {
200  current_subsegment_index_ = subsegment_index;
201  new_subsegment = true;
202  }
203  }
204 
205  Status status;
206  if (new_segment) {
207  status.Update(DispatchSegmentInfoForAllStreams());
208  segment_info_[main_stream_index_]->start_timestamp = timestamp;
209 
210  if (cue_event)
211  status.Update(DispatchCueEventForAllStreams(std::move(cue_event)));
212  }
213  if (subsegment_duration_ > 0 && (new_segment || new_subsegment)) {
214  status.Update(DispatchSubsegmentInfoForAllStreams());
215  subsegment_info_[main_stream_index_]->start_timestamp = timestamp;
216  }
217  return status;
218 }
219 
220 Status ChunkingHandler::ProcessMediaSampleStreamData(
221  const StreamData& media_sample_stream_data) {
222  const size_t stream_index = media_sample_stream_data.stream_index;
223  const auto sample = std::move(media_sample_stream_data.media_sample);
224 
225  if (stream_index == main_stream_index_) {
226  Status status = ProcessMainMediaSample(sample.get());
227  if (!status.ok())
228  return status;
229  }
230 
231  VLOG(3) << "Stream index: " << stream_index << " "
232  << "Sample ts: " << sample->dts() << " "
233  << " duration: " << sample->duration()
234  << " scale: " << time_scales_[stream_index] << "\n"
235  << " scale: " << time_scales_[main_stream_index_]
236  << (segment_info_[stream_index] ? " dispatch " : " discard ");
237  // Discard samples before segment start. If the segment has started,
238  // |segment_info_[stream_index]| won't be null.
239  if (!segment_info_[stream_index])
240  return Status::OK;
241  if (segment_info_[stream_index]->start_timestamp == -1)
242  segment_info_[stream_index]->start_timestamp = sample->dts();
243  if (subsegment_info_[stream_index] &&
244  subsegment_info_[stream_index]->start_timestamp == -1) {
245  subsegment_info_[stream_index]->start_timestamp = sample->dts();
246  }
247  last_sample_end_timestamps_[stream_index] =
248  sample->dts() + sample->duration();
249  return DispatchMediaSample(stream_index, std::move(sample));
250 }
251 
252 Status ChunkingHandler::DispatchSegmentInfoForAllStreams() {
253  Status status;
254  for (size_t i = 0; i < segment_info_.size() && status.ok(); ++i) {
255  if (segment_info_[i] && segment_info_[i]->start_timestamp != -1) {
256  segment_info_[i]->duration =
257  last_sample_end_timestamps_[i] - segment_info_[i]->start_timestamp;
258  status.Update(DispatchSegmentInfo(i, std::move(segment_info_[i])));
259  }
260  segment_info_[i].reset(new SegmentInfo);
261  subsegment_info_[i].reset();
262  }
263  return status;
264 }
265 
266 Status ChunkingHandler::DispatchSubsegmentInfoForAllStreams() {
267  Status status;
268  for (size_t i = 0; i < subsegment_info_.size() && status.ok(); ++i) {
269  if (subsegment_info_[i] && subsegment_info_[i]->start_timestamp != -1) {
270  subsegment_info_[i]->duration =
271  last_sample_end_timestamps_[i] - subsegment_info_[i]->start_timestamp;
272  status.Update(DispatchSegmentInfo(i, std::move(subsegment_info_[i])));
273  }
274  subsegment_info_[i].reset(new SegmentInfo);
275  subsegment_info_[i]->is_subsegment = true;
276  }
277  return status;
278 }
279 
280 Status ChunkingHandler::DispatchCueEventForAllStreams(
281  std::shared_ptr<CueEvent> cue_event) {
282  Status status;
283  for (size_t i = 0; i < segment_info_.size() && status.ok(); ++i) {
284  std::shared_ptr<CueEvent> new_cue_event(new CueEvent(*cue_event));
285  new_cue_event->timestamp = cue_event->timestamp * time_scales_[i] /
286  time_scales_[main_stream_index_];
287  status.Update(DispatchCueEvent(i, std::move(new_cue_event)));
288  }
289  return status;
290 }
291 
292 ChunkingHandler::MediaSampleTimestampGreater::MediaSampleTimestampGreater(
293  const ChunkingHandler* const chunking_handler)
294  : chunking_handler_(chunking_handler) {}
295 
296 bool ChunkingHandler::MediaSampleTimestampGreater::operator()(
297  const std::unique_ptr<StreamData>& lhs,
298  const std::unique_ptr<StreamData>& rhs) const {
299  DCHECK(lhs);
300  DCHECK(rhs);
301  return GetSampleTimeInSeconds(*lhs) > GetSampleTimeInSeconds(*rhs);
302 }
303 
304 double ChunkingHandler::MediaSampleTimestampGreater::GetSampleTimeInSeconds(
305  const StreamData& media_sample_stream_data) const {
306  const size_t stream_index = media_sample_stream_data.stream_index;
307  const auto& sample = media_sample_stream_data.media_sample;
308  DCHECK(sample);
309  // Order main samples by left boundary and non main samples by mid-point. This
310  // ensures non main samples are properly chunked, i.e. if the portion of the
311  // sample in the next chunk is bigger than the portion of the sample in the
312  // previous chunk, the sample is placed in the next chunk.
313  const uint64_t timestamp =
314  stream_index == chunking_handler_->main_stream_index_
315  ? sample->dts()
316  : (sample->dts() + sample->duration() / 2);
317  return static_cast<double>(timestamp) /
318  chunking_handler_->time_scales_[stream_index];
319 }
320 
321 bool ChunkingHandler::Scte35EventTimestampGreater::operator()(
322  const std::unique_ptr<StreamData>& lhs,
323  const std::unique_ptr<StreamData>& rhs) const {
324  DCHECK(lhs);
325  DCHECK(rhs);
326  DCHECK(lhs->scte35_event);
327  DCHECK(rhs->scte35_event);
328  return lhs->scte35_event->start_time > rhs->scte35_event->start_time;
329 }
330 
331 } // namespace media
332 } // namespace shaka
Status Process(std::unique_ptr< StreamData > stream_data) override
Status InitializeInternal() override
All the methods that are virtual are virtual for mocking.
Status OnFlushRequest(size_t input_stream_index) override
Event handler for flush request at the specific input stream index.
Class to hold a media sample.
Definition: media_sample.h:22
void Update(Status new_status)
Definition: status.cc:76