Store Stream Data and Not CueEvent
In the cue aligner, instead of storing the cue event, store a stream data so that we don't need to know the stream index when sending the cue event downstream. Change-Id: Ice27da021fad2872e2a23975b959630a9d43b736
This commit is contained in:
parent
b5a73fc1d5
commit
ff0fd500b8
|
@ -102,7 +102,7 @@ Status CueAlignmentHandler::OnFlushRequest(size_t stream_index) {
|
||||||
if (stream.info->stream_type() == kStreamVideo) {
|
if (stream.info->stream_type() == kStreamVideo) {
|
||||||
DCHECK_EQ(stream.samples.size(), 0u)
|
DCHECK_EQ(stream.samples.size(), 0u)
|
||||||
<< "Video streams should not store samples";
|
<< "Video streams should not store samples";
|
||||||
DCHECK_EQ(stream.cue, nullptr) << "Video streams should not store cues";
|
DCHECK(!stream.cue) << "Video streams should not store cues";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!stream.samples.empty()) {
|
if (!stream.samples.empty()) {
|
||||||
|
@ -151,7 +151,8 @@ Status CueAlignmentHandler::OnVideoSample(std::unique_ptr<StreamData> sample) {
|
||||||
}
|
}
|
||||||
|
|
||||||
RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
|
RETURN_IF_ERROR(UseNewSyncPoint(std::move(next_sync)));
|
||||||
RETURN_IF_ERROR(DispatchCueEvent(stream_index, std::move(stream.cue)));
|
DCHECK(stream.cue);
|
||||||
|
RETURN_IF_ERROR(Dispatch(std::move(stream.cue)));
|
||||||
}
|
}
|
||||||
|
|
||||||
return Dispatch(std::move(sample));
|
return Dispatch(std::move(sample));
|
||||||
|
@ -212,40 +213,31 @@ Status CueAlignmentHandler::UseNewSyncPoint(
|
||||||
hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
|
hint_ = sync_points_->GetHint(new_sync->time_in_seconds);
|
||||||
DCHECK_GT(hint_, new_sync->time_in_seconds);
|
DCHECK_GT(hint_, new_sync->time_in_seconds);
|
||||||
|
|
||||||
Status status;
|
// No stream should be so out of sync with the others that they would
|
||||||
for (StreamState& stream_state : stream_states_) {
|
// still be working on an old cue.
|
||||||
// No stream should be so out of sync with the others that they would
|
for (auto& stream : stream_states_) {
|
||||||
// still be working on an old cue.
|
if (!stream.cue) {
|
||||||
if (stream_state.cue) {
|
continue;
|
||||||
// TODO(kqyang): Could this happen for text when there are no text samples
|
|
||||||
// between the two cues?
|
|
||||||
LOG(ERROR) << "Found two cue events that are too close together. One at "
|
|
||||||
<< stream_state.cue->time_in_seconds << " and the other at "
|
|
||||||
<< new_sync->time_in_seconds;
|
|
||||||
return Status(error::INVALID_ARGUMENT, "Cue events too close together");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stream_state.cue = new_sync;
|
// TODO(kqyang): Could this happen for text when there are no text samples
|
||||||
|
// between the two cues?
|
||||||
while (status.ok() && !stream_state.samples.empty()) {
|
LOG(ERROR) << "Found two cue events that are too close together. One at "
|
||||||
std::unique_ptr<StreamData>& sample = stream_state.samples.front();
|
<< stream.cue->cue_event->time_in_seconds << " and the other at "
|
||||||
const double sample_time_in_seconds =
|
<< new_sync->time_in_seconds;
|
||||||
TimeInSeconds(*stream_state.info, *sample);
|
return Status(error::INVALID_ARGUMENT, "Cue events too close together");
|
||||||
if (sample_time_in_seconds >= hint_) {
|
|
||||||
DCHECK(!stream_state.cue);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
const size_t stream_index = sample->stream_index;
|
|
||||||
if (stream_state.cue) {
|
|
||||||
status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
|
|
||||||
&stream_state));
|
|
||||||
}
|
|
||||||
status.Update(Dispatch(std::move(sample)));
|
|
||||||
stream_state.samples.pop_front();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return status;
|
|
||||||
|
// Add the cue to each stream and give them each a chance to run through their
|
||||||
|
// samples.
|
||||||
|
for (size_t index = 0; index < stream_states_.size(); index++) {
|
||||||
|
StreamState& stream = stream_states_[index];
|
||||||
|
stream.cue = StreamData::FromCueEvent(index, new_sync);
|
||||||
|
|
||||||
|
RETURN_IF_ERROR(RunThroughSamples(&stream));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
|
bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
|
||||||
|
@ -263,25 +255,16 @@ bool CueAlignmentHandler::EveryoneWaitingAtHint() const {
|
||||||
// 2. Save the sample in the buffer as it comes after the next sync point.
|
// 2. Save the sample in the buffer as it comes after the next sync point.
|
||||||
Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
|
Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
|
||||||
StreamState* stream_state) {
|
StreamState* stream_state) {
|
||||||
|
DCHECK(sample);
|
||||||
|
DCHECK(sample->media_sample || sample->text_sample);
|
||||||
DCHECK(stream_state);
|
DCHECK(stream_state);
|
||||||
|
|
||||||
|
// Need to cache the stream index as we will lose the pointer when we add
|
||||||
|
// the sample to the queue.
|
||||||
const size_t stream_index = sample->stream_index;
|
const size_t stream_index = sample->stream_index;
|
||||||
if (stream_state->samples.empty()) {
|
|
||||||
const double sample_time_in_seconds =
|
|
||||||
TimeInSeconds(*stream_state->info, *sample);
|
|
||||||
if (sample_time_in_seconds < hint_) {
|
|
||||||
Status status;
|
|
||||||
if (stream_state->cue) {
|
|
||||||
status.Update(DispatchCueIfNeeded(stream_index, sample_time_in_seconds,
|
|
||||||
stream_state));
|
|
||||||
}
|
|
||||||
status.Update(Dispatch(std::move(sample)));
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
DCHECK(!stream_state->cue);
|
|
||||||
}
|
|
||||||
|
|
||||||
stream_state->samples.push_back(std::move(sample));
|
stream_state->samples.push_back(std::move(sample));
|
||||||
|
|
||||||
if (stream_state->samples.size() > kMaxBufferSize) {
|
if (stream_state->samples.size() > kMaxBufferSize) {
|
||||||
LOG(ERROR) << "Stream " << stream_index << " has buffered "
|
LOG(ERROR) << "Stream " << stream_index << " has buffered "
|
||||||
<< stream_state->samples.size() << " when the max is "
|
<< stream_state->samples.size() << " when the max is "
|
||||||
|
@ -290,19 +273,37 @@ Status CueAlignmentHandler::AcceptSample(std::unique_ptr<StreamData> sample,
|
||||||
"Streams are not properly multiplexed.");
|
"Streams are not properly multiplexed.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return RunThroughSamples(stream_state);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status CueAlignmentHandler::RunThroughSamples(StreamState* stream) {
|
||||||
|
// Step through all our samples until we find where we can insert the cue.
|
||||||
|
// Think of this as a merge sort.
|
||||||
|
while (stream->cue && stream->samples.size()) {
|
||||||
|
const double sample_time_in_seconds =
|
||||||
|
TimeInSeconds(*stream->info, *stream->samples.front());
|
||||||
|
const double cue_time_in_seconds = stream->cue->cue_event->time_in_seconds;
|
||||||
|
|
||||||
|
// The cue can go out now. So let it, but we can't let the sample out yet
|
||||||
|
// because we need to check it with the hint.
|
||||||
|
if (cue_time_in_seconds <= sample_time_in_seconds) {
|
||||||
|
RETURN_IF_ERROR(Dispatch(std::move(stream->cue)));
|
||||||
|
} else {
|
||||||
|
RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
|
||||||
|
stream->samples.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we still have samples, then it means that we sent out the cue and can
|
||||||
|
// now work up to the hint. So now send all samples that come before the hint
|
||||||
|
// downstream.
|
||||||
|
while (stream->samples.size() &&
|
||||||
|
TimeInSeconds(*stream->info, *stream->samples.front()) < hint_) {
|
||||||
|
RETURN_IF_ERROR(Dispatch(std::move(stream->samples.front())));
|
||||||
|
stream->samples.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
return Status::OK;
|
return Status::OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status CueAlignmentHandler::DispatchCueIfNeeded(
|
|
||||||
size_t stream_index,
|
|
||||||
double next_sample_time_in_seconds,
|
|
||||||
StreamState* stream_state) {
|
|
||||||
DCHECK(stream_state->cue);
|
|
||||||
if (next_sample_time_in_seconds < stream_state->cue->time_in_seconds)
|
|
||||||
return Status::OK;
|
|
||||||
DCHECK_LT(stream_state->cue->time_in_seconds, hint_);
|
|
||||||
return DispatchCueEvent(stream_index, std::move(stream_state->cue));
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace media
|
} // namespace media
|
||||||
} // namespace shaka
|
} // namespace shaka
|
||||||
|
|
|
@ -41,9 +41,9 @@ class CueAlignmentHandler : public MediaHandler {
|
||||||
bool to_be_flushed = false;
|
bool to_be_flushed = false;
|
||||||
|
|
||||||
// If set, it points to the next cue it has to send downstream. Note that if
|
// If set, it points to the next cue it has to send downstream. Note that if
|
||||||
// it is not set, the next cue is not determined.
|
// it is not set, the next cue is not determined. We store the stream data
|
||||||
// This is set but not really used by video stream.
|
// so that we don't need a stream index to send it out.
|
||||||
std::shared_ptr<const CueEvent> cue;
|
std::unique_ptr<StreamData> cue;
|
||||||
};
|
};
|
||||||
|
|
||||||
// MediaHandler overrides.
|
// MediaHandler overrides.
|
||||||
|
@ -68,10 +68,8 @@ class CueAlignmentHandler : public MediaHandler {
|
||||||
Status AcceptSample(std::unique_ptr<StreamData> sample,
|
Status AcceptSample(std::unique_ptr<StreamData> sample,
|
||||||
StreamState* stream_state);
|
StreamState* stream_state);
|
||||||
|
|
||||||
// Dispatch the cue if the new sample comes at or after it.
|
// Dispatch all samples and cues (in the correct order) for the given stream.
|
||||||
Status DispatchCueIfNeeded(size_t stream_index,
|
Status RunThroughSamples(StreamState* stream);
|
||||||
double next_sample_time_in_seconds,
|
|
||||||
StreamState* stream_state);
|
|
||||||
|
|
||||||
SyncPointQueue* const sync_points_ = nullptr;
|
SyncPointQueue* const sync_points_ = nullptr;
|
||||||
std::vector<StreamState> stream_states_;
|
std::vector<StreamState> stream_states_;
|
||||||
|
|
Loading…
Reference in New Issue