Add udp options support

Three options are supported right now:
- reuse=1|0
  Allow or disallow reusing UDP sockets. Default 0.
- interface=a.b.c.d
  Address of the interface over which to receive UDP multicast
  streams.
- timeout=microseconds
  Timeout in microseconds. Default to unlimited.

A UDP url should be of the form udp://ip:port[?options], here
is an example:
  udp://224.1.2.30:88?reuse=1&interface=10.11.12.13&timeout=12345

Also deprecate --udp_interface_address flag in favor udp options.

Closes #133

Change-Id: I962d35bfedc1779d67ba20ed910207c66b7c1a15
This commit is contained in:
Kongqun Yang 2016-10-04 18:17:07 -07:00 committed by KongQun Yang
parent 4c6e5f4fa6
commit 458fadc6a8
7 changed files with 343 additions and 81 deletions

View File

@ -263,6 +263,17 @@ packager \
--mpd_output live.mpd --mpd_output live.mpd
``` ```
A UDP url is of the form udp://ip:port[?options]. Here is an example:
udp://224.1.1.5:5003?reuse=1&interface=10.11.12.13&timeout=1234567.
Three options are supported right now:
- reuse=1|0
Allow or disallow reusing UDP sockets. Default to 0.
- interface=interface_ip_address
Address of the interface over which to receive UDP multicast streams.
- timeout=microseconds
Timeout in microseconds. Default to unlimited.
Demux video from the input and generate an encrypted fragmented mp4 using Demux video from the input and generate an encrypted fragmented mp4 using
Widevine encryption with RSA signing key file *widevine_test_private.der*: Widevine encryption with RSA signing key file *widevine_test_private.der*:
```Shell ```Shell

View File

@ -146,7 +146,6 @@ bool InsertStreamDescriptor(const std::string& descriptor_string,
LOG(ERROR) << "Unknown field in stream descriptor (\"" << iter->first LOG(ERROR) << "Unknown field in stream descriptor (\"" << iter->first
<< "\")."; << "\").";
return false; return false;
break;
} }
} }
// Validate and insert the descriptor // Validate and insert the descriptor

View File

