Fix reading WebVTT from a pipe

Opening a named pipe can block until both ends are open, and we cannot
control when the other end will be open.  Ideally, we would always
open files in a thread so that Packager can be used with piped inputs
from naive applications without a potential deadlock.

This change will defer opening WebVTT files until the parser Run()
method is called from a thread.  This way, WebVTT files being sent in
from a pipe will never be able to block the main thread.

Previously, files were opened on the main thread before calling the
parser constructor, passing the open file to the constructor as an
argument.  I also tried doing it in the parser's InitializeInternal()
method, but that is also called from the main thread.

Change-Id: I54cc68ed9d48a8dc697829119be84d4065b1ae1c
This commit is contained in:
Joey Parrish 2020-03-19 13:54:55 -07:00 committed by KongQun Yang
parent 3c9603af13
commit 82bc7a3b95
4 changed files with 17 additions and 23 deletions

View File

@ -85,9 +85,9 @@ void UpdateConfig(const std::vector<std::string>& block, std::string* config) {
} // namespace } // namespace
WebVttParser::WebVttParser(std::unique_ptr<FileReader> source, WebVttParser::WebVttParser(const std::string& input_path,
const std::string& language) const std::string& language)
: reader_(std::move(source)), language_(language) {} : input_path_(input_path), language_(language) {}
Status WebVttParser::InitializeInternal() { Status WebVttParser::InitializeInternal() {
return Status::OK; return Status::OK;
@ -99,7 +99,11 @@ bool WebVttParser::ValidateOutputStreamIndex(size_t stream_index) const {
} }
Status WebVttParser::Run() { Status WebVttParser::Run() {
return Parse() std::unique_ptr<FileReader> reader;
RETURN_IF_ERROR(FileReader::Open(input_path_, &reader));
BlockReader block_reader(std::move(reader));
return Parse(&block_reader)
? FlushDownstream(kStreamIndex) ? FlushDownstream(kStreamIndex)
: Status(error::INTERNAL_ERROR, : Status(error::INTERNAL_ERROR,
"Failed to parse WebVTT source. See log for details."); "Failed to parse WebVTT source. See log for details.");
@ -109,9 +113,9 @@ void WebVttParser::Cancel() {
keep_reading_ = false; keep_reading_ = false;
} }
bool WebVttParser::Parse() { bool WebVttParser::Parse(BlockReader* block_reader) {
std::vector<std::string> block; std::vector<std::string> block;
if (!reader_.Next(&block)) { if (!block_reader->Next(&block)) {
LOG(ERROR) << "Failed to read WEBVTT HEADER - No blocks in source."; LOG(ERROR) << "Failed to read WEBVTT HEADER - No blocks in source.";
return false; return false;
} }
@ -131,7 +135,7 @@ bool WebVttParser::Parse() {
bool saw_cue = false; bool saw_cue = false;
while (reader_.Next(&block) && keep_reading_) { while (block_reader->Next(&block) && keep_reading_) {
// NOTE // NOTE
if (IsLikelyNote(block[0])) { if (IsLikelyNote(block[0])) {
// We can safely ignore the whole block. // We can safely ignore the whole block.

View File

@ -20,7 +20,7 @@ namespace media {
// Used to parse a WebVTT source into Cues that will be sent downstream. // Used to parse a WebVTT source into Cues that will be sent downstream.
class WebVttParser : public OriginHandler { class WebVttParser : public OriginHandler {
public: public:
WebVttParser(std::unique_ptr<FileReader> source, const std::string& language); WebVttParser(const std::string& input_path, const std::string& language);
Status Run() override; Status Run() override;
void Cancel() override; void Cancel() override;
@ -32,7 +32,7 @@ class WebVttParser : public OriginHandler {
Status InitializeInternal() override; Status InitializeInternal() override;
bool ValidateOutputStreamIndex(size_t stream_index) const override; bool ValidateOutputStreamIndex(size_t stream_index) const override;
bool Parse(); bool Parse(BlockReader* block_reader);
bool ParseCueWithNoId(const std::vector<std::string>& block); bool ParseCueWithNoId(const std::vector<std::string>& block);
bool ParseCueWithId(const std::vector<std::string>& block); bool ParseCueWithId(const std::vector<std::string>& block);
Status ParseCue(const std::string& id, Status ParseCue(const std::string& id,
@ -41,8 +41,9 @@ class WebVttParser : public OriginHandler {
Status DispatchTextStreamInfo(); Status DispatchTextStreamInfo();
BlockReader reader_; std::string input_path_;
std::string language_; std::string language_;
std::string style_region_config_; std::string style_region_config_;
bool stream_info_dispatched_ = false; bool stream_info_dispatched_ = false;
bool keep_reading_ = true; bool keep_reading_ = true;

View File

@ -44,10 +44,7 @@ class WebVttParserTest : public MediaHandlerTestBase {
ASSERT_TRUE(File::WriteStringToFile(kFilename, text)); ASSERT_TRUE(File::WriteStringToFile(kFilename, text));
// Read from the file we just wrote. // Read from the file we just wrote.
std::unique_ptr<FileReader> reader; parser_ = std::make_shared<WebVttParser>(kFilename, kLanguage);
ASSERT_OK(FileReader::Open(kFilename, &reader));
parser_ = std::make_shared<WebVttParser>(std::move(reader), kLanguage);
ASSERT_OK(MediaHandlerTestBase::SetUpAndInitializeGraph( ASSERT_OK(MediaHandlerTestBase::SetUpAndInitializeGraph(
parser_, kInputCount, kOutputCount)); parser_, kInputCount, kOutputCount));

View File

@ -502,11 +502,7 @@ Status CreateHlsTextJob(const StreamDescriptor& stream,
auto output = std::make_shared<WebVttTextOutputHandler>( auto output = std::make_shared<WebVttTextOutputHandler>(
muxer_options, std::move(muxer_listener)); muxer_options, std::move(muxer_listener));
std::unique_ptr<FileReader> reader; auto parser = std::make_shared<WebVttParser>(stream.input, stream.language);
RETURN_IF_ERROR(FileReader::Open(stream.input, &reader));
auto parser =
std::make_shared<WebVttParser>(std::move(reader), stream.language);
auto padder = std::make_shared<TextPadder>(kDefaultTextZeroBiasMs); auto padder = std::make_shared<TextPadder>(kDefaultTextZeroBiasMs);
auto cue_aligner = sync_points auto cue_aligner = sync_points
? std::make_shared<CueAlignmentHandler>(sync_points) ? std::make_shared<CueAlignmentHandler>(sync_points)
@ -526,11 +522,7 @@ Status CreateWebVttToMp4TextJob(const StreamDescriptor& stream,
SyncPointQueue* sync_points, SyncPointQueue* sync_points,
MuxerFactory* muxer_factory, MuxerFactory* muxer_factory,
std::shared_ptr<OriginHandler>* root) { std::shared_ptr<OriginHandler>* root) {
std::unique_ptr<FileReader> reader; auto parser = std::make_shared<WebVttParser>(stream.input, stream.language);
RETURN_IF_ERROR(FileReader::Open(stream.input, &reader));
auto parser =
std::make_shared<WebVttParser>(std::move(reader), stream.language);
auto padder = std::make_shared<TextPadder>(kDefaultTextZeroBiasMs); auto padder = std::make_shared<TextPadder>(kDefaultTextZeroBiasMs);
auto text_to_mp4 = std::make_shared<WebVttToMp4Handler>(); auto text_to_mp4 = std::make_shared<WebVttToMp4Handler>();