Shaka Packager SDK
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
chunking_handler.cc
1 // Copyright 2017 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/media/chunking/chunking_handler.h"
8 
9 #include "packager/base/logging.h"
10 #include "packager/base/threading/platform_thread.h"
11 #include "packager/media/base/media_sample.h"
12 
13 namespace {
14 int64_t kThreadIdUnset = -1;
15 int64_t kTimeStampToDispatchAllSamples = -1;
16 } // namespace
17 
18 namespace shaka {
19 namespace media {
20 
21 ChunkingHandler::ChunkingHandler(const ChunkingParams& chunking_params)
22  : chunking_params_(chunking_params), thread_id_(kThreadIdUnset) {
23  CHECK_NE(chunking_params.segment_duration_in_seconds, 0u);
24 }
25 
26 ChunkingHandler::~ChunkingHandler() {}
27 
29  segment_info_.resize(num_input_streams());
30  subsegment_info_.resize(num_input_streams());
31  time_scales_.resize(num_input_streams());
32  last_sample_end_timestamps_.resize(num_input_streams());
33  return Status::OK;
34 }
35 
36 Status ChunkingHandler::Process(std::unique_ptr<StreamData> stream_data) {
37  switch (stream_data->stream_data_type) {
38  case StreamDataType::kStreamInfo: {
39  // Make sure the inputs come from the same thread.
40  const int64_t thread_id =
41  static_cast<int64_t>(base::PlatformThread::CurrentId());
42  int64_t expected = kThreadIdUnset;
43  if (!thread_id_.compare_exchange_strong(expected, thread_id) &&
44  expected != thread_id) {
45  return Status(error::CHUNKING_ERROR,
46  "Inputs should come from the same thread.");
47  }
48 
49  const auto time_scale = stream_data->stream_info->time_scale();
50  // The video stream is treated as the main stream. If there is only one
51  // stream, it is the main stream.
52  const bool is_main_stream =
53  main_stream_index_ == kInvalidStreamIndex &&
54  (stream_data->stream_info->stream_type() == kStreamVideo ||
55  num_input_streams() == 1);
56  if (is_main_stream) {
57  main_stream_index_ = stream_data->stream_index;
58  segment_duration_ =
59  chunking_params_.segment_duration_in_seconds * time_scale;
60  subsegment_duration_ =
61  chunking_params_.subsegment_duration_in_seconds * time_scale;
62  } else if (stream_data->stream_info->stream_type() == kStreamVideo) {
63  return Status(error::CHUNKING_ERROR,
64  "Only one video stream is allowed per chunking handler.");
65  }
66  time_scales_[stream_data->stream_index] = time_scale;
67  break;
68  }
69  case StreamDataType::kSegmentInfo:
70  VLOG(3) << "Drop existing segment info.";
71  return Status::OK;
72  case StreamDataType::kMediaSample: {
73  const size_t stream_index = stream_data->stream_index;
74  DCHECK_NE(time_scales_[stream_index], 0u)
75  << "kStreamInfo should arrive before kMediaSample";
76  if (stream_index != main_stream_index_) {
77  if (!stream_data->media_sample->is_key_frame()) {
78  return Status(error::CHUNKING_ERROR,
79  "All non video samples should be key frames.");
80  }
81  // Cache non main stream samples, since we don't know yet whether these
82  // samples belong to the current or next segment.
83  non_main_samples_.push_back(std::move(stream_data));
84  // The streams are expected to be synchronized, so we don't expect to
85  // see a lot of samples before seeing video samples.
86  const size_t kMaxSamplesPerStreamBeforeVideoSample = 5u;
87  if (non_main_samples_.size() >
88  num_input_streams() * kMaxSamplesPerStreamBeforeVideoSample) {
89  return Status(error::CHUNKING_ERROR,
90  "Too many non video samples before video sample.");
91  }
92  return Status::OK;
93  }
94 
95  const MediaSample* sample = stream_data->media_sample.get();
96  Status status = ProcessMediaSample(sample);
97  if (!status.ok())
98  return status;
99  // Discard samples before segment start.
100  if (!segment_info_[stream_index])
101  return Status::OK;
102  last_sample_end_timestamps_[stream_index] =
103  sample->dts() + sample->duration();
104  break;
105  }
106  default:
107  VLOG(3) << "Stream data type "
108  << static_cast<int>(stream_data->stream_data_type) << " ignored.";
109  break;
110  }
111  return Dispatch(std::move(stream_data));
112 }
113 
114 Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) {
115  if (segment_info_[input_stream_index]) {
116  Status status;
117  if (input_stream_index != main_stream_index_) {
118  status = DispatchNonMainSamples(kTimeStampToDispatchAllSamples);
119  if (!status.ok())
120  return status;
121  }
122  auto& segment_info = segment_info_[input_stream_index];
123  if (segment_info->start_timestamp != -1) {
124  segment_info->duration = last_sample_end_timestamps_[input_stream_index] -
125  segment_info->start_timestamp;
126  status = DispatchSegmentInfo(input_stream_index, std::move(segment_info));
127  if (!status.ok())
128  return status;
129  }
130  }
131  const size_t output_stream_index = input_stream_index;
132  return FlushDownstream(output_stream_index);
133 }
134 
135 Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) {
136  const bool is_key_frame = sample->is_key_frame();
137  const int64_t timestamp = sample->dts();
138  // Check if we need to terminate the current (sub)segment.
139  bool new_segment = false;
140  bool new_subsegment = false;
141  if (is_key_frame || !chunking_params_.segment_sap_aligned) {
142  const int64_t segment_index = timestamp / segment_duration_;
143  if (segment_index != current_segment_index_) {
144  current_segment_index_ = segment_index;
145  // Reset subsegment index.
146  current_subsegment_index_ = 0;
147  new_segment = true;
148  }
149  }
150  if (!new_segment && subsegment_duration_ > 0 &&
151  (is_key_frame || !chunking_params_.subsegment_sap_aligned)) {
152  const int64_t subsegment_index =
153  (timestamp - segment_info_[main_stream_index_]->start_timestamp) /
154  subsegment_duration_;
155  if (subsegment_index != current_subsegment_index_) {
156  current_subsegment_index_ = subsegment_index;
157  new_subsegment = true;
158  }
159  }
160 
161  Status status;
162  if (new_segment || new_subsegment) {
163  // Dispatch the samples before |timestamp| - See the implemention on how we
164  // determine if a sample is before |timestamp|..
165  status.Update(DispatchNonMainSamples(timestamp));
166  }
167 
168  if (new_segment) {
169  status.Update(DispatchSegmentInfoForAllStreams());
170  segment_info_[main_stream_index_]->start_timestamp = timestamp;
171  }
172  if (subsegment_duration_ > 0 && (new_segment || new_subsegment)) {
173  status.Update(DispatchSubsegmentInfoForAllStreams());
174  subsegment_info_[main_stream_index_]->start_timestamp = timestamp;
175  }
176  if (!status.ok())
177  return status;
178 
179  // Dispatch non-main samples for the next segment.
180  return DispatchNonMainSamples(kTimeStampToDispatchAllSamples);
181 }
182 
183 Status ChunkingHandler::DispatchNonMainSamples(int64_t timestamp_threshold) {
184  Status status;
185  while (status.ok() && !non_main_samples_.empty()) {
186  DCHECK_EQ(non_main_samples_.front()->stream_data_type,
187  StreamDataType::kMediaSample);
188  const size_t stream_index = non_main_samples_.front()->stream_index;
189  const MediaSample* sample = non_main_samples_.front()->media_sample.get();
190  // If the portion of the sample before |timestamp_threshold| is bigger than
191  // the other portion, we consider it part of the current segment.
192  const int64_t timestamp = sample->dts() + sample->duration() / 2;
193  const bool stop =
194  (timestamp_threshold != kTimeStampToDispatchAllSamples &&
195  (static_cast<double>(timestamp) / time_scales_[stream_index]) >
196  (static_cast<double>(timestamp_threshold) /
197  time_scales_[main_stream_index_]));
198  VLOG(3) << "Sample ts: " << sample->dts() << " "
199  << " duration: " << sample->duration()
200  << " scale: " << time_scales_[stream_index] << "\n"
201  << " threshold: " << timestamp_threshold
202  << " scale: " << time_scales_[main_stream_index_]
203  << (stop ? " stop "
204  : (segment_info_[stream_index] ? " dispatch "
205  : " discard "));
206  if (stop)
207  break;
208  // Only dispatch samples if the segment has started, otherwise discard
209  // them.
210  if (segment_info_[stream_index]) {
211  if (segment_info_[stream_index]->start_timestamp == -1)
212  segment_info_[stream_index]->start_timestamp = sample->dts();
213  if (subsegment_info_[stream_index] &&
214  subsegment_info_[stream_index]->start_timestamp == -1) {
215  subsegment_info_[stream_index]->start_timestamp = sample->dts();
216  }
217  last_sample_end_timestamps_[stream_index] =
218  sample->dts() + sample->duration();
219  status.Update(Dispatch(std::move(non_main_samples_.front())));
220  }
221  non_main_samples_.pop_front();
222  }
223  return status;
224 }
225 
226 Status ChunkingHandler::DispatchSegmentInfoForAllStreams() {
227  Status status;
228  for (size_t i = 0; i < segment_info_.size() && status.ok(); ++i) {
229  if (segment_info_[i] && segment_info_[i]->start_timestamp != -1) {
230  segment_info_[i]->duration =
231  last_sample_end_timestamps_[i] - segment_info_[i]->start_timestamp;
232  status.Update(DispatchSegmentInfo(i, std::move(segment_info_[i])));
233  }
234  segment_info_[i].reset(new SegmentInfo);
235  subsegment_info_[i].reset();
236  }
237  return status;
238 }
239 
240 Status ChunkingHandler::DispatchSubsegmentInfoForAllStreams() {
241  Status status;
242  for (size_t i = 0; i < subsegment_info_.size() && status.ok(); ++i) {
243  if (subsegment_info_[i] && subsegment_info_[i]->start_timestamp != -1) {
244  subsegment_info_[i]->duration =
245  last_sample_end_timestamps_[i] - subsegment_info_[i]->start_timestamp;
246  status.Update(DispatchSegmentInfo(i, std::move(subsegment_info_[i])));
247  }
248  subsegment_info_[i].reset(new SegmentInfo);
249  subsegment_info_[i]->is_subsegment = true;
250  }
251  return status;
252 }
253 
254 } // namespace media
255 } // namespace shaka
Status Process(std::unique_ptr< StreamData > stream_data) override
Status Dispatch(std::unique_ptr< StreamData > stream_data)
Status InitializeInternal() override
Status OnFlushRequest(size_t input_stream_index) override
Event handler for flush request at the specific input stream index.
Status FlushDownstream(size_t output_stream_index)
Flush the downstream connected at the specified output stream index.
Class to hold a media sample.
Definition: media_sample.h:22
double segment_duration_in_seconds
Segment duration in seconds.
Status DispatchSegmentInfo(size_t stream_index, std::shared_ptr< const SegmentInfo > segment_info)
Dispatch the segment info to downstream handlers.
double subsegment_duration_in_seconds