HTTP PUT output support (#737)

Issue #149

Co-authored-by: Andreas Motl <andreas.motl@elmyra.de>
Co-authored-by: Rintaro Kuroiwa <rkuroiwa@google.com>
Co-authored-by: Ole Andre Birkedal <o.birkedal@sportradar.com>
This commit is contained in:
Ole Andre Birkedal 2021-02-02 19:51:50 +01:00 committed by GitHub
parent 5bcda6b88b
commit aa17521268
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 797 additions and 11 deletions

1
.gitignore vendored
View File

@ -8,6 +8,7 @@
.cproject .cproject
.project .project
.pydevproject .pydevproject
.idea
.repo .repo
.settings .settings
/out* /out*

View File

@ -13,6 +13,7 @@
# #
# Please keep the list sorted. # Please keep the list sorted.
3Q GmbH <*@3qsdn.com>
Alen Vrecko <alen.vrecko@gmail.com> Alen Vrecko <alen.vrecko@gmail.com>
Anders Hasselqvist <anders.hasselqvist@gmail.com> Anders Hasselqvist <anders.hasselqvist@gmail.com>
Chun-da Chen <capitalm.c@gmail.com> Chun-da Chen <capitalm.c@gmail.com>
@ -24,6 +25,7 @@ Ivi.ru LLC <*@ivi.ru>
Leandro Moreira <leandro.ribeiro.moreira@gmail.com> Leandro Moreira <leandro.ribeiro.moreira@gmail.com>
Leo Law <leoltlaw.gh@gmail.com> Leo Law <leoltlaw.gh@gmail.com>
More Screens Ltd. <*@morescreens.net> More Screens Ltd. <*@morescreens.net>
Ole Andre Birkedal <o.birkedal@sportradar.com>
Philo Inc. <*@philo.com> Philo Inc. <*@philo.com>
Piotr Srebrny <srebrny.piotr@gmail.com> Piotr Srebrny <srebrny.piotr@gmail.com>
Richard Eklycke <richard@eklycke.se> Richard Eklycke <richard@eklycke.se>

View File

@ -24,6 +24,7 @@
Alen Vrecko <alen.vrecko@gmail.com> Alen Vrecko <alen.vrecko@gmail.com>
Anders Hasselqvist <anders.hasselqvist@gmail.com> Anders Hasselqvist <anders.hasselqvist@gmail.com>
Andreas Motl <andreas.motl@elmyra.de>
Bei Li <beil@google.com> Bei Li <beil@google.com>
Chun-da Chen <capitalm.c@gmail.com> Chun-da Chen <capitalm.c@gmail.com>
Daniel Cantarín <canta@canta.com.ar> Daniel Cantarín <canta@canta.com.ar>
@ -37,6 +38,7 @@ Joey Parrish <joeyparrish@google.com>
Kongqun Yang <kqyang@google.com> Kongqun Yang <kqyang@google.com>
Leandro Moreira <leandro.ribeiro.moreira@gmail.com> Leandro Moreira <leandro.ribeiro.moreira@gmail.com>
Leo Law <leoltlaw.gh@gmail.com> Leo Law <leoltlaw.gh@gmail.com>
Ole Andre Birkedal <o.birkedal@sportradar.com>
Piotr Srebrny <srebrny.piotr@gmail.com> Piotr Srebrny <srebrny.piotr@gmail.com>
Qingquan Wang <wangqq1103@gmail.com> Qingquan Wang <wangqq1103@gmail.com>
Richard Eklycke <richard@eklycke.se> Richard Eklycke <richard@eklycke.se>

View File

@ -0,0 +1,250 @@
🛠 Status: Work in progress
The Shaka Packager HTTP upload feature is currently in development.
It's on the fast track to a working alpha, so we encourage you to use
it and give us some feedback. However, there are things that haven't
been finalized yet so you can expect some changes.
This document describes the current state of the implementation,
contributions are always welcome.
The discussion about this feature currently happens at
`Add HTTP PUT output #149 <https://github.com/google/shaka-packager/issues/149>`_,
its development on the
`http-upload <https://github.com/3QSDN/shaka-packager/tree/http-upload>`_ branch,
feel free to join us.
###########
HTTP upload
###########
************
Introduction
************
Shaka Packager can upload produced artefacts to a HTTP server using
HTTP PUT requests with chunked transfer encoding to improve live
publishing performance when content is not served directly from
the packaging output location. For talking HTTP, libcurl_ is used.
The produced artefacts are:
- HLS_ playlist files in M3U_ Format encoded with UTF-8 (.m3u8)
- Chunked audio segments encoded with AAC (.aac)
- Chunked video segments encapsulated into the
`MPEG transport stream`_ container format (.ts)
References
==========
- `RFC 2616 about HTTP PUT`_
- `RFC 2616 about Chunked Transfer Coding`_
*************
Documentation
*************
Getting started
===============
For enabling the HTTP upload transfer mode, please populate
the ``segment_template`` attribute in the ``stream_descriptor``
parameter as well as the ``--hls_master_playlist_output`` parameter
with appropriate URLs where the HTTP PUT requests will be issued to.
You can also supply the ``--user_agent`` flag to specify a custom
User-Agent string for all HTTP PUT requests.
For pragmatic reasons, all HTTP requests will be declared as
``Content-Type: application/octet-stream``.
Synopsis
========
Here is a basic example. It is similar to the "live" example and also
borrows features from "FFmpeg piping", see :doc:`live` and :doc:`ffmpeg_piping`.
Define UNIX pipe to connect ffmpeg with packager::
export PIPE=/tmp/bigbuckbunny.fifo
mkfifo ${PIPE}
Acquire and transcode RTMP stream::
# Steady
ffmpeg -fflags nobuffer -threads 0 -y \
-i rtmp://184.72.239.149/vod/mp4:bigbuckbunny_450.mp4 \
-pix_fmt yuv420p -vcodec libx264 -preset:v superfast -acodec aac \
-f mpegts pipe: > ${PIPE}
Configure and run packager::
# Define upload URL
export UPLOAD_URL=http://localhost:6767/hls-live
# Go
packager \
"input=${PIPE},stream=audio,segment_template=${UPLOAD_URL}/bigbuckbunny-audio-aac-\$Number%04d\$.aac,playlist_name=bigbuckbunny-audio.m3u8,hls_group_id=audio" \
"input=${PIPE},stream=video,segment_template=${UPLOAD_URL}/bigbuckbunny-video-h264-450-\$Number%04d\$.ts,playlist_name=bigbuckbunny-video-450.m3u8" \
--io_block_size 65536 --fragment_duration 2 --segment_duration 2 \
--time_shift_buffer_depth 3600 --preserved_segments_outside_live_window 7200 \
--hls_master_playlist_output "${UPLOAD_URL}/bigbuckbunny.m3u8" \
--hls_playlist_type LIVE \
--vmodule=http_file=1
*******
HTTPS
*******
If your ingest uses HTTPS and requires specific certificates, these
can be specified on the command line similar to how it's done for
:doc:`playready`, with the following arguments:
- ``--https_ca_file``: Absolute path to the Certificate Authority file for the server cert. PEM format.
- ``--https_cert_file``: Absolute path to client certificate file.
- ``--https_cert_private_key_file``: Absolute path to the private Key file.
- ``--https_cert_private_key_password``: Password to the private key file.
*******
Backlog
*******
Please note the HTTP upload feature still lacks some features
probably important for production. Contributions are welcome!
DASH
====
While the current implementation works for HLS_,
we should also check DASH_.
Basic Auth
==========
There's no support for authentication yet.
HTTPS
=====
While there's already some code in place,
HTTPS is currently not supported yet.
HTTP DELETE
===========
Nothing has be done to support this yet:
Packager supports removing old segments automatically.
See ``preserved_segments_outside_live_window`` option in
DASH_ options or HLS_ options for details.
Software tests
==============
We should do some minimal QA, check whether the test
suite breaks and maybe add some tests covering new code.
Network timeouts
================
libcurl_ can apply network timeout settings. However,
we haven't addressed this yet.
Miscellaneous
=============
- Address all things TODO and FIXME
- Make ``io_cache_size`` configurable?
*******
Backend
*******
HTTP PUT file uploads to Nginx
==============================
The receiver is based on the native Nginx_ module "`ngx_http_dav_module`_",
it handles HTTP PUT requests with chunked transfer encoding
like emitted by Shaka Packager.
The configuration is very simple::
server {
listen 6767 default_server;
access_log /dev/stdout combined;
error_log /dev/stdout info;
root /var/spool;
location ~ ^/hls-live/(.+)$ {
dav_methods PUT;
create_full_put_path on;
proxy_buffering off;
client_max_body_size 20m;
}
}
Run Nginx::
nginx -p `pwd` -c nginx.conf -g "daemon off;"
HTTP PUT file uploads to Caddy
==============================
The receiver is based on the Caddy_ webserver, it handles HTTP PUT
requests with chunked transfer encoding like emitted by Shaka Packager.
Put this configuration into a `Caddyfile`::
# Bind address
:6767
# Enable logging
log stdout
# Web server root with autoindex
root /var/spool
redir /hls-live {
if {path} is "/"
}
browse
# Enable upload with HTTP PUT
upload /hls-live {
to "/var/spool/hls-live"
}
Run Caddy::
caddy -conf Caddyfile
*************************
Development and debugging
*************************
Watch the network::
ngrep -Wbyline -dlo port 6767
Grab and run `httpd-reflector.py`_ to use it as a dummy HTTP sink::
# Ready
wget https://gist.githubusercontent.com/amotl/3ed38e461af743aeeade5a5a106c1296/raw/httpd-reflector.py
chmod +x httpd-reflector.py
./httpd-reflector.py --port 6767
----
Have fun!
.. _HLS: https://en.wikipedia.org/wiki/HTTP_Live_Streaming
.. _DASH: https://en.wikipedia.org/wiki/Dynamic_Adaptive_Streaming_over_HTTP
.. _M3U: https://en.wikipedia.org/wiki/M3U
.. _MPEG transport stream: https://en.wikipedia.org/wiki/MPEG_transport_stream
.. _libcurl: https://curl.haxx.se/libcurl/
.. _RFC 1867: https://tools.ietf.org/html/rfc1867
.. _RFC 2616 about HTTP PUT: https://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.6
.. _RFC 2616 about Chunked Transfer Coding: https://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1
.. _RFC 5789: https://tools.ietf.org/html/rfc5789
.. _Nginx: http://nginx.org/
.. _ngx_http_dav_module: http://nginx.org/en/docs/http/ngx_http_dav_module.html
.. _Caddy: https://caddyserver.com/
.. _httpd-reflector.py: https://gist.github.com/amotl/3ed38e461af743aeeade5a5a106c1296
.. _@colleenkhenry: https://github.com/colleenkhenry
.. _@kqyang: https://github.com/kqyang

View File

@ -11,3 +11,4 @@ Tutorials
drm.rst drm.rst
ads.rst ads.rst
ffmpeg_piping.rst ffmpeg_piping.rst
http_upload.rst

View File

@ -21,6 +21,7 @@
#include "packager/file/memory_file.h" #include "packager/file/memory_file.h"
#include "packager/file/threaded_io_file.h" #include "packager/file/threaded_io_file.h"
#include "packager/file/udp_file.h" #include "packager/file/udp_file.h"
#include "packager/file/http_file.h"
DEFINE_uint64(io_cache_size, DEFINE_uint64(io_cache_size,
32ULL << 20, 32ULL << 20,
@ -41,6 +42,9 @@ const char* kCallbackFilePrefix = "callback://";
const char* kLocalFilePrefix = "file://"; const char* kLocalFilePrefix = "file://";
const char* kMemoryFilePrefix = "memory://"; const char* kMemoryFilePrefix = "memory://";
const char* kUdpFilePrefix = "udp://"; const char* kUdpFilePrefix = "udp://";
const char* kHttpFilePrefix = "http://";
const char* kHttpsFilePrefix = "https://";
namespace { namespace {
@ -95,6 +99,14 @@ File* CreateUdpFile(const char* file_name, const char* mode) {
return new UdpFile(file_name); return new UdpFile(file_name);
} }
File* CreateHttpsFile(const char* file_name, const char* mode) {
return new HttpFile(file_name, mode, true);
}
File* CreateHttpFile(const char* file_name, const char* mode) {
return new HttpFile(file_name, mode, false);
}
File* CreateMemoryFile(const char* file_name, const char* mode) { File* CreateMemoryFile(const char* file_name, const char* mode) {
return new MemoryFile(file_name, mode); return new MemoryFile(file_name, mode);
} }
@ -114,6 +126,8 @@ static const FileTypeInfo kFileTypeInfo[] = {
{kUdpFilePrefix, &CreateUdpFile, nullptr, nullptr}, {kUdpFilePrefix, &CreateUdpFile, nullptr, nullptr},
{kMemoryFilePrefix, &CreateMemoryFile, &DeleteMemoryFile, nullptr}, {kMemoryFilePrefix, &CreateMemoryFile, &DeleteMemoryFile, nullptr},
{kCallbackFilePrefix, &CreateCallbackFile, nullptr, nullptr}, {kCallbackFilePrefix, &CreateCallbackFile, nullptr, nullptr},
{kHttpFilePrefix, &CreateHttpFile, nullptr, nullptr},
{kHttpsFilePrefix, &CreateHttpsFile, nullptr, nullptr},
}; };
base::StringPiece GetFileTypePrefix(base::StringPiece file_name) { base::StringPiece GetFileTypePrefix(base::StringPiece file_name) {
@ -123,6 +137,7 @@ base::StringPiece GetFileTypePrefix(base::StringPiece file_name) {
const FileTypeInfo* GetFileTypeInfo(base::StringPiece file_name, const FileTypeInfo* GetFileTypeInfo(base::StringPiece file_name,
base::StringPiece* real_file_name) { base::StringPiece* real_file_name) {
base::StringPiece file_type_prefix = GetFileTypePrefix(file_name); base::StringPiece file_type_prefix = GetFileTypePrefix(file_name);
for (const FileTypeInfo& file_type : kFileTypeInfo) { for (const FileTypeInfo& file_type : kFileTypeInfo) {
if (file_type_prefix == file_type.type) { if (file_type_prefix == file_type.type) {
@ -233,6 +248,7 @@ bool File::ReadFileToString(const char* file_name, std::string* contents) {
bool File::WriteStringToFile(const char* file_name, bool File::WriteStringToFile(const char* file_name,
const std::string& contents) { const std::string& contents) {
VLOG(2) << "File::WriteStringToFile: " << file_name;
std::unique_ptr<File, FileCloser> file(File::Open(file_name, "w")); std::unique_ptr<File, FileCloser> file(File::Open(file_name, "w"));
if (!file) { if (!file) {
LOG(ERROR) << "Failed to open file " << file_name; LOG(ERROR) << "Failed to open file " << file_name;
@ -261,6 +277,7 @@ bool File::WriteStringToFile(const char* file_name,
bool File::WriteFileAtomically(const char* file_name, bool File::WriteFileAtomically(const char* file_name,
const std::string& contents) { const std::string& contents) {
VLOG(2) << "File::WriteFileAtomically: " << file_name;
base::StringPiece real_file_name; base::StringPiece real_file_name;
const FileTypeInfo* file_type = GetFileTypeInfo(file_name, &real_file_name); const FileTypeInfo* file_type = GetFileTypeInfo(file_name, &real_file_name);
DCHECK(file_type); DCHECK(file_type);
@ -280,6 +297,7 @@ bool File::WriteFileAtomically(const char* file_name,
bool File::Copy(const char* from_file_name, const char* to_file_name) { bool File::Copy(const char* from_file_name, const char* to_file_name) {
std::string content; std::string content;
VLOG(1) << "File::Copy from " << from_file_name << " to " << to_file_name;
if (!ReadFileToString(from_file_name, &content)) { if (!ReadFileToString(from_file_name, &content)) {
LOG(ERROR) << "Failed to open file " << from_file_name; LOG(ERROR) << "Failed to open file " << from_file_name;
return false; return false;
@ -323,6 +341,8 @@ int64_t File::CopyFile(File* source, File* destination, int64_t max_copy) {
if (max_copy < 0) if (max_copy < 0)
max_copy = std::numeric_limits<int64_t>::max(); max_copy = std::numeric_limits<int64_t>::max();
VLOG(1) << "File::CopyFile from " << source->file_name() << " to " << destination->file_name();
const int64_t kBufferSize = 0x40000; // 256KB. const int64_t kBufferSize = 0x40000; // 256KB.
std::unique_ptr<uint8_t[]> buffer(new uint8_t[kBufferSize]); std::unique_ptr<uint8_t[]> buffer(new uint8_t[kBufferSize]);
int64_t bytes_copied = 0; int64_t bytes_copied = 0;

View File

@ -20,6 +20,8 @@
'file_util.cc', 'file_util.cc',
'file_util.h', 'file_util.h',
'file_closer.h', 'file_closer.h',
'http_file.cc',
'http_file.h',
'io_cache.cc', 'io_cache.cc',
'io_cache.h', 'io_cache.h',
'local_file.cc', 'local_file.cc',
@ -36,7 +38,9 @@
], ],
'dependencies': [ 'dependencies': [
'../base/base.gyp:base', '../base/base.gyp:base',
'../packager.gyp:status',
'../third_party/gflags/gflags.gyp:gflags', '../third_party/gflags/gflags.gyp:gflags',
'../third_party/curl/curl.gyp:libcurl',
], ],
}, },
{ {
@ -49,12 +53,14 @@
'io_cache_unittest.cc', 'io_cache_unittest.cc',
'memory_file_unittest.cc', 'memory_file_unittest.cc',
'udp_options_unittest.cc', 'udp_options_unittest.cc',
'http_file_unittest.cc',
], ],
'dependencies': [ 'dependencies': [
'../media/test/media_test.gyp:run_tests_with_atexit_manager', '../media/test/media_test.gyp:run_tests_with_atexit_manager',
'../testing/gmock.gyp:gmock', '../testing/gmock.gyp:gmock',
'../testing/gtest.gyp:gtest', '../testing/gtest.gyp:gtest',
'../third_party/gflags/gflags.gyp:gflags', '../third_party/gflags/gflags.gyp:gflags',
'../third_party/curl/curl.gyp:libcurl',
'file', 'file',
], ],
}, },

View File

@ -20,6 +20,7 @@ extern const char* kCallbackFilePrefix;
extern const char* kLocalFilePrefix; extern const char* kLocalFilePrefix;
extern const char* kMemoryFilePrefix; extern const char* kMemoryFilePrefix;
extern const char* kUdpFilePrefix; extern const char* kUdpFilePrefix;
extern const char* kHttpFilePrefix;
const int64_t kWholeFile = -1; const int64_t kWholeFile = -1;
/// Define an abstract file interface. /// Define an abstract file interface.

368
packager/file/http_file.cc Normal file
View File

@ -0,0 +1,368 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/file/http_file.h"
#include <gflags/gflags.h>
#include "packager/base/bind.h"
#include "packager/base/files/file_util.h"
#include "packager/base/logging.h"
#include "packager/base/strings/string_number_conversions.h"
#include "packager/base/strings/stringprintf.h"
#include "packager/base/synchronization/lock.h"
#include "packager/base/threading/worker_pool.h"
DEFINE_int32(libcurl_verbosity, 0,
"Set verbosity level for libcurl.");
DEFINE_string(user_agent, "",
"Set a custom User-Agent string for HTTP ingest.");
DEFINE_string(https_ca_file, "",
"Absolute path to the Certificate Authority file for the "
"server cert. PEM format");
DEFINE_string(https_cert_file, "",
"Absolute path to client certificate file.");
DEFINE_string(https_cert_private_key_file, "",
"Absolute path to the private Key file.");
DEFINE_string(https_cert_private_key_password, "",
"Password to the private key file.");
DECLARE_uint64(io_cache_size);
namespace shaka {
// curl_ primitives stolen from `http_key_fetcher.cc`.
namespace {
const char kUserAgentString[] = "shaka-packager-uploader/0.1";
size_t AppendToString(char* ptr,
size_t size,
size_t nmemb,
std::string* response) {
DCHECK(ptr);
DCHECK(response);
const size_t total_size = size * nmemb;
response->append(ptr, total_size);
return total_size;
}
} // namespace
class LibCurlInitializer {
public:
LibCurlInitializer() {
curl_global_init(CURL_GLOBAL_DEFAULT);
}
~LibCurlInitializer() {
curl_global_cleanup();
}
private:
DISALLOW_COPY_AND_ASSIGN(LibCurlInitializer);
};
/// Create a HTTP/HTTPS client
HttpFile::HttpFile(const char* file_name, const char* mode, bool https)
: File(file_name),
file_mode_(mode),
user_agent_(FLAGS_user_agent),
ca_file_(FLAGS_https_ca_file),
cert_file_(FLAGS_https_cert_file),
cert_private_key_file_(FLAGS_https_cert_private_key_file),
cert_private_key_pass_(FLAGS_https_cert_private_key_password),
timeout_in_seconds_(0),
cache_(FLAGS_io_cache_size),
scoped_curl(curl_easy_init(), &curl_easy_cleanup),
task_exit_event_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED) {
if (https) {
resource_url_ = "https://" + std::string(file_name);
} else {
resource_url_ = "http://" + std::string(file_name);
}
static LibCurlInitializer lib_curl_initializer;
// Setup libcurl scope
if (!scoped_curl.get()) {
LOG(ERROR) << "curl_easy_init() failed.";
// return Status(error::HTTP_FAILURE, "curl_easy_init() failed.");
delete this;
}
}
HttpFile::HttpFile(const char* file_name, const char* mode)
: HttpFile(file_name, mode, false)
{}
// Destructor
HttpFile::~HttpFile() {}
bool HttpFile::Open() {
VLOG(1) << "Opening " << resource_url() << " with file mode \"" << file_mode_ << "\".";
// Ignore read requests as they would truncate the target
// file by propagating as zero-length PUT requests.
// See also https://github.com/google/shaka-packager/issues/149#issuecomment-437203701
if (file_mode_ == "r") {
VLOG(1) << "HttpFile only supports write mode, skipping further operations";
task_exit_event_.Signal();
return false;
}
// Run progressive upload in separate thread.
base::WorkerPool::PostTask(
FROM_HERE, base::Bind(&HttpFile::CurlPut, base::Unretained(this)),
true // task_is_slow
);
return true;
}
void HttpFile::CurlPut() {
// Setup libcurl handle with HTTP PUT upload transfer mode.
std::string request_body;
Request(PUT, resource_url(), request_body, &response_body_);
}
bool HttpFile::Close() {
VLOG(1) << "Closing " << resource_url() << ".";
cache_.Close();
task_exit_event_.Wait();
delete this;
return true;
}
int64_t HttpFile::Read(void* buffer, uint64_t length) {
LOG(WARNING) << "HttpFile does not support Read().";
return -1;
}
int64_t HttpFile::Write(const void* buffer, uint64_t length) {
std::string url = resource_url();
VLOG(2) << "Writing to " << url << ", length=" << length;
// TODO: Implement retrying with exponential backoff, see
// "widevine_key_source.cc"
Status status;
uint64_t bytes_written = cache_.Write(buffer, length);
VLOG(3) << "PUT CHUNK bytes_written: " << bytes_written;
return bytes_written;
// Debugging based on response status
/*
if (status.ok()) {
VLOG(1) << "Writing chunk succeeded";
} else {
VLOG(1) << "Writing chunk failed";
if (!response_body.empty()) {
VLOG(2) << "Response:\n" << response_body;
}
}
*/
// Always signal success to the downstream pipeline
return length;
}
int64_t HttpFile::Size() {
VLOG(1) << "HttpFile does not support Size().";
return -1;
}
bool HttpFile::Flush() {
// Do nothing on Flush.
return true;
}
bool HttpFile::Seek(uint64_t position) {
VLOG(1) << "HttpFile does not support Seek().";
return false;
}
bool HttpFile::Tell(uint64_t* position) {
VLOG(1) << "HttpFile does not support Tell().";
return false;
}
// Perform HTTP request
Status HttpFile::Request(HttpMethod http_method,
const std::string& url,
const std::string& data,
std::string* response) {
// TODO: Sanity checks.
// DCHECK(http_method == GET || http_method == POST);
VLOG(1) << "Sending request to URL " << url;
// Setup HTTP method and libcurl options
SetupRequestBase(http_method, url, response);
// Setup HTTP request headers and body
SetupRequestData(data);
// Perform HTTP request
CURLcode res = curl_easy_perform(scoped_curl.get());
// Assume successful request
Status status = Status::OK;
// Handle request failure
if (res != CURLE_OK) {
std::string method_text = method_as_text(http_method);
std::string error_message = base::StringPrintf(
"%s request for %s failed. Reason: %s.", method_text.c_str(),
url.c_str(), curl_easy_strerror(res));
if (res == CURLE_HTTP_RETURNED_ERROR) {
long response_code = 0;
curl_easy_getinfo(scoped_curl.get(), CURLINFO_RESPONSE_CODE, &response_code);
error_message +=
base::StringPrintf(" Response code: %ld.", response_code);
}
// Signal error to logfile
LOG(ERROR) << error_message;
// Signal error to caller
status = Status(
res == CURLE_OPERATION_TIMEDOUT ? error::TIME_OUT : error::HTTP_FAILURE,
error_message);
}
// Signal task completion
task_exit_event_.Signal();
// Return request status to caller
return status;
}
// Configure curl_ handle with reasonable defaults
void HttpFile::SetupRequestBase(HttpMethod http_method,
const std::string& url,
std::string* response) {
response->clear();
// Configure HTTP request method/verb
switch (http_method) {
case GET:
curl_easy_setopt(scoped_curl.get(), CURLOPT_HTTPGET, 1L);
break;
case POST:
curl_easy_setopt(scoped_curl.get(), CURLOPT_POST, 1L);
break;
case PUT:
curl_easy_setopt(scoped_curl.get(), CURLOPT_PUT, 1L);
break;
case PATCH:
curl_easy_setopt(scoped_curl.get(), CURLOPT_CUSTOMREQUEST, "PATCH");
break;
}
// Configure HTTP request
curl_easy_setopt(scoped_curl.get(), CURLOPT_URL, url.c_str());
if (user_agent_.empty()) {
curl_easy_setopt(scoped_curl.get(), CURLOPT_USERAGENT, kUserAgentString);
} else {
curl_easy_setopt(scoped_curl.get(), CURLOPT_USERAGENT, user_agent_.data());
}
curl_easy_setopt(scoped_curl.get(), CURLOPT_TIMEOUT, timeout_in_seconds_);
curl_easy_setopt(scoped_curl.get(), CURLOPT_FAILONERROR, 1L);
curl_easy_setopt(scoped_curl.get(), CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(scoped_curl.get(), CURLOPT_WRITEFUNCTION, AppendToString);
curl_easy_setopt(scoped_curl.get(), CURLOPT_WRITEDATA, response);
// HTTPS
if (!cert_private_key_file_.empty() && !cert_file_.empty()) {
curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLKEY,
cert_private_key_file_.data());
if (!cert_private_key_pass_.empty()) {
curl_easy_setopt(scoped_curl.get(), CURLOPT_KEYPASSWD,
cert_private_key_pass_.data());
}
curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLKEYTYPE, "PEM");
curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLCERTTYPE, "PEM");
curl_easy_setopt(scoped_curl.get(), CURLOPT_SSLCERT, cert_file_.data());
}
if (!ca_file_.empty()) {
// Host validation needs to be off when using self-signed certificates.
curl_easy_setopt(scoped_curl.get(), CURLOPT_SSL_VERIFYHOST, 0L);
curl_easy_setopt(scoped_curl.get(), CURLOPT_CAINFO, ca_file_.data());
}
// Propagate log level indicated by "--libcurl_verbosity" to libcurl.
curl_easy_setopt(scoped_curl.get(), CURLOPT_VERBOSE, FLAGS_libcurl_verbosity);
}
// https://ec.haxx.se/callback-read.html
size_t read_callback(char* buffer, size_t size, size_t nitems, void* stream) {
VLOG(3) << "read_callback";
// Cast stream back to what is actually is
// IoCache* cache = reinterpret_cast<IoCache*>(stream);
IoCache* cache = (IoCache*)stream;
VLOG(3) << "read_callback, cache: " << cache;
// Copy cache content into buffer
size_t length = cache->Read(buffer, size * nitems);
VLOG(3) << "read_callback, length: " << length << "; buffer: " << buffer;
return length;
}
// Configure curl_ handle for HTTP PUT upload
void HttpFile::SetupRequestData(const std::string& data) {
// TODO: Sanity checks.
// if (method == POST || method == PUT || method == PATCH)
// Build list of HTTP request headers.
struct curl_slist* headers = nullptr;
headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
headers = curl_slist_append(headers, "Transfer-Encoding: chunked");
// Don't stop on 200 OK responses.
headers = curl_slist_append(headers, "Expect:");
// Enable progressive upload with chunked transfer encoding.
curl_easy_setopt(scoped_curl.get(), CURLOPT_READFUNCTION, read_callback);
curl_easy_setopt(scoped_curl.get(), CURLOPT_READDATA, &cache_);
curl_easy_setopt(scoped_curl.get(), CURLOPT_UPLOAD, 1L);
// Add HTTP request headers.
curl_easy_setopt(scoped_curl.get(), CURLOPT_HTTPHEADER, headers);
}
// Return HTTP request method (verb) as string
std::string HttpFile::method_as_text(HttpMethod method) {
std::string method_text = "UNKNOWN";
switch (method) {
case GET:
method_text = "GET";
break;
case POST:
method_text = "POST";
break;
case PUT:
method_text = "PUT";
break;
case PATCH:
method_text = "PATCH";
break;
}
return method_text;
}
} // namespace shaka

106
packager/file/http_file.h Normal file
View File

@ -0,0 +1,106 @@
// Copyright 2018 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#ifndef PACKAGER_FILE_HTTP_H_
#define PACKAGER_FILE_HTTP_H_
#include <curl/curl.h>
#include <memory>
#include "packager/base/compiler_specific.h"
#include "packager/base/synchronization/waitable_event.h"
#include "packager/file/file.h"
#include "packager/file/io_cache.h"
#include "packager/status.h"
namespace shaka {
using ScopedCurl = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>;
/// HttpFile delegates write calls to HTTP PUT requests.
///
/// About how to use this, please visit the corresponding documentation [1,2].
///
/// [1] https://google.github.io/shaka-packager/html/tutorials/http_upload.html
/// [2]
/// https://github.com/3QSDN/shaka-packager/blob/http-upload/docs/source/tutorials/http_upload.rst
///
class HttpFile : public File {
public:
/// Create a HTTP client
/// @param file_name contains the url of the resource to be accessed.
/// Note that the file type prefix should be stripped off already.
/// @param mode contains file access mode. Implementation dependent.
HttpFile(const char* file_name, const char* mode, bool https);
HttpFile(const char* file_name, const char* mode);
/// @name File implementation overrides.
/// @{
bool Close() override;
int64_t Read(void* buffer, uint64_t length) override;
int64_t Write(const void* buffer, uint64_t length) override;
int64_t Size() override;
bool Flush() override;
bool Seek(uint64_t position) override;
bool Tell(uint64_t* position) override;
/// @}
/// @return The full resource url
const std::string& resource_url() const { return resource_url_; }
protected:
// Destructor
~HttpFile() override;
bool Open() override;
private:
enum HttpMethod {
GET,
POST,
PUT,
PATCH,
};
HttpFile(const HttpFile&) = delete;
HttpFile& operator=(const HttpFile&) = delete;
// Internal implementation of HTTP functions, e.g. Get and Post.
Status Request(HttpMethod http_method,
const std::string& url,
const std::string& data,
std::string* response);
void SetupRequestBase(HttpMethod http_method,
const std::string& url,
std::string* response);
void SetupRequestData(const std::string& data);
void CurlPut();
std::string method_as_text(HttpMethod method);
std::string file_mode_;
std::string resource_url_;
std::string user_agent_;
std::string ca_file_;
std::string cert_file_;
std::string cert_private_key_file_;
std::string cert_private_key_pass_;
const uint32_t timeout_in_seconds_;
IoCache cache_;
ScopedCurl scoped_curl;
std::string response_body_;
// Signaled when the "curl easy perform" task completes.
base::WaitableEvent task_exit_event_;
};
} // namespace shaka
#endif // PACKAGER_FILE_HTTP_H_

View File

@ -0,0 +1,25 @@
#include "packager/file/http_file.h"
#include <gtest/gtest.h>
#include <memory>
#include "packager/file/file.h"
#include "packager/file/file_closer.h"
namespace shaka {
namespace {
const uint8_t kWriteBuffer[] = {1, 2, 3, 4, 5, 6, 7, 8};
const int64_t kWriteBufferSize = sizeof(kWriteBuffer);
} // namespace
class HttpFileTest : public testing::Test {};
TEST_F(HttpFileTest, PutChunkedTranser) {
std::unique_ptr<File, FileCloser> writer(
File::Open("http://127.0.0.1:8080/test_out", "w"));
ASSERT_TRUE(writer);
ASSERT_EQ(kWriteBufferSize, writer->Write(kWriteBuffer, kWriteBufferSize));
writer.release()->Close();
}
} // namespace shaka

View File

@ -124,6 +124,10 @@ int64_t LocalFile::Read(void* buffer, uint64_t length) {
} }
int64_t LocalFile::Write(const void* buffer, uint64_t length) { int64_t LocalFile::Write(const void* buffer, uint64_t length) {
base::FilePath file_path(base::FilePath::FromUTF8Unsafe(file_name()));
VLOG(2) << "Writing to " << file_path.AsUTF8Unsafe() << ", length=" << length;
DCHECK(buffer != NULL); DCHECK(buffer != NULL);
DCHECK(internal_file_ != NULL); DCHECK(internal_file_ != NULL);
size_t bytes_written = fwrite(buffer, sizeof(char), length, internal_file_); size_t bytes_written = fwrite(buffer, sizeof(char), length, internal_file_);

View File

@ -70,8 +70,8 @@ void BufferWriter::AppendBuffer(const BufferWriter& buffer) {
Status BufferWriter::WriteToFile(File* file) { Status BufferWriter::WriteToFile(File* file) {
DCHECK(file); DCHECK(file);
DCHECK(!buf_.empty()); DCHECK(!buf_.empty());
size_t remaining_size = buf_.size(); size_t remaining_size = buf_.size();
VLOG(1) << "BufferWriter::WriteToFile " << file->file_name() << " with " << remaining_size << " octets";
const uint8_t* buf = &buf_[0]; const uint8_t* buf = &buf_[0];
while (remaining_size > 0) { while (remaining_size > 0) {
int64_t size_written = file->Write(buf, remaining_size); int64_t size_written = file->Write(buf, remaining_size);

View File

@ -147,13 +147,13 @@ Status TsSegmenter::WritePesPackets() {
if (listener_ && IsVideoCodec(codec_) && pes_packet->is_key_frame()) { if (listener_ && IsVideoCodec(codec_) && pes_packet->is_key_frame()) {
uint64_t start_pos = segment_buffer_.Size(); uint64_t start_pos = segment_buffer_.Size();
const int64_t timestamp = pes_packet->pts(); const int64_t timestamp = pes_packet->pts();
if (!ts_writer_->AddPesPacket(std::move(pes_packet), &segment_buffer_)) if (!ts_writer_->AddPesPacket(std::move(pes_packet), &segment_buffer_))
return Status(error::MUXER_FAILURE, "Failed to add PES packet."); return Status(error::MUXER_FAILURE, "Failed to add PES packet.");
uint64_t end_pos = segment_buffer_.Size(); uint64_t end_pos = segment_buffer_.Size();
listener_->OnKeyFrame(timestamp, start_pos, end_pos - start_pos); listener_->OnKeyFrame(timestamp, start_pos, end_pos - start_pos);
} else { } else {
if (!ts_writer_->AddPesPacket(std::move(pes_packet), &segment_buffer_)) if (!ts_writer_->AddPesPacket(std::move(pes_packet), &segment_buffer_))
@ -181,20 +181,20 @@ Status TsSegmenter::FinalizeSegment(uint64_t start_timestamp,
segment_number_++, muxer_options_.bandwidth); segment_number_++, muxer_options_.bandwidth);
const int64_t file_size = segment_buffer_.Size(); const int64_t file_size = segment_buffer_.Size();
std::unique_ptr<File, FileCloser> segment_file; std::unique_ptr<File, FileCloser> segment_file;
segment_file.reset(File::Open(segment_path.c_str(), "w")); segment_file.reset(File::Open(segment_path.c_str(), "w"));
if (!segment_file) { if (!segment_file) {
return Status(error::FILE_FAILURE, return Status(error::FILE_FAILURE,
"Cannot open file for write " + segment_path); "Cannot open file for write " + segment_path);
} }
RETURN_IF_ERROR(segment_buffer_.WriteToFile(segment_file.get())); RETURN_IF_ERROR(segment_buffer_.WriteToFile(segment_file.get()));
if (!segment_file.release()->Close()) { if (!segment_file.release()->Close()) {
return Status( return Status(
error::FILE_FAILURE, error::FILE_FAILURE,
"Cannot close file " + segment_path + "Cannot close file " + segment_path +
", possibly file permission issue or running out of disk space."); ", possibly file permission issue or running out of disk space.");
} }
if (listener_) { if (listener_) {
@ -204,7 +204,7 @@ Status TsSegmenter::FinalizeSegment(uint64_t start_timestamp,
duration * timescale_scale_, file_size); duration * timescale_scale_, file_size);
} }
segment_started_ = false; segment_started_ = false;
return Status::OK; return Status::OK;
} }

View File

@ -69,7 +69,7 @@ class TsSegmenter {
/// Only for testing. /// Only for testing.
void SetSegmentStartedForTesting(bool value); void SetSegmentStartedForTesting(bool value);
private: private:
Status StartSegmentIfNeeded(int64_t next_pts); Status StartSegmentIfNeeded(int64_t next_pts);
@ -92,7 +92,7 @@ class TsSegmenter {
uint64_t segment_number_ = 0; uint64_t segment_number_ = 0;
std::unique_ptr<TsWriter> ts_writer_; std::unique_ptr<TsWriter> ts_writer_;
BufferWriter segment_buffer_; BufferWriter segment_buffer_;
// Set to true if segment_buffer_ is initialized, set to false after // Set to true if segment_buffer_ is initialized, set to false after

View File

@ -166,7 +166,6 @@ TsWriter::TsWriter(std::unique_ptr<ProgramMapTableWriter> pmt_writer)
TsWriter::~TsWriter() {} TsWriter::~TsWriter() {}
bool TsWriter::NewSegment(BufferWriter* buffer) { bool TsWriter::NewSegment(BufferWriter* buffer) {
BufferWriter psi; BufferWriter psi;
WritePatToBuffer(kPat, arraysize(kPat), &pat_continuity_counter_, &psi); WritePatToBuffer(kPat, arraysize(kPat), &pat_continuity_counter_, &psi);
if (encrypted_) { if (encrypted_) {

View File

@ -107,6 +107,7 @@ Status PackedAudioWriter::WriteSegment(const std::string& segment_path,
range.end = range.start + segment_buffer->Size() - 1; range.end = range.start + segment_buffer->Size() - 1;
media_ranges_.subsegment_ranges.push_back(range); media_ranges_.subsegment_ranges.push_back(range);
} else { } else {
VLOG(2) << "PackedAudioWriter::WriteSegment: File::Open(" << segment_path << ")";
file.reset(File::Open(segment_path.c_str(), "w")); file.reset(File::Open(segment_path.c_str(), "w"));
if (!file) { if (!file) {
return Status(error::FILE_FAILURE, return Status(error::FILE_FAILURE,