From d7d307ff56cf9bbb9e3a0d465f081fa8fe8ee286 Mon Sep 17 00:00:00 2001 From: Thomas Inskip Date: Thu, 8 May 2014 18:23:54 -0700 Subject: [PATCH] Implemented multi-stream packager driver program. Change-Id: I16e1f5f1e8863b09b642c94d4be565e309bdafb6 --- app/libcrypto_threading.cc | 21 +++ app/libcrypto_threading.h | 38 ++++++ app/libcrypto_threading_posix.cc | 52 +++++++ app/packager_common.h | 2 +- app/packager_main.cc | 226 +++++++++++++++++++++++++++++++ app/single_packager_main.cc | 4 +- packager.gyp | 37 ++++- 7 files changed, 375 insertions(+), 5 deletions(-) create mode 100644 app/libcrypto_threading.cc create mode 100644 app/libcrypto_threading.h create mode 100644 app/libcrypto_threading_posix.cc create mode 100644 app/packager_main.cc diff --git a/app/libcrypto_threading.cc b/app/libcrypto_threading.cc new file mode 100644 index 0000000000..25bf3c71f1 --- /dev/null +++ b/app/libcrypto_threading.cc @@ -0,0 +1,21 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "app/libcrypto_threading.h" + +namespace media { + +LibcryptoThreading::LibcryptoThreading() {} + +LibcryptoThreading::~LibcryptoThreading() { + TerminateLibcryptoThreading(); +} + +bool LibcryptoThreading::Initialize() { + return InitLibcryptoThreading(); +} + +} // namespace media diff --git a/app/libcrypto_threading.h b/app/libcrypto_threading.h new file mode 100644 index 0000000000..d641446378 --- /dev/null +++ b/app/libcrypto_threading.h @@ -0,0 +1,38 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef APP_LIBCRYPTO_THREADING_H_ +#define APP_LIBCRYPTO_THREADING_H_ + +#include "base/macros.h" + +namespace media { + +/// Enable thread safety for OpenSSL libcrypto. +/// @return true if successful, false otherwise. +bool InitLibcryptoThreading(); + +/// Terminate thread safety for OpenSSL libcrypto. +/// @return true if successful, false otherwise. +bool TerminateLibcryptoThreading(); + +/// Convenience class which initializes and terminates libcrypto threading. +class LibcryptoThreading { + public: + LibcryptoThreading(); + ~LibcryptoThreading(); + + /// Enables thread safety for OpenSSL libcrypto. + /// @return true if successful, false otherwise. + bool Initialize(); + + private: + DISALLOW_COPY_AND_ASSIGN(LibcryptoThreading); +}; + +} // namespace media + +#endif // APP_LIBCRYPTO_THREADING_H_ diff --git a/app/libcrypto_threading_posix.cc b/app/libcrypto_threading_posix.cc new file mode 100644 index 0000000000..32eb0aefdf --- /dev/null +++ b/app/libcrypto_threading_posix.cc @@ -0,0 +1,52 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "app/libcrypto_threading.h" + +#include +#include + +#include "openssl/crypto.h" + +namespace { + +std::vector global_locks; + +void LockFunction(int mode, int n, const char* file, int line) { + if (mode & CRYPTO_LOCK) + pthread_mutex_lock(&global_locks[n]); + else + pthread_mutex_unlock(&global_locks[n]); +} + +unsigned long ThreadIdFunction() { + return static_cast(pthread_self()); +} + +} // anonymous namespace + +namespace media { + +bool InitLibcryptoThreading() { + int num_global_locks = CRYPTO_num_locks(); + global_locks.resize(num_global_locks); + for (int i = 0; i < num_global_locks; ++i) + pthread_mutex_init(&global_locks[i], NULL); + CRYPTO_set_id_callback(ThreadIdFunction); + CRYPTO_set_locking_callback(LockFunction); + return true; +} + +bool TerminateLibcryptoThreading() { + CRYPTO_set_id_callback(NULL); + CRYPTO_set_locking_callback(NULL); + for (size_t i = 0; i < global_locks.size(); ++i) + pthread_mutex_destroy(&global_locks[i]); + global_locks.clear(); + return true; +} + +} // namespace media diff --git a/app/packager_common.h b/app/packager_common.h index 3b40cdcc5a..fc85ed74d6 100644 --- a/app/packager_common.h +++ b/app/packager_common.h @@ -4,7 +4,7 @@ // license that can be found in the LICENSE file or at // https://developers.google.com/open-source/licenses/bsd // -// Functionality common to single and multiple file packager. +// Functionality common to single and multiple stream packager. #ifndef APP_PACKAGER_COMMON_H_ #define APP_PACKAGER_COMMON_H_ diff --git a/app/packager_main.cc b/app/packager_main.cc new file mode 100644 index 0000000000..524a85aeb9 --- /dev/null +++ b/app/packager_main.cc @@ -0,0 +1,226 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include + +#include "app/fixed_key_encryption_flags.h" +#include "app/libcrypto_threading.h" +#include "app/muxer_flags.h" +#include "app/packager_common.h" +#include "app/single_muxer_flags.h" +#include "app/widevine_encryption_flags.h" +#include "base/logging.h" +#include "base/stl_util.h" +#include "base/strings/string_split.h" +#include "base/strings/stringprintf.h" +#include "base/threading/simple_thread.h" +#include "media/base/demuxer.h" +#include "media/base/encryption_key_source.h" +#include "media/base/muxer_options.h" +#include "media/formats/mp4/mp4_muxer.h" + +namespace { +const char kUsage[] = + "Packager driver program. Sample Usage:\n" + "%s [flags] ...\n" + "stream_descriptor may be repeated and consists of a tuplet as follows:\n" + "#,[,]\n" + " - input_file is a file path or network stream URL.\n" + " - stream_selector is one of 'audio', 'video', or stream number.\n" + " - output_file is the output file (single file) or initialization file" + " path (multiple file)." + " - segment_template is an optional value which specifies the naming" + " pattern for the segment files, and that the stream should be split into" + " multiple files. Its presence should be consistent across streams.\n"; + +typedef std::vector StringVector; + +} // namespace + +namespace media { + +// Demux, Mux(es) and worker thread used to remux a source file/stream. +class RemuxJob : public base::SimpleThread { + public: + RemuxJob(scoped_ptr demuxer) + : SimpleThread("RemuxJob"), + demuxer_(demuxer.Pass()) {} + + virtual ~RemuxJob() { + STLDeleteElements(&muxers_); + } + + void AddMuxer(scoped_ptr mux) { + muxers_.push_back(mux.release()); + } + + Demuxer* demuxer() { return demuxer_.get(); } + Status status() { return status_; } + + private: + virtual void Run() OVERRIDE { + DCHECK(demuxer_); + status_ = demuxer_->Run(); + } + + scoped_ptr demuxer_; + std::vector muxers_; + Status status_; + + DISALLOW_COPY_AND_ASSIGN(RemuxJob); +}; + +bool CreateRemuxJobs(const StringVector& stream_descriptors, + const MuxerOptions& muxer_options, + EncryptionKeySource* key_source, + std::vector* remux_jobs) { + DCHECK(remux_jobs); + + // Sort the stream descriptors so that we can group muxers by demux. + StringVector sorted_descriptors(stream_descriptors); + std::sort(sorted_descriptors.begin(), sorted_descriptors.end()); + + std::string previous_file_path; + for (StringVector::const_iterator stream_iter = sorted_descriptors.begin(); + stream_iter != sorted_descriptors.end(); + ++stream_iter) { + // Process stream descriptor. + StringVector descriptor; + base::SplitString(*stream_iter, ',', &descriptor); + if ((descriptor.size() < 2) || (descriptor.size() > 3)) { + LOG(ERROR) + << "Malformed stream descriptor (invalid number of components)."; + return false; + } + size_t hash_pos = descriptor[0].find('#'); + if (hash_pos == std::string::npos) { + LOG(ERROR) + << "Malformed stream descriptor (stream selector unspecified)."; + return false; + } + MuxerOptions stream_muxer_options(muxer_options); + std::string file_path(descriptor[0].substr(0, hash_pos)); + std::string stream_selector(descriptor[0].substr(hash_pos + 1)); + stream_muxer_options.output_file_name = descriptor[1]; + if (descriptor.size() == 3) + stream_muxer_options.segment_template = descriptor[2]; + + if (file_path != previous_file_path) { + // New remux job needed. Create demux and job thread. + scoped_ptr demux(new Demuxer(file_path, NULL)); + Status status = demux->Initialize(); + if (!status.ok()) { + LOG(ERROR) << "Demuxer failed to initialize: " << status.ToString(); + return false; + } + if (FLAGS_dump_stream_info) { + printf("\nFile \"%s\":\n", file_path.c_str()); + DumpStreamInfo(demux->streams()); + } + remux_jobs->push_back(new RemuxJob(demux.Pass())); + previous_file_path = file_path; + } + DCHECK(!remux_jobs->empty()); + + scoped_ptr muxer(new mp4::MP4Muxer(stream_muxer_options)); + if (key_source) { + muxer->SetEncryptionKeySource(key_source, + FLAGS_max_sd_pixels, + FLAGS_clear_lead, + FLAGS_crypto_period_duration); + } + + if (!AddStreamToMuxer(remux_jobs->back()->demuxer()->streams(), + stream_selector, + muxer.get())) + return false; + remux_jobs->back()->AddMuxer(muxer.Pass()); + } + + return true; +} + +Status RunRemuxJobs(const std::vector& remux_jobs) { + // Start the job threads. + for (std::vector::const_iterator job_iter = remux_jobs.begin(); + job_iter != remux_jobs.end(); + ++job_iter) { + (*job_iter)->Start(); + } + + // Wait for all jobs to complete or an error occurs. + Status status; + bool all_joined; + do { + all_joined = true; + for (std::vector::const_iterator job_iter = remux_jobs.begin(); + job_iter != remux_jobs.end(); + ++job_iter) { + if ((*job_iter)->HasBeenJoined()) { + status = (*job_iter)->status(); + if (!status.ok()) + break; + } else { + all_joined = false; + (*job_iter)->Join(); + } + } + } while (!all_joined && status.ok()); + + return status; +} + +bool RunPackager(const StringVector& stream_descriptors) { + // Get basic muxer options. + MuxerOptions muxer_options; + if (!GetMuxerOptions(&muxer_options)) + return false; + + // Create encryption key source if needed. + scoped_ptr encryption_key_source; + if (FLAGS_enable_widevine_encryption || FLAGS_enable_fixed_key_encryption) { + encryption_key_source = CreateEncryptionKeySource(); + if (!encryption_key_source) + return false; + } + + std::vector remux_jobs; + STLElementDeleter > scoped_jobs_deleter(&remux_jobs); + if (!CreateRemuxJobs(stream_descriptors, + muxer_options, + encryption_key_source.get(), + &remux_jobs)) + return false; + + Status status = RunRemuxJobs(remux_jobs); + if (!status.ok()) { + LOG(ERROR) << "Packaging Error: " << status.ToString(); + return false; + } + + printf("Packaging completed successfully.\n"); + return true; +} + +} // namespace media + +int main(int argc, char** argv) { + google::SetUsageMessage(base::StringPrintf(kUsage, argv[0])); + google::ParseCommandLineFlags(&argc, &argv, true); + if (argc < 2) { + google::ShowUsageWithFlags(argv[0]); + return 1; + } + media::LibcryptoThreading libcrypto_threading; + if (!libcrypto_threading.Initialize()) { + LOG(ERROR) << "Could not initialize libcrypto threading."; + return 1; + } + StringVector stream_descriptors; + for (int i = 1; i < argc; ++i) + stream_descriptors.push_back(argv[i]); + return media::RunPackager(stream_descriptors) ? 0 : 1; +} diff --git a/app/single_packager_main.cc b/app/single_packager_main.cc index 2a62b97150..1d07dc4e56 100644 --- a/app/single_packager_main.cc +++ b/app/single_packager_main.cc @@ -24,7 +24,7 @@ namespace { const char kUsage[] = - "Packager driver program. Sample Usage:\n%s [flags]"; + "Single-stream packager driver program. Sample Usage:\n%s [flags]"; } // namespace namespace media { @@ -115,7 +115,7 @@ bool RunPackager(const std::string& input) { int main(int argc, char** argv) { google::SetUsageMessage(base::StringPrintf(kUsage, argv[0])); google::ParseCommandLineFlags(&argc, &argv, true); - if (argc < 2) { + if (argc != 2) { google::ShowUsageWithFlags(argv[0]); return 1; } diff --git a/packager.gyp b/packager.gyp index 7928daed49..a031b3e3bc 100644 --- a/packager.gyp +++ b/packager.gyp @@ -15,16 +15,49 @@ ], }, 'targets': [ + { + 'target_name': 'packager', + 'type': 'executable', + 'sources': [ + 'app/fixed_key_encryption_flags.cc', + 'app/fixed_key_encryption_flags.h', + 'app/libcrypto_threading.cc', + 'app/libcrypto_threading.h', + 'app/muxer_flags.cc', + 'app/muxer_flags.h', + 'app/packager_common.cc', + 'app/packager_common.h', + 'app/packager_main.cc', + 'app/widevine_encryption_flags.cc', + 'app/widevine_encryption_flags.h', + ], + 'dependencies': [ + 'media/event/media_event.gyp:media_event', + 'media/file/file.gyp:file', + 'media/filters/filters.gyp:filters', + 'media/formats/mp2t/mp2t.gyp:mp2t', + 'media/formats/mp4/mp4.gyp:mp4', + 'media/formats/mpeg/mpeg.gyp:mpeg', + 'third_party/gflags/gflags.gyp:gflags', + ], + 'conditions': [ + [ 'os_posix == 1', { + 'sources': [ + 'app/libcrypto_threading_posix.cc', + ] + }], + ], + }, { 'target_name': 'single_packager', 'type': 'executable', 'sources': [ 'app/fixed_key_encryption_flags.cc', 'app/fixed_key_encryption_flags.h', - 'app/packager_common.cc', - 'app/packager_common.h', 'app/muxer_flags.cc', 'app/muxer_flags.h', + 'app/packager_common.cc', + 'app/packager_common.h', 'app/single_muxer_flags.cc', 'app/single_muxer_flags.h', 'app/single_packager_main.cc',