From 458fadc6a81a028e61b5e2a5bfe59da5edea8f8e Mon Sep 17 00:00:00 2001 From: Kongqun Yang Date: Tue, 4 Oct 2016 18:17:07 -0700 Subject: [PATCH] Add udp options support Three options are supported right now: - reuse=1|0 Allow or disallow reusing UDP sockets. Default 0. - interface=a.b.c.d Address of the interface over which to receive UDP multicast streams. - timeout=microseconds Timeout in microseconds. Default to unlimited. A UDP url should be of the form udp://ip:port[?options], here is an example: udp://224.1.2.30:88?reuse=1&interface=10.11.12.13&timeout=12345 Also deprecate --udp_interface_address flag in favor udp options. Closes #133 Change-Id: I962d35bfedc1779d67ba20ed910207c66b7c1a15 --- README.md | 11 ++ packager/app/stream_descriptor.cc | 1 - packager/media/file/file.gyp | 3 + packager/media/file/udp_file_posix.cc | 131 ++++++++----------- packager/media/file/udp_options.cc | 132 ++++++++++++++++++++ packager/media/file/udp_options.h | 46 +++++++ packager/media/file/udp_options_unittest.cc | 100 +++++++++++++++ 7 files changed, 343 insertions(+), 81 deletions(-) create mode 100644 packager/media/file/udp_options.cc create mode 100644 packager/media/file/udp_options.h create mode 100644 packager/media/file/udp_options_unittest.cc diff --git a/README.md b/README.md index 2855b4b012..fa62684a21 100644 --- a/README.md +++ b/README.md @@ -263,6 +263,17 @@ packager \ --mpd_output live.mpd ``` +A UDP url is of the form udp://ip:port[?options]. Here is an example: +udp://224.1.1.5:5003?reuse=1&interface=10.11.12.13&timeout=1234567. + +Three options are supported right now: +- reuse=1|0 + Allow or disallow reusing UDP sockets. Default to 0. +- interface=interface_ip_address + Address of the interface over which to receive UDP multicast streams. +- timeout=microseconds + Timeout in microseconds. Default to unlimited. + Demux video from the input and generate an encrypted fragmented mp4 using Widevine encryption with RSA signing key file *widevine_test_private.der*: ```Shell diff --git a/packager/app/stream_descriptor.cc b/packager/app/stream_descriptor.cc index b36e5d92c7..21954aa534 100644 --- a/packager/app/stream_descriptor.cc +++ b/packager/app/stream_descriptor.cc @@ -146,7 +146,6 @@ bool InsertStreamDescriptor(const std::string& descriptor_string, LOG(ERROR) << "Unknown field in stream descriptor (\"" << iter->first << "\")."; return false; - break; } } // Validate and insert the descriptor diff --git a/packager/media/file/file.gyp b/packager/media/file/file.gyp index ad164284e6..ed7d8af97d 100644 --- a/packager/media/file/file.gyp +++ b/packager/media/file/file.gyp @@ -27,6 +27,8 @@ 'threaded_io_file.cc', 'threaded_io_file.h', 'udp_file.h', + 'udp_options.cc', + 'udp_options.h', ], 'conditions': [ ['OS == "win"', { @@ -53,6 +55,7 @@ 'file_util_unittest.cc', 'io_cache_unittest.cc', 'memory_file_unittest.cc', + 'udp_options_unittest.cc', ], 'dependencies': [ '../../testing/gtest.gyp:gtest', diff --git a/packager/media/file/udp_file_posix.cc b/packager/media/file/udp_file_posix.cc index 48a62ac64f..769dada00c 100644 --- a/packager/media/file/udp_file_posix.cc +++ b/packager/media/file/udp_file_posix.cc @@ -8,7 +8,6 @@ #include #include -#include #include #include #include @@ -16,15 +15,10 @@ #include #include "packager/base/logging.h" -#include "packager/base/strings/string_number_conversions.h" +#include "packager/media/file/udp_options.h" // TODO(tinskip): Adapt to work with winsock. -DEFINE_string(udp_interface_address, - "0.0.0.0", - "IP address of the interface over which to receive UDP unicast" - " or multicast streams"); - namespace shaka { namespace media { @@ -32,51 +26,8 @@ namespace { const int kInvalidSocket(-1); -bool StringToIpv4Address(const std::string& addr_in, uint32_t* addr_out) { - DCHECK(addr_out); - - *addr_out = 0; - size_t start_pos(0); - size_t end_pos(0); - for (int i = 0; i < 4; ++i) { - end_pos = addr_in.find('.', start_pos); - if ((end_pos == std::string::npos) != (i == 3)) - return false; - unsigned addr_byte; - if (!base::StringToUint(addr_in.substr(start_pos, end_pos - start_pos), - &addr_byte) - || (addr_byte > 255)) - return false; - *addr_out <<= 8; - *addr_out |= addr_byte; - start_pos = end_pos + 1; - } - return true; -} - -bool StringToIpv4AddressAndPort(const std::string& addr_and_port, - uint32_t* addr, - uint16_t* port) { - DCHECK(addr); - DCHECK(port); - - size_t colon_pos = addr_and_port.find(':'); - if (colon_pos == std::string::npos) { - return false; - } - if (!StringToIpv4Address(addr_and_port.substr(0, colon_pos), addr)) - return false; - unsigned port_value; - if (!base::StringToUint(addr_and_port.substr(colon_pos + 1), - &port_value) || - (port_value > 65535)) - return false; - *port = port_value; - return true; -} - -bool IsIpv4MulticastAddress(uint32_t addr) { - return (addr & 0xf0000000) == 0xe0000000; +bool IsIpv4MulticastAddress(const struct in_addr& addr) { + return (ntohl(addr.s_addr) & 0xf0000000) == 0xe0000000; } } // anonymous namespace @@ -166,15 +117,10 @@ class ScopedSocket { bool UdpFile::Open() { DCHECK_EQ(kInvalidSocket, socket_); - // TODO(tinskip): Support IPv6 addresses. - uint32_t dest_addr; - uint16_t dest_port; - if (!StringToIpv4AddressAndPort(file_name(), - &dest_addr, - &dest_port)) { - LOG(ERROR) << "Malformed IPv4 address:port UDP stream specifier."; + std::unique_ptr options = + UdpOptions::ParseFromString(file_name()); + if (!options) return false; - } ScopedSocket new_socket(socket(AF_INET, SOCK_DGRAM, 0)); if (new_socket.get() == kInvalidSocket) { @@ -184,17 +130,25 @@ bool UdpFile::Open() { struct sockaddr_in local_sock_addr; bzero(&local_sock_addr, sizeof(local_sock_addr)); + // TODO(kqyang): Support IPv6. local_sock_addr.sin_family = AF_INET; - local_sock_addr.sin_port = htons(dest_port); - local_sock_addr.sin_addr.s_addr = htonl(dest_addr); - // We do not need to require exclusive bind of udp sockets - const int optval = 1; - if (setsockopt(new_socket.get(), SOL_SOCKET, SO_REUSEADDR, &optval, - sizeof(optval)) < 0) { - LOG(ERROR) - << "Could not apply the SO_REUSEADDR property to the UDP socket"; + local_sock_addr.sin_port = htons(options->port()); + if (inet_pton(AF_INET, options->address().c_str(), + &local_sock_addr.sin_addr) != 1) { + LOG(ERROR) << "Malformed IPv4 address " << options->address(); return false; } + + if (options->reuse()) { + const int optval = 1; + if (setsockopt(new_socket.get(), SOL_SOCKET, SO_REUSEADDR, &optval, + sizeof(optval)) < 0) { + LOG(ERROR) + << "Could not apply the SO_REUSEADDR property to the UDP socket"; + return false; + } + } + if (bind(new_socket.get(), reinterpret_cast(&local_sock_addr), sizeof(local_sock_addr))) { @@ -202,25 +156,42 @@ bool UdpFile::Open() { return false; } - if (IsIpv4MulticastAddress(dest_addr)) { - uint32_t if_addr; - if (!StringToIpv4Address(FLAGS_udp_interface_address, &if_addr)) { - LOG(ERROR) << "Malformed IPv4 address for interface."; + if (IsIpv4MulticastAddress(local_sock_addr.sin_addr)) { + struct ip_mreq multicast_group; + multicast_group.imr_multiaddr = local_sock_addr.sin_addr; + + if (options->interface_address().empty()) { + LOG(ERROR) << "Interface address is required for multicast, which can be " + "specified in udp url, e.g. " + "udp://ip:port?interface=interface_ip."; return false; } - struct ip_mreq multicast_group; - multicast_group.imr_multiaddr.s_addr = htonl(dest_addr); - multicast_group.imr_interface.s_addr = htonl(if_addr); - if (setsockopt(new_socket.get(), - IPPROTO_IP, - IP_ADD_MEMBERSHIP, - &multicast_group, - sizeof(multicast_group)) < 0) { + if (inet_pton(AF_INET, options->interface_address().c_str(), + &multicast_group.imr_interface) != 1) { + LOG(ERROR) << "Malformed IPv4 interface address " + << options->interface_address(); + return false; + } + + if (setsockopt(new_socket.get(), IPPROTO_IP, IP_ADD_MEMBERSHIP, + &multicast_group, sizeof(multicast_group)) < 0) { LOG(ERROR) << "Failed to join multicast group."; return false; } } + // Set timeout if needed. + if (options->timeout_us() != 0) { + struct timeval tv; + tv.tv_sec = options->timeout_us() / 1000000; + tv.tv_usec = options->timeout_us() % 1000000; + if (setsockopt(new_socket.get(), SOL_SOCKET, SO_RCVTIMEO, + reinterpret_cast(&tv), sizeof(tv)) < 0) { + LOG(ERROR) << "Failed to set socket timeout."; + return false; + } + } + socket_ = new_socket.release(); return true; } diff --git a/packager/media/file/udp_options.cc b/packager/media/file/udp_options.cc new file mode 100644 index 0000000000..affd6be034 --- /dev/null +++ b/packager/media/file/udp_options.cc @@ -0,0 +1,132 @@ +// Copyright 2016 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/file/udp_options.h" + +#include + +#include "packager/base/strings/string_number_conversions.h" +#include "packager/base/strings/string_split.h" + +DEFINE_string(udp_interface_address, + "", + "IP address of the interface over which to receive UDP unicast" + " or multicast streams"); + +namespace shaka { +namespace media { + +namespace { + +enum FieldType { + kUnknownField = 0, + kReuseField, + kInterfaceAddressField, + kTimeoutField, +}; + +struct FieldNameToTypeMapping { + const char* field_name; + FieldType field_type; +}; + +const FieldNameToTypeMapping kFieldNameTypeMappings[] = { + {"reuse", kReuseField}, + {"interface", kInterfaceAddressField}, + {"source", kInterfaceAddressField}, + {"timeout", kTimeoutField}, +}; + +FieldType GetFieldType(const std::string& field_name) { + for (size_t idx = 0; idx < arraysize(kFieldNameTypeMappings); ++idx) { + if (field_name == kFieldNameTypeMappings[idx].field_name) + return kFieldNameTypeMappings[idx].field_type; + } + return kUnknownField; +} + +bool StringToAddressAndPort(base::StringPiece addr_and_port, + std::string* addr, + uint16_t* port) { + DCHECK(addr); + DCHECK(port); + + const size_t colon_pos = addr_and_port.find(':'); + if (colon_pos == base::StringPiece::npos) { + return false; + } + *addr = addr_and_port.substr(0, colon_pos).as_string(); + unsigned port_value; + if (!base::StringToUint(addr_and_port.substr(colon_pos + 1), &port_value) || + (port_value > 65535)) { + return false; + } + *port = port_value; + return true; +} + +} // namespace + +std::unique_ptr UdpOptions::ParseFromString( + base::StringPiece udp_url) { + std::unique_ptr options(new UdpOptions); + + const size_t question_mark_pos = udp_url.find('?'); + base::StringPiece address_str = udp_url.substr(0, question_mark_pos); + + if (question_mark_pos != base::StringPiece::npos) { + base::StringPiece options_str = udp_url.substr(question_mark_pos + 1); + + base::StringPairs pairs; + if (!base::SplitStringIntoKeyValuePairs(options_str, '=', '&', &pairs)) { + LOG(ERROR) << "Invalid udp options name/value pairs " << options_str; + return nullptr; + } + for (const auto& pair : pairs) { + switch (GetFieldType(pair.first)) { + case kReuseField: { + int reuse_value = 0; + if (!base::StringToInt(pair.second, &reuse_value)) { + LOG(ERROR) << "Invalid udp option for reuse field " << pair.second; + return nullptr; + } + options->reuse_ = reuse_value > 0; + break; + } + case kInterfaceAddressField: + options->interface_address_ = pair.second; + break; + case kTimeoutField: + if (!base::StringToUint(pair.second, &options->timeout_us_)) { + LOG(ERROR) << "Invalid udp option for timeout field " << pair.second; + return nullptr; + } + break; + default: + LOG(ERROR) << "Unknown field in udp options (\"" << pair.first + << "\")."; + return nullptr; + } + } + } + + if (!FLAGS_udp_interface_address.empty()) { + LOG(WARNING) << "--udp_interface_address is deprecated. Consider switching " + "to udp options instead, something like " + "udp:://ip:port?interface=interface_ip."; + options->interface_address_ = FLAGS_udp_interface_address; + } + + if (!StringToAddressAndPort(address_str, &options->address_, + &options->port_)) { + LOG(ERROR) << "Malformed address:port UDP url " << address_str; + return nullptr; + } + return options; +} + +} // namespace media +} // namespace shaka diff --git a/packager/media/file/udp_options.h b/packager/media/file/udp_options.h new file mode 100644 index 0000000000..7927a4cf4e --- /dev/null +++ b/packager/media/file/udp_options.h @@ -0,0 +1,46 @@ +// Copyright 2016 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include +#include + +#include "packager/base/strings/string_piece.h" + +namespace shaka { +namespace media { + +/// Options parsed from UDP url string of the form: udp://ip:port[?options] +class UdpOptions { + public: + ~UdpOptions() = default; + + /// Parse from UDP url. + /// @param udp_url is the url of the form udp://ip:port[?options] + /// @returns a UdpOptions object on success, nullptr otherwise. + static std::unique_ptr ParseFromString(base::StringPiece udp_url); + + const std::string& address() const { return address_; } + uint16_t port() const { return port_; } + bool reuse() const { return reuse_; } + const std::string& interface_address() const { return interface_address_; } + unsigned timeout_us() const { return timeout_us_; } + + private: + UdpOptions() = default; + + /// IP Address. + std::string address_; + uint16_t port_ = 0; + /// Allow or disallow reusing UDP sockets. + bool reuse_ = false; + // Address of the interface over which to receive UDP multicast streams. + std::string interface_address_; + /// Timeout in microseconds. 0 to indicate unlimited timeout. + unsigned timeout_us_ = 0; +}; + +} // namespace media +} // namespace shaka diff --git a/packager/media/file/udp_options_unittest.cc b/packager/media/file/udp_options_unittest.cc new file mode 100644 index 0000000000..f5e0017063 --- /dev/null +++ b/packager/media/file/udp_options_unittest.cc @@ -0,0 +1,100 @@ +// Copyright 2016 Google Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd + +#include "packager/media/file/udp_options.h" + +#include +#include + +DECLARE_string(udp_interface_address); + +namespace shaka { +namespace media { + +class UdpOptionsTest : public testing::Test { + public: + void SetUp() override { FLAGS_udp_interface_address = ""; } +}; + +TEST_F(UdpOptionsTest, AddressAndPort) { + auto options = UdpOptions::ParseFromString("224.1.2.30:88"); + ASSERT_TRUE(options); + EXPECT_EQ("224.1.2.30", options->address()); + EXPECT_EQ(88u, options->port()); + // The below fields are not set. + EXPECT_FALSE(options->reuse()); + EXPECT_EQ("", options->interface_address()); + EXPECT_EQ(0u, options->timeout_us()); +} + +TEST_F(UdpOptionsTest, MissingPort) { + ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30")); + ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:")); +} + +TEST_F(UdpOptionsTest, InvalidPort) { + ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:888888")); + ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:abcd")); +} + +TEST_F(UdpOptionsTest, MissingAddress) { + ASSERT_FALSE(UdpOptions::ParseFromString(":888888")); + ASSERT_FALSE(UdpOptions::ParseFromString("888888")); +} + +TEST_F(UdpOptionsTest, UdpInterfaceAddressFlag) { + FLAGS_udp_interface_address = "10.11.12.13"; + + auto options = UdpOptions::ParseFromString("224.1.2.30:88"); + ASSERT_TRUE(options); + EXPECT_EQ("224.1.2.30", options->address()); + EXPECT_EQ(88u, options->port()); + // The below fields are not set. + EXPECT_FALSE(options->reuse()); + EXPECT_EQ("10.11.12.13", options->interface_address()); + EXPECT_EQ(0u, options->timeout_us()); +} + +TEST_F(UdpOptionsTest, Reuse) { + auto options = UdpOptions::ParseFromString("224.1.2.30:88?reuse=1"); + EXPECT_EQ("224.1.2.30", options->address()); + EXPECT_EQ(88u, options->port()); + EXPECT_TRUE(options->reuse()); + EXPECT_EQ("", options->interface_address()); + EXPECT_EQ(0u, options->timeout_us()); +} + +TEST_F(UdpOptionsTest, InvalidReuse) { + ASSERT_FALSE(UdpOptions::ParseFromString("224.1.2.30:88?reuse=7bd")); +} + +TEST_F(UdpOptionsTest, InterfaceAddress) { + auto options = UdpOptions::ParseFromString( + "224.1.2.30:88?reuse=0&interface=10.11.12.13"); + EXPECT_EQ("224.1.2.30", options->address()); + EXPECT_EQ(88u, options->port()); + EXPECT_FALSE(options->reuse()); + EXPECT_EQ("10.11.12.13", options->interface_address()); + EXPECT_EQ(0u, options->timeout_us()); +} + +TEST_F(UdpOptionsTest, Timeout) { + auto options = UdpOptions::ParseFromString( + "224.1.2.30:88?source=10.11.12.13&timeout=88888888"); + EXPECT_EQ("224.1.2.30", options->address()); + EXPECT_EQ(88u, options->port()); + EXPECT_FALSE(options->reuse()); + EXPECT_EQ("10.11.12.13", options->interface_address()); + EXPECT_EQ(88888888u, options->timeout_us()); +} + +TEST_F(UdpOptionsTest, InvalidTimeout) { + ASSERT_FALSE(UdpOptions::ParseFromString( + "224.1.2.30:88?source=10.11.12.13&timeout=1a9")); +} + +} // namespace media +} // namespace shaka