@ -27,6 +27,8 @@
'threaded_io_file.cc', 'threaded_io_file.cc',
'threaded_io_file.h', 'threaded_io_file.h',
'udp_file.h', 'udp_file.h',
'udp_options.cc',
'udp_options.h',
], ],
'conditions': [ 'conditions': [
['OS == "win"', { ['OS == "win"', {
@ -53,6 +55,7 @@
'file_util_unittest.cc', 'file_util_unittest.cc',
'io_cache_unittest.cc', 'io_cache_unittest.cc',
'memory_file_unittest.cc', 'memory_file_unittest.cc',
'udp_options_unittest.cc',
], ],
'dependencies': [ 'dependencies': [
'../../testing/gtest.gyp:gtest', '../../testing/gtest.gyp:gtest',

View File

@ -8,7 +8,6 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <gflags/gflags.h>
#include <strings.h> #include <strings.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
@ -16,15 +15,10 @@
#include <limits> #include <limits>
#include "packager/base/logging.h" #include "packager/base/logging.h"
#include "packager/base/strings/string_number_conversions.h" #include "packager/media/file/udp_options.h"
// TODO(tinskip): Adapt to work with winsock. // TODO(tinskip): Adapt to work with winsock.
DEFINE_string(udp_interface_address,
"0.0.0.0",
"IP address of the interface over which to receive UDP unicast"
" or multicast streams");
namespace shaka { namespace shaka {
namespace media { namespace media {
@ -32,51 +26,8 @@ namespace {
const int kInvalidSocket(-1); const int kInvalidSocket(-1);
bool StringToIpv4Address(const std::string& addr_in, uint32_t* addr_out) { bool IsIpv4MulticastAddress(const struct in_addr& addr) {
DCHECK(addr_out); return (ntohl(addr.s_addr) & 0xf0000000) == 0xe0000000;
*addr_out = 0;
size_t start_pos(0);
size_t end_pos(0);
for (int i = 0; i < 4; ++i) {
end_pos = addr_in.find('.', start_pos);
if ((end_pos == std::string::npos) != (i == 3))
return false;
unsigned addr_byte;
if (!base::StringToUint(addr_in.substr(start_pos, end_pos - start_pos),
&addr_byte)
|| (addr_byte > 255))
return false;
*addr_out <<= 8;
*addr_out |= addr_byte;
start_pos = end_pos + 1;
}
return true;
}
bool StringToIpv4AddressAndPort(const std::string& addr_and_port,
uint32_t* addr,
uint16_t* port) {
DCHECK(addr);
DCHECK(port);
size_t colon_pos = addr_and_port.find(':');
if (colon_pos == std::string::npos) {
return false;
}
if (!StringToIpv4Address(addr_and_port.substr(0, colon_pos), addr))
return false;
unsigned port_value;
if (!base::StringToUint(addr_and_port.substr(colon_pos + 1),
&port_value) ||
(port_value > 65535))
return false;
*port = port_value;
return true;
}
bool IsIpv4MulticastAddress(uint32_t addr) {
return (addr & 0xf0000000) == 0xe0000000;
} }
} // anonymous namespace } // anonymous namespace
@ -166,15 +117,10 @@ class ScopedSocket {
bool UdpFile::Open() { bool UdpFile::Open() {
DCHECK_EQ(kInvalidSocket, socket_); DCHECK_EQ(kInvalidSocket, socket_);
// TODO(tinskip): Support IPv6 addresses. std::unique_ptr<UdpOptions> options =
uint32_t dest_addr; UdpOptions::ParseFromString(file_name());
uint16_t dest_port; if (!options)
if (!StringToIpv4AddressAndPort(file_name(),
&dest_addr,
&dest_port)) {
LOG(ERROR) << "Malformed IPv4 address:port UDP stream specifier.";
return false; return false;
}
ScopedSocket new_socket(socket(AF_INET, SOCK_DGRAM, 0)); ScopedSocket new_socket(socket(AF_INET, SOCK_DGRAM, 0));
if (new_socket.get() == kInvalidSocket) { if (new_socket.get() == kInvalidSocket) {
@ -184,10 +130,16 @@ bool UdpFile::Open() {
struct sockaddr_in local_sock_addr; struct sockaddr_in local_sock_addr;
bzero(&local_sock_addr, sizeof(local_sock_addr)); bzero(&local_sock_addr, sizeof(local_sock_addr));
// TODO(kqyang): Support IPv6.
local_sock_addr.sin_family = AF_INET; local_sock_addr.sin_family = AF_INET;
local_sock_addr.sin_port = htons(dest_port); local_sock_addr.sin_port = htons(options->port());
local_sock_addr.sin_addr.s_addr = htonl(dest_addr); if (inet_pton(AF_INET, options->address().c_str(),
// We do not need to require exclusive bind of udp sockets &local_sock_addr.sin_addr) != 1) {
LOG(ERROR) << "Malformed IPv4 address " << options->address();
return false;
}
if (options->reuse()) {
const int optval = 1; const int optval = 1;
if (setsockopt(new_socket.get(), SOL_SOCKET, SO_REUSEADDR, &optval, if (setsockopt(new_socket.get(), SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval)) < 0) { sizeof(optval)) < 0) {
@ -195,6 +147,8 @@ bool UdpFile::Open() {
<< "Could not apply the SO_REUSEADDR property to the UDP socket"; << "Could not apply the SO_REUSEADDR property to the UDP socket";
return false; return false;
} }
}
if (bind(new_socket.get(), if (bind(new_socket.get(),
reinterpret_cast<struct sockaddr*>(&local_sock_addr), reinterpret_cast<struct sockaddr*>(&local_sock_addr),
sizeof(local_sock_addr))) { sizeof(local_sock_addr))) {
@ -202,25 +156,42 @@ bool UdpFile::Open() {
return false; return false;
} }
if (IsIpv4MulticastAddress(dest_addr)) { if (IsIpv4MulticastAddress(local_sock_addr.sin_addr)) {
uint32_t if_addr; struct ip_mreq multicast_group;
if (!StringToIpv4Address(FLAGS_udp_interface_address, &if_addr)) { multicast_group.imr_multiaddr = local_sock_addr.sin_addr;
LOG(ERROR) << "Malformed IPv4 address for interface.";
if (options->interface_address().empty()) {
LOG(ERROR) << "Interface address is required for multicast, which can be "
"specified in udp url, e.g. "
"udp://ip:port?interface=interface_ip.";
return false; return false;
} }
struct ip_mreq multicast_group; if (inet_pton(AF_INET, options->interface_address().c_str(),
multicast_group.imr_multiaddr.s_addr = htonl(dest_addr); &multicast_group.imr_interface) != 1) {
multicast_group.imr_interface.s_addr = htonl(if_addr); LOG(ERROR) << "Malformed IPv4 interface address "
if (setsockopt(new_socket.get(), << options->interface_address();
IPPROTO_IP, return false;
IP_ADD_MEMBERSHIP, }
&multicast_group,
sizeof(multicast_group)) < 0) { if (setsockopt(new_socket.get(), IPPROTO_IP, IP_ADD_MEMBERSHIP,
&multicast_group, sizeof(multicast_group)) < 0) {
LOG(ERROR) << "Failed to join multicast group."; LOG(ERROR) << "Failed to join multicast group.";
return false; return false;
} }
} }
// Set timeout if needed.
if (options->timeout_us() != 0) {
struct timeval tv;
tv.tv_sec = options->timeout_us() / 1000000;
tv.tv_usec = options->timeout_us() % 1000000;
if (setsockopt(new_socket.get(), SOL_SOCKET, SO_RCVTIMEO,
reinterpret_cast<char*>(&tv), sizeof(tv)) < 0) {
LOG(ERROR) << "Failed to set socket timeout.";
return false;
}
}
socket_ = new_socket.release(); socket_ = new_socket.release();
return true; return true;
} }

View File

@ -0,0 +1,132 @@
// Copyright 2016 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/file/udp_options.h"
#include <gflags/gflags.h>
#include "packager/base/strings/string_number_conversions.h"
#include "packager/base/strings/string_split.h"
DEFINE_string(udp_interface_address,
"",
"IP address of the interface over which to receive UDP unicast"
" or multicast streams");
namespace shaka {
namespace media {
namespace {
enum FieldType {
kUnknownField = 0,
kReuseField,
kInterfaceAddressField,
kTimeoutField,
};
struct FieldNameToTypeMapping {
const char* field_name;
FieldType field_type;
};
const FieldNameToTypeMapping kFieldNameTypeMappings[] = {
{"reuse", kReuseField},
{"interface", kInterfaceAddressField},
{"source", kInterfaceAddressField},
{"timeout", kTimeoutField},
};
FieldType GetFieldType(const std::string& field_name) {
for (size_t idx = 0; idx < arraysize(kFieldNameTypeMappings); ++idx) {
if (field_name == kFieldNameTypeMappings[idx].field_name)
return kFieldNameTypeMappings[idx].field_type;
}
return kUnknownField;
}
bool StringToAddressAndPort(base::StringPiece addr_and_port,
std::string* addr,
uint16_t* port) {
DCHECK(addr);
DCHECK(port);
const size_t colon_pos = addr_and_port.find(':');
if (colon_pos == base::StringPiece::npos) {
return false;
}
*addr = addr_and_port.substr(0, colon_pos).as_string();
unsigned port_value;
if (!base::StringToUint(addr_and_port.substr(colon_pos + 1), &port_value) ||
(port_value > 65535)) {
return false;
}
*port = port_value;
return true;
}
} // namespace
std::unique_ptr<UdpOptions> UdpOptions::ParseFromString(
base::StringPiece udp_url) {
std::unique_ptr<UdpOptions> options(new UdpOptions);
const size_t question_mark_pos = udp_url.find('?');
base::StringPiece address_str = udp_url.substr(0, question_mark_pos);
if (question_mark_pos != base::StringPiece::npos) {
base::StringPiece options_str = udp_url.substr(question_mark_pos + 1);
base::StringPairs pairs;
if (!base::SplitStringIntoKeyValuePairs(options_str, '=', '&', &pairs)) {
LOG(ERROR) << "Invalid udp options name/value pairs " << options_str;
return nullptr;
}
for (const auto& pair : pairs) {
switch (GetFieldType(pair.first)) {
case kReuseField: {
int reuse_value = 0;
if (!base::StringToInt(pair.second, &reuse_value)) {
LOG(ERROR) << "Invalid udp option for reuse field " << pair.second;
return nullptr;
}
options->reuse_ = reuse_value > 0;
break;
}
case kInterfaceAddressField:
options->interface_address_ = pair.second;
break;
case kTimeoutField:
if (!base::StringToUint(pair.second, &options->timeout_us_)) {
LOG(ERROR) << "Invalid udp option for timeout field " << pair.second;
return nullptr;
}
break;
default:
LOG(ERROR) << "Unknown field in udp options (\"" << pair.first
<< "\").";
return nullptr;
}
}
}
if (!FLAGS_udp_interface_address.empty()) {
LOG(WARNING) << "--udp_interface_address is deprecated. Consider switching "
"to udp options instead, something like "
"udp:://ip:port?interface=interface_ip.";
options->interface_address_ = FLAGS_udp_interface_address;
}
if (!StringToAddressAndPort(address_str, &options->address_,
&options->port_)) {
LOG(ERROR) << "Malformed address:port UDP url " << address_str;
return nullptr;
}
return options;
}
} // namespace media
} // namespace shaka

View File

@ -0,0 +1,46 @@
// Copyright 2016 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include <memory>
#include <string>
#include "packager/base/strings/string_piece.h"
namespace shaka {
namespace media {
/// Options parsed from UDP url string of the form: udp://ip:port[?options]
class UdpOptions {
public:
~UdpOptions() = default;
/// Parse from UDP url.
/// @param udp_url is the url of the form udp://ip:port[?options]
/// @returns a UdpOptions object on success, nullptr otherwise.
static std::unique_ptr<UdpOptions> ParseFromString(base::StringPiece udp_url);
const std::string& address() const { return address_; }
uint16_t port() const { return port_; }
bool reuse() const { return reuse_; }
const std::string& interface_address() const { return interface_address_; }
unsigned timeout_us() const { return timeout_us_; }
private:
UdpOptions() = default;
/// IP Address.
std::string address_;
uint16_t port_ = 0;
/// Allow or disallow reusing UDP sockets.
bool reuse_ = false;
// Address of the interface over which to receive UDP multicast streams.
std::string interface_address_;
/// Timeout in microseconds. 0 to indicate unlimited timeout.
unsigned timeout_us_ = 0;
};
} // namespace media
} // namespace shaka

View File

@ -0,0 +1,100 @@
// Copyright 2016 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
#include "packager/media/file/udp_options.h"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
DECLARE_string(udp_interface_address);
namespace shaka {
namespace media {
class UdpOptionsTest : public testing::Test {
public:
void SetUp() override { FLAGS_udp_interface_address = ""; }
};
TEST_F(UdpOptionsTest, AddressAndPort) {
auto options = UdpOptions::ParseFromString("224.1.2.30:88");
ASSERT_TRUE(options);
EXPECT_EQ("224.1.2.30", options->address());
EXPECT_EQ(88u, options->port());
// The below fields are not set.
EXPECT_FALSE(options->reuse());
EXPECT_EQ("", options->interface_address());
EXPECT_EQ(0u, options->timeout_us());
}
TEST_F(UdpOptionsTest, MissingPort) {
ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30"));
ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:"));
}
TEST_F(UdpOptionsTest, InvalidPort) {
ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:888888"));
ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:abcd"));
}
TEST_F(UdpOptionsTest, MissingAddress) {
ASSERT_FALSE(UdpOptions::ParseFromString(":888888"));
ASSERT_FALSE(UdpOptions::ParseFromString("888888"));
}
TEST_F(UdpOptionsTest, UdpInterfaceAddressFlag) {
FLAGS_udp_interface_address = "10.11.12.13";
auto options = UdpOptions::ParseFromString("224.1.2.30:88");
ASSERT_TRUE(options);
EXPECT_EQ("224.1.2.30", options->address());
EXPECT_EQ(88u, options->port());
// The below fields are not set.
EXPECT_FALSE(options->reuse());
EXPECT_EQ("10.11.12.13", options->interface_address());
EXPECT_EQ(0u, options->timeout_us());
}
TEST_F(UdpOptionsTest, Reuse) {
auto options = UdpOptions::ParseFromString("224.1.2.30:88?reuse=1");
EXPECT_EQ("224.1.2.30", options->address());
EXPECT_EQ(88u, options->port());
EXPECT_TRUE(options->reuse());
EXPECT_EQ("", options->interface_address());
EXPECT_EQ(0u, options->timeout_us());
}
TEST_F(UdpOptionsTest, InvalidReuse) {
ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:88?reuse=7bd"));
}
TEST_F(UdpOptionsTest, InterfaceAddress) {
auto options = UdpOptions::ParseFromString(
"224.1.2.30:88?reuse=0&interface=10.11.12.13");
EXPECT_EQ("224.1.2.30", options->address());
EXPECT_EQ(88u, options->port());
EXPECT_FALSE(options->reuse());
EXPECT_EQ("10.11.12.13", options->interface_address());
EXPECT_EQ(0u, options->timeout_us());
}
TEST_F(UdpOptionsTest, Timeout) {
auto options = UdpOptions::ParseFromString(
"224.1.2.30:88?source=10.11.12.13&timeout=88888888");
EXPECT_EQ("224.1.2.30", options->address());
EXPECT_EQ(88u, options->port());
EXPECT_FALSE(options->reuse());
EXPECT_EQ("10.11.12.13", options->interface_address());
EXPECT_EQ(88888888u, options->timeout_us());
}
TEST_F(UdpOptionsTest, InvalidTimeout) {
ASSERT_FALSE(UdpOptions::ParseFromString(
"224.1.2.30:88?source=10.11.12.13&timeout=1a9"));
}
} // namespace media
} // namespace shaka