DASH Media Packaging SDK
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator
chunking_handler.cc
1 // Copyright 2017 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/chunking/chunking_handler.h"
8 
9 #include "packager/base/logging.h"
10 #include "packager/base/threading/platform_thread.h"
11 #include "packager/media/base/media_sample.h"
12 
13 namespace {
14 int64_t kThreadIdUnset = -1;
15 int64_t kTimeStampToDispatchAllSamples = -1;
16 } // namespace
17 
18 namespace shaka {
19 namespace media {
20 
21 ChunkingHandler::ChunkingHandler(const ChunkingOptions& chunking_options)
22  : chunking_options_(chunking_options), thread_id_(kThreadIdUnset) {
23  CHECK_NE(chunking_options.segment_duration_in_seconds, 0u);
24 }
25 
26 ChunkingHandler::~ChunkingHandler() {}
27 
29  segment_info_.resize(num_input_streams());
30  subsegment_info_.resize(num_input_streams());
31  time_scales_.resize(num_input_streams());
32  last_sample_end_timestamps_.resize(num_input_streams());
33  return Status::OK;
34 }
35 
36 Status ChunkingHandler::Process(std::unique_ptr<StreamData> stream_data) {
37  switch (stream_data->stream_data_type) {
38  case StreamDataType::kStreamInfo: {
39  // Make sure the inputs come from the same thread.
40  const int64_t thread_id =
41  static_cast<int64_t>(base::PlatformThread::CurrentId());
42  int64_t expected = kThreadIdUnset;
43  if (!thread_id_.compare_exchange_strong(expected, thread_id) &&
44  expected != thread_id) {
45  return Status(error::CHUNKING_ERROR,
46  "Inputs should come from the same thread.");
47  }
48 
49  const auto time_scale = stream_data->stream_info->time_scale();
50  // The video stream is treated as the main stream. If there is only one
51  // stream, it is the main stream.
52  const bool is_main_stream =
53  main_stream_index_ == -1 &&
54  (stream_data->stream_info->stream_type() == kStreamVideo ||
55  num_input_streams() == 1);
56  if (is_main_stream) {
57  main_stream_index_ = stream_data->stream_index;
58  segment_duration_ =
59  chunking_options_.segment_duration_in_seconds * time_scale;
60  subsegment_duration_ =
61  chunking_options_.subsegment_duration_in_seconds * time_scale;
62  } else if (stream_data->stream_info->stream_type() == kStreamVideo) {
63  return Status(error::CHUNKING_ERROR,
64  "Only one video stream is allowed per chunking handler.");
65  }
66  time_scales_[stream_data->stream_index] = time_scale;
67  break;
68  }
69  case StreamDataType::kSegmentInfo:
70  VLOG(3) << "Drop existing segment info.";
71  return Status::OK;
72  case StreamDataType::kMediaSample: {
73  const int stream_index = stream_data->stream_index;
74  DCHECK_NE(time_scales_[stream_index], 0u)
75  << "kStreamInfo should arrive before kMediaSample";
76  if (stream_index != main_stream_index_) {
77  if (!stream_data->media_sample->is_key_frame()) {
78  return Status(error::CHUNKING_ERROR,
79  "All non video samples should be key frames.");
80  }
81  // Cache non main stream samples, since we don't know yet whether these
82  // samples belong to the current or next segment.
83  non_main_samples_.push_back(std::move(stream_data));
84  // The streams are expected to be synchronized, so we don't expect to
85  // see a lot of samples before seeing video samples.
86  const size_t kMaxSamplesPerStreamBeforeVideoSample = 5u;
87  if (non_main_samples_.size() >
88  num_input_streams() * kMaxSamplesPerStreamBeforeVideoSample) {
89  return Status(error::CHUNKING_ERROR,
90  "Too many non video samples before video sample.");
91  }
92  return Status::OK;
93  }
94 
95  const MediaSample* sample = stream_data->media_sample.get();
96  Status status = ProcessMediaSample(sample);
97  if (!status.ok())
98  return status;
99  // Discard samples before segment start.
100  if (!segment_info_[stream_index])
101  return Status::OK;
102  last_sample_end_timestamps_[stream_index] =
103  sample->dts() + sample->duration();
104  break;
105  }
106  default:
107  VLOG(3) << "Stream data type "
108  << static_cast<int>(stream_data->stream_data_type) << " ignored.";
109  break;
110  }
111  return Dispatch(std::move(stream_data));
112 }
113 
114 Status ChunkingHandler::FlushStream(int input_stream_index) {
115  if (segment_info_[input_stream_index]) {
116  Status status;
117  if (input_stream_index != main_stream_index_) {
118  status = DispatchNonMainSamples(kTimeStampToDispatchAllSamples);
119  if (!status.ok())
120  return status;
121  }
122  auto& segment_info = segment_info_[input_stream_index];
123  if (segment_info->start_timestamp != -1) {
124  segment_info->duration = last_sample_end_timestamps_[input_stream_index] -
125  segment_info->start_timestamp;
126  status = DispatchSegmentInfo(input_stream_index, std::move(segment_info));
127  if (!status.ok())
128  return status;
129  }
130  }
131  return MediaHandler::FlushStream(input_stream_index);
132 }
133 
134 Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) {
135  const bool is_key_frame = sample->is_key_frame();
136  const int64_t timestamp = sample->dts();
137  // Check if we need to terminate the current (sub)segment.
138  bool new_segment = false;
139  bool new_subsegment = false;
140  if (is_key_frame || !chunking_options_.segment_sap_aligned) {
141  const int64_t segment_index = timestamp / segment_duration_;
142  if (segment_index != current_segment_index_) {
143  current_segment_index_ = segment_index;
144  new_segment = true;
145  }
146  }
147  if (!new_segment && subsegment_duration_ > 0 &&
148  (is_key_frame || !chunking_options_.subsegment_sap_aligned)) {
149  const int64_t subsegment_index =
150  (timestamp - segment_info_[main_stream_index_]->start_timestamp) /
151  subsegment_duration_;
152  if (subsegment_index != current_subsegment_index_) {
153  current_subsegment_index_ = subsegment_index;
154  new_subsegment = true;
155  }
156  }
157 
158  Status status;
159  if (new_segment || new_subsegment) {
160  // Dispatch the samples before |timestamp| - See the implemention on how we
161  // determine if a sample is before |timestamp|..
162  status.Update(DispatchNonMainSamples(timestamp));
163  }
164 
165  if (new_segment) {
166  status.Update(DispatchSegmentInfoForAllStreams());
167  segment_info_[main_stream_index_]->start_timestamp = timestamp;
168  }
169  if (subsegment_duration_ > 0 && (new_segment || new_subsegment)) {
170  status.Update(DispatchSubsegmentInfoForAllStreams());
171  subsegment_info_[main_stream_index_]->start_timestamp = timestamp;
172  }
173  if (!status.ok())
174  return status;
175 
176  // Dispatch non-main samples for the next segment.
177  return DispatchNonMainSamples(kTimeStampToDispatchAllSamples);
178 }
179 
180 Status ChunkingHandler::DispatchNonMainSamples(int64_t timestamp_threshold) {
181  Status status;
182  while (status.ok() && !non_main_samples_.empty()) {
183  DCHECK_EQ(non_main_samples_.front()->stream_data_type,
184  StreamDataType::kMediaSample);
185  const int stream_index = non_main_samples_.front()->stream_index;
186  const MediaSample* sample = non_main_samples_.front()->media_sample.get();
187  // If the portion of the sample before |timestamp_threshold| is bigger than
188  // the other portion, we consider it part of the current segment.
189  const int64_t timestamp = sample->dts() + sample->duration() / 2;
190  const bool stop =
191  (timestamp_threshold != kTimeStampToDispatchAllSamples &&
192  (static_cast<double>(timestamp) / time_scales_[stream_index]) >
193  (static_cast<double>(timestamp_threshold) /
194  time_scales_[main_stream_index_]));
195  VLOG(3) << "Sample ts: " << sample->dts() << " "
196  << " duration: " << sample->duration()
197  << " scale: " << time_scales_[stream_index] << "\n"
198  << " threshold: " << timestamp_threshold
199  << " scale: " << time_scales_[main_stream_index_]
200  << (stop ? " stop "
201  : (segment_info_[stream_index] ? " dispatch "
202  : " discard "));
203  if (stop)
204  break;
205  // Only dispatch samples if the segment has started, otherwise discard
206  // them.
207  if (segment_info_[stream_index]) {
208  if (segment_info_[stream_index]->start_timestamp == -1)
209  segment_info_[stream_index]->start_timestamp = sample->dts();
210  if (subsegment_info_[stream_index] &&
211  subsegment_info_[stream_index]->start_timestamp == -1) {
212  subsegment_info_[stream_index]->start_timestamp = sample->dts();
213  }
214  last_sample_end_timestamps_[stream_index] =
215  sample->dts() + sample->duration();
216  status.Update(Dispatch(std::move(non_main_samples_.front())));
217  }
218  non_main_samples_.pop_front();
219  }
220  return status;
221 }
222 
223 Status ChunkingHandler::DispatchSegmentInfoForAllStreams() {
224  Status status;
225  for (int i = 0; i < static_cast<int>(segment_info_.size()) && status.ok();
226  ++i) {
227  if (segment_info_[i] && segment_info_[i]->start_timestamp != -1) {
228  segment_info_[i]->duration =
229  last_sample_end_timestamps_[i] - segment_info_[i]->start_timestamp;
230  status.Update(DispatchSegmentInfo(i, std::move(segment_info_[i])));
231  }
232  segment_info_[i].reset(new SegmentInfo);
233  subsegment_info_[i].reset();
234  }
235  return status;
236 }
237 
238 Status ChunkingHandler::DispatchSubsegmentInfoForAllStreams() {
239  Status status;
240  for (int i = 0; i < static_cast<int>(subsegment_info_.size()) && status.ok();
241  ++i) {
242  if (subsegment_info_[i] && subsegment_info_[i]->start_timestamp != -1) {
243  subsegment_info_[i]->duration =
244  last_sample_end_timestamps_[i] - subsegment_info_[i]->start_timestamp;
245  status.Update(DispatchSegmentInfo(i, std::move(subsegment_info_[i])));
246  }
247  subsegment_info_[i].reset(new SegmentInfo);
248  subsegment_info_[i]->is_subsegment = true;
249  }
250  return status;
251 }
252 
253 } // namespace media
254 } // namespace shaka
Status Process(std::unique_ptr< StreamData > stream_data) override
Status Dispatch(std::unique_ptr< StreamData > stream_data)
Status InitializeInternal() override
Status DispatchSegmentInfo(int stream_index, std::shared_ptr< SegmentInfo > segment_info)
Dispatch the segment info to downstream handlers.
virtual Status FlushStream(int input_stream_index)
Flush the stream at the specified input stream index.
Class to hold a media sample.
Definition: media_sample.h:22
double segment_duration_in_seconds
Segment duration in seconds.
Status FlushStream(int input_stream_index) override
Flush the stream at the specified input stream index.