diff --git a/.gitignore b/.gitignore index 8c3c34bb94..66bc422882 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ .cproject .project .pydevproject +.idea .repo .settings /out* diff --git a/AUTHORS b/AUTHORS index a1c7831bcf..30ed4f3c72 100644 --- a/AUTHORS +++ b/AUTHORS @@ -13,6 +13,7 @@ # # Please keep the list sorted. +3Q GmbH <*@3qsdn.com> Alen Vrecko Anders Hasselqvist Chun-da Chen @@ -24,6 +25,7 @@ Ivi.ru LLC <*@ivi.ru> Leandro Moreira Leo Law More Screens Ltd. <*@morescreens.net> +Ole Andre Birkedal Philo Inc. <*@philo.com> Piotr Srebrny Richard Eklycke diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 2ef8829b65..e5010bdb0b 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -24,6 +24,7 @@ Alen Vrecko Anders Hasselqvist +Andreas Motl Bei Li Chun-da Chen Daniel CantarĂ­n @@ -37,6 +38,7 @@ Joey Parrish Kongqun Yang Leandro Moreira Leo Law +Ole Andre Birkedal Piotr Srebrny Qingquan Wang Richard Eklycke diff --git a/docs/source/tutorials/http_upload.rst b/docs/source/tutorials/http_upload.rst new file mode 100644 index 0000000000..2c5aeed8da --- /dev/null +++ b/docs/source/tutorials/http_upload.rst @@ -0,0 +1,250 @@ + đŸ›  Status: Work in progress + + The Shaka Packager HTTP upload feature is currently in development. + It's on the fast track to a working alpha, so we encourage you to use + it and give us some feedback. However, there are things that haven't + been finalized yet so you can expect some changes. + + This document describes the current state of the implementation, + contributions are always welcome. + + The discussion about this feature currently happens at + `Add HTTP PUT output #149 `_, + its development on the + `http-upload `_ branch, + feel free to join us. + +########### +HTTP upload +########### + + +************ +Introduction +************ +Shaka Packager can upload produced artefacts to a HTTP server using +HTTP PUT requests with chunked transfer encoding to improve live +publishing performance when content is not served directly from +the packaging output location. For talking HTTP, libcurl_ is used. + +The produced artefacts are: + +- HLS_ playlist files in M3U_ Format encoded with UTF-8 (.m3u8) +- Chunked audio segments encoded with AAC (.aac) +- Chunked video segments encapsulated into the + `MPEG transport stream`_ container format (.ts) + +References +========== +- `RFC 2616 about HTTP PUT`_ +- `RFC 2616 about Chunked Transfer Coding`_ + + +************* +Documentation +************* + +Getting started +=============== +For enabling the HTTP upload transfer mode, please populate +the ``segment_template`` attribute in the ``stream_descriptor`` +parameter as well as the ``--hls_master_playlist_output`` parameter +with appropriate URLs where the HTTP PUT requests will be issued to. + +You can also supply the ``--user_agent`` flag to specify a custom +User-Agent string for all HTTP PUT requests. + +For pragmatic reasons, all HTTP requests will be declared as +``Content-Type: application/octet-stream``. + +Synopsis +======== +Here is a basic example. It is similar to the "live" example and also +borrows features from "FFmpeg piping", see :doc:`live` and :doc:`ffmpeg_piping`. + +Define UNIX pipe to connect ffmpeg with packager:: + + export PIPE=/tmp/bigbuckbunny.fifo + mkfifo ${PIPE} + +Acquire and transcode RTMP stream:: + + # Steady + ffmpeg -fflags nobuffer -threads 0 -y \ + -i rtmp://184.72.239.149/vod/mp4:bigbuckbunny_450.mp4 \ + -pix_fmt yuv420p -vcodec libx264 -preset:v superfast -acodec aac \ + -f mpegts pipe: > ${PIPE} + +Configure and run packager:: + + # Define upload URL + export UPLOAD_URL=http://localhost:6767/hls-live + + # Go + packager \ + "input=${PIPE},stream=audio,segment_template=${UPLOAD_URL}/bigbuckbunny-audio-aac-\$Number%04d\$.aac,playlist_name=bigbuckbunny-audio.m3u8,hls_group_id=audio" \ + "input=${PIPE},stream=video,segment_template=${UPLOAD_URL}/bigbuckbunny-video-h264-450-\$Number%04d\$.ts,playlist_name=bigbuckbunny-video-450.m3u8" \ + --io_block_size 65536 --fragment_duration 2 --segment_duration 2 \ + --time_shift_buffer_depth 3600 --preserved_segments_outside_live_window 7200 \ + --hls_master_playlist_output "${UPLOAD_URL}/bigbuckbunny.m3u8" \ + --hls_playlist_type LIVE \ + --vmodule=http_file=1 + +******* +HTTPS +******* +If your ingest uses HTTPS and requires specific certificates, these +can be specified on the command line similar to how it's done for +:doc:`playready`, with the following arguments: + +- ``--https_ca_file``: Absolute path to the Certificate Authority file for the server cert. PEM format. +- ``--https_cert_file``: Absolute path to client certificate file. +- ``--https_cert_private_key_file``: Absolute path to the private Key file. +- ``--https_cert_private_key_password``: Password to the private key file. + +******* +Backlog +******* +Please note the HTTP upload feature still lacks some features +probably important for production. Contributions are welcome! + +DASH +==== +While the current implementation works for HLS_, +we should also check DASH_. + +Basic Auth +========== +There's no support for authentication yet. + +HTTPS +===== +While there's already some code in place, +HTTPS is currently not supported yet. + +HTTP DELETE +=========== +Nothing has be done to support this yet: + + Packager supports removing old segments automatically. + See ``preserved_segments_outside_live_window`` option in + DASH_ options or HLS_ options for details. + +Software tests +============== +We should do some minimal QA, check whether the test +suite breaks and maybe add some tests covering new code. + +Network timeouts +================ +libcurl_ can apply network timeout settings. However, +we haven't addressed this yet. + +Miscellaneous +============= +- Address all things TODO and FIXME +- Make ``io_cache_size`` configurable? + + +******* +Backend +******* + +HTTP PUT file uploads to Nginx +============================== +The receiver is based on the native Nginx_ module "`ngx_http_dav_module`_", +it handles HTTP PUT requests with chunked transfer encoding +like emitted by Shaka Packager. + +The configuration is very simple:: + + server { + listen 6767 default_server; + + access_log /dev/stdout combined; + error_log /dev/stdout info; + + root /var/spool; + location ~ ^/hls-live/(.+)$ { + + dav_methods PUT; + create_full_put_path on; + + proxy_buffering off; + client_max_body_size 20m; + + } + + } + +Run Nginx:: + + nginx -p `pwd` -c nginx.conf -g "daemon off;" + + +HTTP PUT file uploads to Caddy +============================== +The receiver is based on the Caddy_ webserver, it handles HTTP PUT +requests with chunked transfer encoding like emitted by Shaka Packager. + +Put this configuration into a `Caddyfile`:: + + # Bind address + :6767 + + # Enable logging + log stdout + + # Web server root with autoindex + root /var/spool + redir /hls-live { + if {path} is "/" + } + browse + + # Enable upload with HTTP PUT + upload /hls-live { + to "/var/spool/hls-live" + } + +Run Caddy:: + + caddy -conf Caddyfile + + +************************* +Development and debugging +************************* + +Watch the network:: + + ngrep -Wbyline -dlo port 6767 + +Grab and run `httpd-reflector.py`_ to use it as a dummy HTTP sink:: + + # Ready + wget https://gist.githubusercontent.com/amotl/3ed38e461af743aeeade5a5a106c1296/raw/httpd-reflector.py + chmod +x httpd-reflector.py + ./httpd-reflector.py --port 6767 + + +---- + +Have fun! + +.. _HLS: https://en.wikipedia.org/wiki/HTTP_Live_Streaming +.. _DASH: https://en.wikipedia.org/wiki/Dynamic_Adaptive_Streaming_over_HTTP +.. _M3U: https://en.wikipedia.org/wiki/M3U +.. _MPEG transport stream: https://en.wikipedia.org/wiki/MPEG_transport_stream +.. _libcurl: https://curl.haxx.se/libcurl/ +.. _RFC 1867: https://tools.ietf.org/html/rfc1867 +.. _RFC 2616 about HTTP PUT: https://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.6 +.. _RFC 2616 about Chunked Transfer Coding: https://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1 +.. _RFC 5789: https://tools.ietf.org/html/rfc5789 +.. _Nginx: http://nginx.org/ +.. _ngx_http_dav_module: http://nginx.org/en/docs/http/ngx_http_dav_module.html +.. _Caddy: https://caddyserver.com/ +.. _httpd-reflector.py: https://gist.github.com/amotl/3ed38e461af743aeeade5a5a106c1296 + +.. _@colleenkhenry: https://github.com/colleenkhenry +.. _@kqyang: https://github.com/kqyang diff --git a/docs/source/tutorials/tutorials.rst b/docs/source/tutorials/tutorials.rst index 7fb5188172..4a62bea786 100644 --- a/docs/source/tutorials/tutorials.rst +++ b/docs/source/tutorials/tutorials.rst @@ -11,3 +11,4 @@ Tutorials drm.rst ads.rst ffmpeg_piping.rst + http_upload.rst diff --git a/packager/file/file.cc b/packager/file/file.cc index c1a62053c2..3e61c68d9f 100644 --- a/packager/file/file.cc +++ b/packager/file/file.cc @@ -21,6 +21,7 @@ #include "packager/file/memory_file.h" #include "packager/file/threaded_io_file.h" #include "packager/file/udp_file.h" +#include "packager/file/http_file.h" DEFINE_uint64(io_cache_size, 32ULL << 20, @@ -41,6 +42,9 @@ const char* kCallbackFilePrefix = "callback://"; const char* kLocalFilePrefix = "file://"; const char* kMemoryFilePrefix = "memory://"; const char* kUdpFilePrefix = "udp://"; +const char* kHttpFilePrefix = "http://"; +const char* kHttpsFilePrefix = "https://"; + namespace { @@ -95,6 +99,14 @@ File* CreateUdpFile(const char* file_name, const char* mode) { return new UdpFile(file_name); } +File* CreateHttpsFile(const char* file_name, const char* mode) { + return new HttpFile(file_name, mode, true); +} + +File* CreateHttpFile(const char* file_name, const char* mode) { + return new HttpFile(file_name, mode, false); +} + File* CreateMemoryFile(const char* file_name, const char* mode) { return new MemoryFile(file_name, mode); } @@ -114,6 +126,8 @@ static const FileTypeInfo kFileTypeInfo[] = { {kUdpFilePrefix, &CreateUdpFile, nullptr, nullptr}, {kMemoryFilePrefix, &CreateMemoryFile, &DeleteMemoryFile, nullptr}, {kCallbackFilePrefix, &CreateCallbackFile, nullptr, nullptr}, + {kHttpFilePrefix, &CreateHttpFile, nullptr, nullptr}, + {kHttpsFilePrefix, &CreateHttpsFile, nullptr, nullptr}, }; base::StringPiece GetFileTypePrefix(base::StringPiece file_name) { @@ -123,6 +137,7 @@ base::StringPiece GetFileTypePrefix(base::StringPiece file_name) { const FileTypeInfo* GetFileTypeInfo(base::StringPiece file_name, base::StringPiece* real_file_name) { + base::StringPiece file_type_prefix = GetFileTypePrefix(file_name); for (const FileTypeInfo& file_type : kFileTypeInfo) { if (file_type_prefix == file_type.type) { @@ -233,6 +248,7 @@ bool File::ReadFileToString(const char* file_name, std::string* contents) { bool File::WriteStringToFile(const char* file_name, const std::string& contents) { + VLOG(2) << "File::WriteStringToFile: " << file_name; std::unique_ptr file(File::Open(file_name, "w")); if (!file) { LOG(ERROR) << "Failed to open file " << file_name; @@ -261,6 +277,7 @@ bool File::WriteStringToFile(const char* file_name, bool File::WriteFileAtomically(const char* file_name, const std::string& contents) { + VLOG(2) << "File::WriteFileAtomically: " << file_name; base::StringPiece real_file_name; const FileTypeInfo* file_type = GetFileTypeInfo(file_name, &real_file_name); DCHECK(file_type); @@ -280,6 +297,7 @@ bool File::WriteFileAtomically(const char* file_name, bool File::Copy(const char* from_file_name, const char* to_file_name) { std::string content; + VLOG(1) << "File::Copy from " << from_file_name << " to " << to_file_name; if (!ReadFileToString(from_file_name, &content)) { LOG(ERROR) << "Failed to open file " << from_file_name; return false; @@ -323,6 +341,8 @@ int64_t File::CopyFile(File* source, File* destination, int64_t max_copy) { if (max_copy < 0) max_copy = std::numeric_limits::max(); + VLOG(1) << "File::CopyFile from " << source->file_name() << " to " << destination->file_name(); + const int64_t kBufferSize = 0x40000; // 256KB. std::unique_ptr buffer(new uint8_t[kBufferSize]); int64_t bytes_copied = 0; diff --git a/packager/file/file.gyp b/packager/file/file.gyp index de8b41a591..e1a8d79162 100644 --- a/packager/file/file.gyp +++ b/packager/file/file.gyp @@ -20,6 +20,8 @@ 'file_util.cc', 'file_util.h', 'file_closer.h', + 'http_file.cc', + 'http_file.h', 'io_cache.cc', 'io_cache.h', 'local_file.cc', @@ -36,7 +38,9 @@ ], 'dependencies': [ '../base/base.gyp:base', + '../packager.gyp:status', '../third_party/gflags/gflags.gyp:gflags', + '../third_party/curl/curl.gyp:libcurl', ], }, { @@ -49,12 +53,14 @@ 'io_cache_unittest.cc', 'memory_file_unittest.cc', 'udp_options_unittest.cc', + 'http_file_unittest.cc', ], 'dependencies': [ '../media/test/media_test.gyp:run_tests_with_atexit_manager', '../testing/gmock.gyp:gmock', '../testing/gtest.gyp:gtest', '../third_party/gflags/gflags.gyp:gflags', + '../third_party/curl/curl.gyp:libcurl', 'file', ], }, diff --git a/packager/file/file.h b/packager/file/file.h index 295fddb75c..7854cb768c 100644 --- a/packager/file/file.h +++ b/packager/file/file.h @@ -20,6 +20,7 @@ extern const char* kCallbackFilePrefix; extern const char* kLocalFilePrefix; extern const char* kMemoryFilePrefix; extern const char* kUdpFilePrefix; +extern const char* kHttpFilePrefix; const int64_t kWholeFile = -1; /// Define an abstract file interface. diff --git a/packager/file/http_file.cc b/packager/file/http_file.cc new file mode 100644 index 0000000000..9fe513a839 --- /dev/null +++ b/packager/file/http_file.cc @@ -0,0 +1,368 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/file/http_file.h" + +#include +#include "packager/base/bind.h" +#include "packager/base/files/file_util.h" +#include "packager/base/logging.h" +#include "packager/base/strings/string_number_conversions.h" +#include "packager/base/strings/stringprintf.h" +#include "packager/base/synchronization/lock.h" +#include "packager/base/threading/worker_pool.h" + +DEFINE_int32(libcurl_verbosity, 0, + "Set verbosity level for libcurl."); +DEFINE_string(user_agent, "", + "Set a custom User-Agent string for HTTP ingest."); +DEFINE_string(https_ca_file, "", + "Absolute path to the Certificate Authority file for the " + "server cert. PEM format"); +DEFINE_string(https_cert_file, "", + "Absolute path to client certificate file."); +DEFINE_string(https_cert_private_key_file, "", + "Absolute path to the private Key file."); +DEFINE_string(https_cert_private_key_password, "", + "Password to the private key file."); +DECLARE_uint64(io_cache_size); + +namespace shaka { + +// curl_ primitives stolen from `http_key_fetcher.cc`. +namespace { + +const char kUserAgentString[] = "shaka-packager-uploader/0.1"; + +size_t AppendToString(char* ptr, + size_t size, + size_t nmemb, + std::string* response) { + DCHECK(ptr); + DCHECK(response); + const size_t total_size = size * nmemb; + response->append(ptr, total_size); + return total_size; +} + +} // namespace + +class LibCurlInitializer { + public: + LibCurlInitializer() { + curl_global_init(CURL_GLOBAL_DEFAULT); + } + + ~LibCurlInitializer() { + curl_global_cleanup(); + } + + private: + DISALLOW_COPY_AND_ASSIGN(LibCurlInitializer); +}; + +/// Create a HTTP/HTTPS client +HttpFile::HttpFile(const char* file_name, const char* mode, bool https) + : File(file_name), + file_mode_(mode), + user_agent_(FLAGS_user_agent), + ca_file_(FLAGS_https_ca_file), + cert_file_(FLAGS_https_cert_file), + cert_private_key_file_(FLAGS_https_cert_private_key_file), + cert_private_key_pass_(FLAGS_https_cert_private_key_password), + timeout_in_seconds_(0), + cache_(FLAGS_io_cache_size), + scoped_curl(curl_easy_init(), &curl_easy_cleanup), + task_exit_event_(base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED) { + if (https) { + resource_url_ = "https://" + std::string(file_name); + } else { + resource_url_ = "http://" + std::string(file_name); + } + + static LibCurlInitializer lib_curl_initializer; + + // Setup libcurl scope + if (!scoped_curl.get()) { + LOG(ERROR) << "curl_easy_init() failed."; + // return Status(error::HTTP_FAILURE, "curl_easy_init() failed."); + delete this; + } +} + +HttpFile::HttpFile(const char* file_name, const char* mode) + : HttpFile(file_name, mode, false) +{} + +// Destructor +HttpFile::~HttpFile() {} + +bool HttpFile::Open() { + + VLOG(1) << "Opening " << resource_url() << " with file mode \"" << file_mode_ << "\"."; + + // Ignore read requests as they would truncate the target + // file by propagating as zero-length PUT requests. + // See also https://github.com/google/shaka-packager/issues/149#issuecomment-437203701 + if (file_mode_ == "r") { + VLOG(1) << "HttpFile only supports write mode, skipping further operations"; + task_exit_event_.Signal(); + return false; + } + + // Run progressive upload in separate thread. + base::WorkerPool::PostTask( + FROM_HERE, base::Bind(&HttpFile::CurlPut, base::Unretained(this)), + true // task_is_slow + ); + + return true; +} + +void HttpFile::CurlPut() { + // Setup libcurl handle with HTTP PUT upload transfer mode. + std::string request_body; + Request(PUT, resource_url(), request_body, &response_body_); +} + +bool HttpFile::Close() { + VLOG(1) << "Closing " << resource_url() << "."; + cache_.Close(); + task_exit_event_.Wait(); + delete this; + return true; +} + +int64_t HttpFile::Read(void* buffer, uint64_t length) { + LOG(WARNING) << "HttpFile does not support Read()."; + return -1; +} + +int64_t HttpFile::Write(const void* buffer, uint64_t length) { + std::string url = resource_url(); + + VLOG(2) << "Writing to " << url << ", length=" << length; + + // TODO: Implement retrying with exponential backoff, see + // "widevine_key_source.cc" + Status status; + + uint64_t bytes_written = cache_.Write(buffer, length); + VLOG(3) << "PUT CHUNK bytes_written: " << bytes_written; + return bytes_written; + + // Debugging based on response status + /* + if (status.ok()) { + VLOG(1) << "Writing chunk succeeded"; + + } else { + VLOG(1) << "Writing chunk failed"; + if (!response_body.empty()) { + VLOG(2) << "Response:\n" << response_body; + } + } + */ + + // Always signal success to the downstream pipeline + return length; +} + +int64_t HttpFile::Size() { + VLOG(1) << "HttpFile does not support Size()."; + return -1; +} + +bool HttpFile::Flush() { + // Do nothing on Flush. + return true; +} + +bool HttpFile::Seek(uint64_t position) { + VLOG(1) << "HttpFile does not support Seek()."; + return false; +} + +bool HttpFile::Tell(uint64_t* position) { + VLOG(1) << "HttpFile does not support Tell()."; + return false; +} + +// Perform HTTP request +Status HttpFile::Request(HttpMethod http_method, + const std::string& url, + const std::string& data, + std::string* response) { + + // TODO: Sanity checks. + // DCHECK(http_method == GET || http_method == POST); + + VLOG(1) << "Sending request to URL " << url; + + // Setup HTTP method and libcurl options + SetupRequestBase(http_method, url, response); + + // Setup HTTP request headers and body + SetupRequestData(data); + + // Perform HTTP request + CURLcode res = curl_easy_perform(scoped_curl.get()); + + // Assume successful request + Status status = Status::OK; + + // Handle request failure + if (res != CURLE_OK) { + std::string method_text = method_as_text(http_method); + std::string error_message = base::StringPrintf( + "%s request for %s failed. Reason: %s.", method_text.c_str(), + url.c_str(), curl_easy_strerror(res)); + if (res == CURLE_HTTP_RETURNED_ERROR) { + long response_code = 0; + curl_easy_getinfo(scoped_curl.get(), CURLINFO_RESPONSE_CODE, &response_code); + error_message += + base::StringPrintf(" Response code: %ld.", response_code); + } + + // Signal error to logfile + LOG(ERROR) << error_message; + + // Signal error to caller + status = Status( + res == CURLE_OPERATION_TIMEDOUT ? error::TIME_OUT : error::HTTP_FAILURE, + error_message); + } + + // Signal task completion + task_exit_event_.Signal(); + + // Return request status to caller + return status; +} + +// Configure curl_ handle with reasonable defaults +void HttpFile::SetupRequestBase(HttpMethod http_method, + const std::string& url, + std::string* response) { + response->clear(); + + // Configure HTTP request method/verb + switch (http_method) { + case GET: + curl_easy_setopt(scoped_curl.get(), CURLOPT_HTTPGET, 1L); + break; + case POST: + curl_easy_setopt(scoped_curl.get(), CURLOPT_POST, 1L); + break; + case PUT: + curl_easy_setopt(scoped_curl.get(), CURLOPT_PUT, 1L); + break; + case PATCH: + curl_easy_setopt(scoped_curl.get(), CURLOPT_CUSTOMREQUEST, "PATCH"); + break; + } + + // Configure HTTP request + curl_easy_setopt(scoped_curl.get(), CURLOPT_URL, url.c_str()); + + if (user_agent_.empty()) { + curl_easy_setopt(scoped_curl.get(), CURLOPT_USERAGENT, kUserAgentString); + } else { + curl_easy_setopt(scoped_curl.get(), CURLOPT_USERAGENT, user_agent_.data()); + } + + curl_easy_setopt(scoped_curl.get(), CURLOPT_TIMEOUT, timeout_in_seconds_); + curl_easy_setopt(scoped_curl.get(), CURLOPT_FAILONERROR, 1L); + curl_easy_setopt(scoped_curl.get(), CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(scoped_curl.get(), CURLOPT_WRITEFUNCTION, AppendToString); + curl_easy_setopt(scoped_curl.get(), CURLOPT_WRITEDATA, response); + + // HTTPS + if (!cert_private_key_file_.empty() && !cert_file_.empty()) { + curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLKEY, + cert_private_key_file_.data()); + + if (!cert_private_key_pass_.empty()) { + curl_easy_setopt(scoped_curl.get(), CURLOPT_KEYPASSWD, + cert_private_key_pass_.data()); + } + + curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLKEYTYPE, "PEM"); + curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLCERTTYPE, "PEM"); + curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLCERT, cert_file_.data()); + } + if (!ca_file_.empty()) { + // Host validation needs to be off when using self-signed certificates. + curl_easy_setopt(scoped_curl.get(), CURLOPT_SSL_VERIFYHOST, 0L); + curl_easy_setopt(scoped_curl.get(), CURLOPT_CAINFO, ca_file_.data()); + } + + // Propagate log level indicated by "--libcurl_verbosity" to libcurl. + curl_easy_setopt(scoped_curl.get(), CURLOPT_VERBOSE, FLAGS_libcurl_verbosity); + +} + +// https://ec.haxx.se/callback-read.html +size_t read_callback(char* buffer, size_t size, size_t nitems, void* stream) { + VLOG(3) << "read_callback"; + + // Cast stream back to what is actually is + // IoCache* cache = reinterpret_cast(stream); + IoCache* cache = (IoCache*)stream; + VLOG(3) << "read_callback, cache: " << cache; + + // Copy cache content into buffer + size_t length = cache->Read(buffer, size * nitems); + VLOG(3) << "read_callback, length: " << length << "; buffer: " << buffer; + return length; +} + +// Configure curl_ handle for HTTP PUT upload +void HttpFile::SetupRequestData(const std::string& data) { + + // TODO: Sanity checks. + // if (method == POST || method == PUT || method == PATCH) + + // Build list of HTTP request headers. + struct curl_slist* headers = nullptr; + + headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); + headers = curl_slist_append(headers, "Transfer-Encoding: chunked"); + + // Don't stop on 200 OK responses. + headers = curl_slist_append(headers, "Expect:"); + + // Enable progressive upload with chunked transfer encoding. + curl_easy_setopt(scoped_curl.get(), CURLOPT_READFUNCTION, read_callback); + curl_easy_setopt(scoped_curl.get(), CURLOPT_READDATA, &cache_); + curl_easy_setopt(scoped_curl.get(), CURLOPT_UPLOAD, 1L); + + // Add HTTP request headers. + curl_easy_setopt(scoped_curl.get(), CURLOPT_HTTPHEADER, headers); +} + +// Return HTTP request method (verb) as string +std::string HttpFile::method_as_text(HttpMethod method) { + std::string method_text = "UNKNOWN"; + switch (method) { + case GET: + method_text = "GET"; + break; + case POST: + method_text = "POST"; + break; + case PUT: + method_text = "PUT"; + break; + case PATCH: + method_text = "PATCH"; + break; + } + return method_text; +} + +} // namespace shaka diff --git a/packager/file/http_file.h b/packager/file/http_file.h new file mode 100644 index 0000000000..fc23347883 --- /dev/null +++ b/packager/file/http_file.h @@ -0,0 +1,106 @@ +// Copyright 2018 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#ifndef PACKAGER_FILE_HTTP_H_ +#define PACKAGER_FILE_HTTP_H_ + +#include +#include + +#include "packager/base/compiler_specific.h" +#include "packager/base/synchronization/waitable_event.h" +#include "packager/file/file.h" +#include "packager/file/io_cache.h" +#include "packager/status.h" + +namespace shaka { +using ScopedCurl = std::unique_ptr; + +/// HttpFile delegates write calls to HTTP PUT requests. +/// +/// About how to use this, please visit the corresponding documentation [1,2]. +/// +/// [1] https://google.github.io/shaka-packager/html/tutorials/http_upload.html +/// [2] +/// https://github.com/3QSDN/shaka-packager/blob/http-upload/docs/source/tutorials/http_upload.rst +/// +class HttpFile : public File { + public: + + /// Create a HTTP client + /// @param file_name contains the url of the resource to be accessed. + /// Note that the file type prefix should be stripped off already. + /// @param mode contains file access mode. Implementation dependent. + HttpFile(const char* file_name, const char* mode, bool https); + HttpFile(const char* file_name, const char* mode); + + /// @name File implementation overrides. + /// @{ + bool Close() override; + int64_t Read(void* buffer, uint64_t length) override; + int64_t Write(const void* buffer, uint64_t length) override; + int64_t Size() override; + bool Flush() override; + bool Seek(uint64_t position) override; + bool Tell(uint64_t* position) override; + /// @} + + /// @return The full resource url + const std::string& resource_url() const { return resource_url_; } + + protected: + // Destructor + ~HttpFile() override; + + bool Open() override; + + private: + enum HttpMethod { + GET, + POST, + PUT, + PATCH, + }; + + HttpFile(const HttpFile&) = delete; + HttpFile& operator=(const HttpFile&) = delete; + + // Internal implementation of HTTP functions, e.g. Get and Post. + Status Request(HttpMethod http_method, + const std::string& url, + const std::string& data, + std::string* response); + + void SetupRequestBase(HttpMethod http_method, + const std::string& url, + std::string* response); + + void SetupRequestData(const std::string& data); + + void CurlPut(); + + std::string method_as_text(HttpMethod method); + + std::string file_mode_; + std::string resource_url_; + std::string user_agent_; + std::string ca_file_; + std::string cert_file_; + std::string cert_private_key_file_; + std::string cert_private_key_pass_; + + const uint32_t timeout_in_seconds_; + IoCache cache_; + ScopedCurl scoped_curl; + std::string response_body_; + + // Signaled when the "curl easy perform" task completes. + base::WaitableEvent task_exit_event_; +}; + +} // namespace shaka + +#endif // PACKAGER_FILE_HTTP_H_ diff --git a/packager/file/http_file_unittest.cc b/packager/file/http_file_unittest.cc new file mode 100644 index 0000000000..c11fed8897 --- /dev/null +++ b/packager/file/http_file_unittest.cc @@ -0,0 +1,25 @@ +#include "packager/file/http_file.h" +#include +#include +#include "packager/file/file.h" +#include "packager/file/file_closer.h" + +namespace shaka { +namespace { + +const uint8_t kWriteBuffer[] = {1, 2, 3, 4, 5, 6, 7, 8}; +const int64_t kWriteBufferSize = sizeof(kWriteBuffer); + +} // namespace + +class HttpFileTest : public testing::Test {}; + +TEST_F(HttpFileTest, PutChunkedTranser) { + std::unique_ptr writer( + File::Open("http://127.0.0.1:8080/test_out", "w")); + ASSERT_TRUE(writer); + ASSERT_EQ(kWriteBufferSize, writer->Write(kWriteBuffer, kWriteBufferSize)); + writer.release()->Close(); +} + +} // namespace shaka diff --git a/packager/file/local_file.cc b/packager/file/local_file.cc index 4120df1812..2b2b9a7913 100644 --- a/packager/file/local_file.cc +++ b/packager/file/local_file.cc @@ -124,6 +124,10 @@ int64_t LocalFile::Read(void* buffer, uint64_t length) { } int64_t LocalFile::Write(const void* buffer, uint64_t length) { + + base::FilePath file_path(base::FilePath::FromUTF8Unsafe(file_name())); + VLOG(2) << "Writing to " << file_path.AsUTF8Unsafe() << ", length=" << length; + DCHECK(buffer != NULL); DCHECK(internal_file_ != NULL); size_t bytes_written = fwrite(buffer, sizeof(char), length, internal_file_); diff --git a/packager/media/base/buffer_writer.cc b/packager/media/base/buffer_writer.cc index d3c32fece3..d974a5a3be 100644 --- a/packager/media/base/buffer_writer.cc +++ b/packager/media/base/buffer_writer.cc @@ -70,8 +70,8 @@ void BufferWriter::AppendBuffer(const BufferWriter& buffer) { Status BufferWriter::WriteToFile(File* file) { DCHECK(file); DCHECK(!buf_.empty()); - size_t remaining_size = buf_.size(); + VLOG(1) << "BufferWriter::WriteToFile " << file->file_name() << " with " << remaining_size << " octets"; const uint8_t* buf = &buf_[0]; while (remaining_size > 0) { int64_t size_written = file->Write(buf, remaining_size); diff --git a/packager/media/formats/mp2t/ts_segmenter.cc b/packager/media/formats/mp2t/ts_segmenter.cc index 4bedb1e32b..78b2bafd0c 100644 --- a/packager/media/formats/mp2t/ts_segmenter.cc +++ b/packager/media/formats/mp2t/ts_segmenter.cc @@ -147,13 +147,13 @@ Status TsSegmenter::WritePesPackets() { if (listener_ && IsVideoCodec(codec_) && pes_packet->is_key_frame()) { - uint64_t start_pos = segment_buffer_.Size(); + uint64_t start_pos = segment_buffer_.Size(); const int64_t timestamp = pes_packet->pts(); if (!ts_writer_->AddPesPacket(std::move(pes_packet), &segment_buffer_)) return Status(error::MUXER_FAILURE, "Failed to add PES packet."); uint64_t end_pos = segment_buffer_.Size(); - + listener_->OnKeyFrame(timestamp, start_pos, end_pos - start_pos); } else { if (!ts_writer_->AddPesPacket(std::move(pes_packet), &segment_buffer_)) @@ -181,20 +181,20 @@ Status TsSegmenter::FinalizeSegment(uint64_t start_timestamp, segment_number_++, muxer_options_.bandwidth); const int64_t file_size = segment_buffer_.Size(); - std::unique_ptr segment_file; + std::unique_ptr segment_file; segment_file.reset(File::Open(segment_path.c_str(), "w")); if (!segment_file) { return Status(error::FILE_FAILURE, - "Cannot open file for write " + segment_path); + "Cannot open file for write " + segment_path); } - + RETURN_IF_ERROR(segment_buffer_.WriteToFile(segment_file.get())); if (!segment_file.release()->Close()) { return Status( error::FILE_FAILURE, "Cannot close file " + segment_path + - ", possibly file permission issue or running out of disk space."); + ", possibly file permission issue or running out of disk space."); } if (listener_) { @@ -204,7 +204,7 @@ Status TsSegmenter::FinalizeSegment(uint64_t start_timestamp, duration * timescale_scale_, file_size); } segment_started_ = false; - + return Status::OK; } diff --git a/packager/media/formats/mp2t/ts_segmenter.h b/packager/media/formats/mp2t/ts_segmenter.h index 595e36f9e2..365bc57676 100644 --- a/packager/media/formats/mp2t/ts_segmenter.h +++ b/packager/media/formats/mp2t/ts_segmenter.h @@ -69,7 +69,7 @@ class TsSegmenter { /// Only for testing. void SetSegmentStartedForTesting(bool value); - + private: Status StartSegmentIfNeeded(int64_t next_pts); @@ -92,7 +92,7 @@ class TsSegmenter { uint64_t segment_number_ = 0; std::unique_ptr ts_writer_; - + BufferWriter segment_buffer_; // Set to true if segment_buffer_ is initialized, set to false after diff --git a/packager/media/formats/mp2t/ts_writer.cc b/packager/media/formats/mp2t/ts_writer.cc index 36bf3a73b0..7259787091 100644 --- a/packager/media/formats/mp2t/ts_writer.cc +++ b/packager/media/formats/mp2t/ts_writer.cc @@ -166,7 +166,6 @@ TsWriter::TsWriter(std::unique_ptr pmt_writer) TsWriter::~TsWriter() {} bool TsWriter::NewSegment(BufferWriter* buffer) { - BufferWriter psi; WritePatToBuffer(kPat, arraysize(kPat), &pat_continuity_counter_, &psi); if (encrypted_) { diff --git a/packager/media/formats/packed_audio/packed_audio_writer.cc b/packager/media/formats/packed_audio/packed_audio_writer.cc index 0b17cf3872..bfe89f16e3 100644 --- a/packager/media/formats/packed_audio/packed_audio_writer.cc +++ b/packager/media/formats/packed_audio/packed_audio_writer.cc @@ -107,6 +107,7 @@ Status PackedAudioWriter::WriteSegment(const std::string& segment_path, range.end = range.start + segment_buffer->Size() - 1; media_ranges_.subsegment_ranges.push_back(range); } else { + VLOG(2) << "PackedAudioWriter::WriteSegment: File::Open(" << segment_path << ")"; file.reset(File::Open(segment_path.c_str(), "w")); if (!file) { return Status(error::FILE_FAILURE,