Sync streams coming to ChunkingHandler

Change-Id: I50d1a64f2ec3c50dfb1b7282241b47f3a55e26ce
This commit is contained in:
KongQun Yang 2018-01-08 17:18:15 -08:00
parent d76ccea46f
commit feabf770bf
4 changed files with 173 additions and 126 deletions

View File

@ -94,7 +94,7 @@ MATCHER_P4(IsMediaSample, stream_index, timestamp, duration, encrypted, "") {
<< StreamDataTypeToString(arg->stream_data_type);
return false;
}
*result_listener << "which is (" << stream_index << ","
*result_listener << "which is (" << arg->stream_index << ","
<< arg->media_sample->dts() << ","
<< arg->media_sample->duration() << ","
<< BoolToString(arg->media_sample->is_encrypted()) << ")";

View File

@ -6,20 +6,24 @@
#include "packager/media/chunking/chunking_handler.h"
#include <algorithm>
#include "packager/base/logging.h"
#include "packager/base/threading/platform_thread.h"
#include "packager/media/base/media_sample.h"
namespace {
int64_t kThreadIdUnset = -1;
int64_t kTimeStampToDispatchAllSamples = -1;
} // namespace
namespace shaka {
namespace media {
ChunkingHandler::ChunkingHandler(const ChunkingParams& chunking_params)
: chunking_params_(chunking_params), thread_id_(kThreadIdUnset) {
: chunking_params_(chunking_params),
thread_id_(kThreadIdUnset),
media_sample_comparator_(this),
cached_media_sample_stream_data_(media_sample_comparator_) {
CHECK_NE(chunking_params.segment_duration_in_seconds, 0u);
}
@ -30,6 +34,7 @@ Status ChunkingHandler::InitializeInternal() {
subsegment_info_.resize(num_input_streams());
time_scales_.resize(num_input_streams());
last_sample_end_timestamps_.resize(num_input_streams());
num_cached_samples_.resize(num_input_streams());
return Status::OK;
}
@ -84,36 +89,46 @@ Status ChunkingHandler::Process(std::unique_ptr<StreamData> stream_data) {
const size_t stream_index = stream_data->stream_index;
DCHECK_NE(time_scales_[stream_index], 0u)
<< "kStreamInfo should arrive before kMediaSample";
if (stream_index != main_stream_index_) {
if (!stream_data->media_sample->is_key_frame()) {
if (stream_index != main_stream_index_ &&
!stream_data->media_sample->is_key_frame()) {
return Status(error::CHUNKING_ERROR,
"All non video samples should be key frames.");
}
// Cache non main stream samples, since we don't know yet whether these
// samples belong to the current or next segment.
non_main_samples_.push_back(std::move(stream_data));
// The streams are expected to be synchronized, so we don't expect to
// see a lot of samples before seeing video samples.
const size_t kMaxSamplesPerStreamBeforeVideoSample = 5u;
if (non_main_samples_.size() >
num_input_streams() * kMaxSamplesPerStreamBeforeVideoSample) {
return Status(error::CHUNKING_ERROR,
"Too many non video samples before video sample.");
}
return Status::OK;
// The streams are expected to be roughly synchronized, so we don't expect
// to see a lot of samples from one stream but no samples from another
// stream.
// The value is kind of arbitrary here. For a 24fps video, it is ~40s.
const size_t kMaxCachedSamplesPerStream = 1000u;
if (num_cached_samples_[stream_index] >= kMaxCachedSamplesPerStream) {
LOG(ERROR) << "Streams are not synchronized:";
for (size_t i = 0; i < num_cached_samples_.size(); ++i)
LOG(ERROR) << " [Stream " << i << "] " << num_cached_samples_[i];
return Status(error::CHUNKING_ERROR, "Streams are not synchronized.");
}
const MediaSample* sample = stream_data->media_sample.get();
Status status = ProcessMediaSample(sample);
cached_media_sample_stream_data_.push(std::move(stream_data));
++num_cached_samples_[stream_index];
// If we have cached samples from every stream, the first sample in
// |cached_media_sample_stream_data_| is guaranteed to be the earliest
// sample. Extract and process that sample.
if (std::all_of(num_cached_samples_.begin(), num_cached_samples_.end(),
[](size_t num_samples) { return num_samples > 0; })) {
while (true) {
const size_t top_stream_index =
cached_media_sample_stream_data_.top()->stream_index;
Status status = ProcessMediaSampleStreamData(
*cached_media_sample_stream_data_.top());
if (!status.ok())
return status;
// Discard samples before segment start.
if (!segment_info_[stream_index])
return Status::OK;
last_sample_end_timestamps_[stream_index] =
sample->dts() + sample->duration();
cached_media_sample_stream_data_.pop();
if (--num_cached_samples_[top_stream_index] == 0)
break;
}
}
return Status::OK;
}
default:
VLOG(3) << "Stream data type "
<< static_cast<int>(stream_data->stream_data_type) << " ignored.";
@ -123,18 +138,22 @@ Status ChunkingHandler::Process(std::unique_ptr<StreamData> stream_data) {
}
Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) {
if (segment_info_[input_stream_index]) {
Status status;
if (input_stream_index != main_stream_index_) {
status = DispatchNonMainSamples(kTimeStampToDispatchAllSamples);
// Process all cached samples.
while (!cached_media_sample_stream_data_.empty()) {
Status status =
ProcessMediaSampleStreamData(*cached_media_sample_stream_data_.top());
if (!status.ok())
return status;
--num_cached_samples_[cached_media_sample_stream_data_.top()->stream_index];
cached_media_sample_stream_data_.pop();
}
if (segment_info_[input_stream_index]) {
auto& segment_info = segment_info_[input_stream_index];
if (segment_info->start_timestamp != -1) {
segment_info->duration = last_sample_end_timestamps_[input_stream_index] -
segment_info->start_timestamp;
status = DispatchSegmentInfo(input_stream_index, std::move(segment_info));
Status status =
DispatchSegmentInfo(input_stream_index, std::move(segment_info));
if (!status.ok())
return status;
}
@ -143,7 +162,7 @@ Status ChunkingHandler::OnFlushRequest(size_t input_stream_index) {
return FlushDownstream(output_stream_index);
}
Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) {
Status ChunkingHandler::ProcessMainMediaSample(const MediaSample* sample) {
const bool is_key_frame = sample->is_key_frame();
const int64_t timestamp = sample->dts();
// Check if we need to terminate the current (sub)segment.
@ -187,12 +206,6 @@ Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) {
}
Status status;
if (new_segment || new_subsegment) {
// Dispatch the samples before |timestamp| - See the implementation on how
// we determine if a sample is before |timestamp|.
status.Update(DispatchNonMainSamples(timestamp));
}
if (new_segment) {
status.Update(DispatchSegmentInfoForAllStreams());
segment_info_[main_stream_index_]->start_timestamp = timestamp;
@ -204,41 +217,30 @@ Status ChunkingHandler::ProcessMediaSample(const MediaSample* sample) {
status.Update(DispatchSubsegmentInfoForAllStreams());
subsegment_info_[main_stream_index_]->start_timestamp = timestamp;
}
if (!status.ok())
return status;
// Dispatch non-main samples for the next segment.
return DispatchNonMainSamples(kTimeStampToDispatchAllSamples);
}
Status ChunkingHandler::DispatchNonMainSamples(int64_t timestamp_threshold) {
Status status;
while (status.ok() && !non_main_samples_.empty()) {
DCHECK_EQ(non_main_samples_.front()->stream_data_type,
StreamDataType::kMediaSample);
const size_t stream_index = non_main_samples_.front()->stream_index;
const MediaSample* sample = non_main_samples_.front()->media_sample.get();
// If the portion of the sample before |timestamp_threshold| is bigger than
// the other portion, we consider it part of the current segment.
const int64_t timestamp = sample->dts() + sample->duration() / 2;
const bool stop =
(timestamp_threshold != kTimeStampToDispatchAllSamples &&
(static_cast<double>(timestamp) / time_scales_[stream_index]) >
(static_cast<double>(timestamp_threshold) /
time_scales_[main_stream_index_]));
VLOG(3) << "Sample ts: " << sample->dts() << " "
Status ChunkingHandler::ProcessMediaSampleStreamData(
const StreamData& media_sample_stream_data) {
const size_t stream_index = media_sample_stream_data.stream_index;
const auto sample = std::move(media_sample_stream_data.media_sample);
if (stream_index == main_stream_index_) {
Status status = ProcessMainMediaSample(sample.get());
if (!status.ok())
return status;
}
VLOG(3) << "Stream index: " << stream_index << " "
<< "Sample ts: " << sample->dts() << " "
<< " duration: " << sample->duration()
<< " scale: " << time_scales_[stream_index] << "\n"
<< " threshold: " << timestamp_threshold
<< " scale: " << time_scales_[main_stream_index_]
<< (stop ? " stop "
: (segment_info_[stream_index] ? " dispatch "
: " discard "));
if (stop)
break;
// Only dispatch samples if the segment has started, otherwise discard
// them.
if (segment_info_[stream_index]) {
<< (segment_info_[stream_index] ? " dispatch " : " discard ");
// Discard samples before segment start. If the segment has started,
// |segment_info_[stream_index]| won't be null.
if (!segment_info_[stream_index])
return Status::OK;
if (segment_info_[stream_index]->start_timestamp == -1)
segment_info_[stream_index]->start_timestamp = sample->dts();
if (subsegment_info_[stream_index] &&
@ -247,11 +249,7 @@ Status ChunkingHandler::DispatchNonMainSamples(int64_t timestamp_threshold) {
}
last_sample_end_timestamps_[stream_index] =
sample->dts() + sample->duration();
status.Update(Dispatch(std::move(non_main_samples_.front())));
}
non_main_samples_.pop_front();
}
return status;
return DispatchMediaSample(stream_index, std::move(sample));
}
Status ChunkingHandler::DispatchSegmentInfoForAllStreams() {
@ -294,5 +292,44 @@ Status ChunkingHandler::DispatchCueEventForAllStreams(
return status;
}
// Stores a back pointer to the owning handler; the comparator reads the
// handler's |time_scales_| and |main_stream_index_| when ordering samples.
ChunkingHandler::MediaSampleTimestampGreater::MediaSampleTimestampGreater(
const ChunkingHandler* const chunking_handler)
: chunking_handler_(chunking_handler) {}
// Returns true if |lhs| should be ordered after |rhs|, i.e. |lhs| has the
// later timestamp (in seconds, so streams with different timescales compare
// correctly). Used as the "greater" comparator of a std::priority_queue,
// which therefore surfaces the earliest sample at top().
bool ChunkingHandler::MediaSampleTimestampGreater::operator()(
    const std::unique_ptr<StreamData>& lhs,
    const std::unique_ptr<StreamData>& rhs) const {
  DCHECK(lhs);
  DCHECK(rhs);
  const double lhs_time_in_seconds = GetSampleTimeInSeconds(*lhs);
  const double rhs_time_in_seconds = GetSampleTimeInSeconds(*rhs);
  return lhs_time_in_seconds > rhs_time_in_seconds;
}
// Returns the sample's ordering timestamp converted to seconds using the
// stream's own timescale, so samples from streams with different timescales
// are comparable.
double ChunkingHandler::MediaSampleTimestampGreater::GetSampleTimeInSeconds(
    const StreamData& media_sample_stream_data) const {
  const size_t stream_index = media_sample_stream_data.stream_index;
  const auto& sample = media_sample_stream_data.media_sample;
  DCHECK(sample);
  // Order main samples by left boundary and non main samples by mid-point. This
  // ensures non main samples are properly chunked, i.e. if the portion of the
  // sample in the next chunk is bigger than the portion of the sample in the
  // previous chunk, the sample is placed in the next chunk.
  //
  // Use a signed type here: dts() is signed, and a negative timestamp stored
  // in uint64_t would wrap to a huge positive value, breaking the ordering.
  const int64_t timestamp =
      stream_index == chunking_handler_->main_stream_index_
          ? sample->dts()
          : (sample->dts() + sample->duration() / 2);
  return static_cast<double>(timestamp) /
         chunking_handler_->time_scales_[stream_index];
}
// Returns true if |lhs|'s SCTE35 event starts after |rhs|'s. Used as the
// "greater" comparator of a std::priority_queue so the earliest event is
// surfaced at top().
bool ChunkingHandler::Scte35EventTimestampGreater::operator()(
    const std::unique_ptr<StreamData>& lhs,
    const std::unique_ptr<StreamData>& rhs) const {
  DCHECK(lhs);
  DCHECK(rhs);
  DCHECK(lhs->scte35_event);
  DCHECK(rhs->scte35_event);
  const auto& lhs_event = lhs->scte35_event;
  const auto& rhs_event = rhs->scte35_event;
  return lhs_event->start_time > rhs_event->start_time;
}
} // namespace media
} // namespace shaka

View File

@ -62,11 +62,11 @@ class ChunkingHandler : public MediaHandler {
ChunkingHandler(const ChunkingHandler&) = delete;
ChunkingHandler& operator=(const ChunkingHandler&) = delete;
// Processes media sample and apply chunking if needed.
Status ProcessMediaSample(const MediaSample* sample);
// Processes main media sample and apply chunking if needed.
Status ProcessMainMediaSample(const MediaSample* sample);
// Dispatch cached non main stream samples before |timestamp_threshold|.
Status DispatchNonMainSamples(int64_t timestamp_threshold);
// Processes and dispatches media sample.
Status ProcessMediaSampleStreamData(const StreamData& media_data);
// The (sub)segments are aligned and dispatched together.
Status DispatchSegmentInfoForAllStreams();
@ -86,11 +86,30 @@ class ChunkingHandler : public MediaHandler {
int64_t segment_duration_ = 0;
int64_t subsegment_duration_ = 0;
// The streams are expected to be synchronized. Cache non main (video) stream
// samples so we can determine whether the next segment should include these
// samples. The samples will be dispatched after seeing the next main stream
// sample.
std::deque<std::unique_ptr<StreamData>> non_main_samples_;
// Compares media samples from different streams by timestamp (converted to
// seconds). Holds a back pointer to the handler for per-stream timescales.
class MediaSampleTimestampGreater {
public:
explicit MediaSampleTimestampGreater(
const ChunkingHandler* const chunking_handler);
// Comparison operator. Used by the |cached_media_sample_stream_data_|
// priority queue below to sort the media samples: returns true if |lhs|
// has a later timestamp than |rhs|, so the earliest sample is at top().
bool operator()(const std::unique_ptr<StreamData>& lhs,
const std::unique_ptr<StreamData>& rhs) const;
private:
// Returns the sample's ordering timestamp in seconds, using the sample's
// own stream timescale.
double GetSampleTimeInSeconds(
const StreamData& media_sample_stream_data) const;
// Non-owning back pointer; outlives this comparator (it is a member of the
// handler itself).
const ChunkingHandler* const chunking_handler_ = nullptr;
};
MediaSampleTimestampGreater media_sample_comparator_;
// Caches media samples and sort the samples.
std::priority_queue<std::unique_ptr<StreamData>,
std::vector<std::unique_ptr<StreamData>>,
MediaSampleTimestampGreater>
cached_media_sample_stream_data_;
// Tracks number of cached samples in input streams.
std::vector<size_t> num_cached_samples_;
// Current segment index, useful to determine where to do chunking.
int64_t current_segment_index_ = -1;
@ -103,22 +122,16 @@ class ChunkingHandler : public MediaHandler {
// The end timestamp of the last dispatched sample.
std::vector<int64_t> last_sample_end_timestamps_;
struct Scte35EventComparator {
bool operator()(const std::shared_ptr<StreamData>& lhs,
const std::shared_ptr<StreamData>& rhs) const {
DCHECK(lhs);
DCHECK(rhs);
DCHECK(lhs->scte35_event);
return lhs->scte35_event->start_time > rhs->scte35_event->start_time;
}
// Orders SCTE35 events by their start time; used as the comparator of the
// |scte35_events_| priority queue below so the earliest event is at top().
struct Scte35EventTimestampGreater {
// Returns true if |lhs|'s event starts after |rhs|'s.
bool operator()(const std::unique_ptr<StreamData>& lhs,
const std::unique_ptr<StreamData>& rhs) const;
};
// Captures all incoming SCTE35 events to identify chunking points. Events
// will be removed from this queue one at a time as soon as the correct
// chunking point is identified in the incoming samples.
std::priority_queue<std::shared_ptr<StreamData>,
std::vector<std::shared_ptr<StreamData>>,
Scte35EventComparator>
std::priority_queue<std::unique_ptr<StreamData>,
std::vector<std::unique_ptr<StreamData>>,
Scte35EventTimestampGreater>
scte35_events_;
};

View File

@ -180,35 +180,32 @@ TEST_F(ChunkingHandlerTest, AudioAndVideo) {
// Equivalent to 12345 in video timescale.
const int64_t kAudioStartTimestamp = 9876;
const int64_t kVideoStartTimestamp = 12345;
// Burst of audio and video samples. They will be properly ordered.
for (int i = 0; i < 5; ++i) {
ASSERT_OK(Process(StreamData::FromMediaSample(
kStreamIndex0,
GetMediaSample(
kAudioStartTimestamp + kDuration0 * i,
kDuration0,
true))));
kStreamIndex0, GetMediaSample(kAudioStartTimestamp + kDuration0 * i,
kDuration0, true))));
}
for (int i = 0; i < 5; ++i) {
// Alternate key frame.
const bool is_key_frame = (i % 2) == 1;
ASSERT_OK(Process(StreamData::FromMediaSample(
kStreamIndex1,
GetMediaSample(
kVideoStartTimestamp + kDuration1 * i,
kDuration1,
is_key_frame))));
kStreamIndex1, GetMediaSample(kVideoStartTimestamp + kDuration1 * i,
kDuration1, is_key_frame))));
}
EXPECT_THAT(
GetOutputStreamDataVector(),
ElementsAre(
// The first samples @ kStartTimestamp is discarded - not key frame.
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0,
kDuration0, !kEncrypted),
IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1,
kDuration1, !kEncrypted),
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 2,
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0,
kDuration0, !kEncrypted),
IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 2,
kDuration1, !kEncrypted),
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 2,
kDuration0, !kEncrypted),
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 3,
kDuration0, !kEncrypted),
// The audio segment is terminated together with video stream.
@ -220,9 +217,7 @@ TEST_F(ChunkingHandlerTest, AudioAndVideo) {
IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 3,
kDuration1, !kEncrypted),
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 4,
kDuration0, !kEncrypted),
IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 4,
kDuration1, !kEncrypted)));
kDuration0, !kEncrypted)));
ClearOutputStreamDataVector();
// The side comments below show the equivalent timestamp in video timescale.
@ -250,6 +245,8 @@ TEST_F(ChunkingHandlerTest, AudioAndVideo) {
EXPECT_THAT(
GetOutputStreamDataVector(),
ElementsAre(
IsMediaSample(kStreamIndex1, kVideoStartTimestamp + kDuration1 * 4,
kDuration1, !kEncrypted),
IsMediaSample(kStreamIndex0, kAudioStartTimestamp + kDuration0 * 5,
kDuration0, !kEncrypted),
// Audio is terminated along with video below.