Shaka Packager SDK
udp_file.cc
1 // Copyright 2014 Google Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style
4 // license that can be found in the LICENSE file or at
5 // https://developers.google.com/open-source/licenses/bsd
6 
7 #include "packager/file/udp_file.h"
8 
9 #if defined(OS_WIN)
10 
11 #include <windows.h>
12 #include <ws2tcpip.h>
13 #define close closesocket
14 
15 #else
16 
17 #include <arpa/inet.h>
18 #include <errno.h>
19 #include <strings.h>
20 #include <sys/socket.h>
21 #include <sys/time.h>
22 #include <unistd.h>
23 #define INVALID_SOCKET -1
24 
25 // IP_MULTICAST_ALL has been supported since kernel version 2.6.31 but we may be
26 // building on a machine that is older than that.
27 #ifndef IP_MULTICAST_ALL
28 #define IP_MULTICAST_ALL 49
29 #endif
30 
31 #endif // defined(OS_WIN)
32 
33 #include <limits>
34 
35 #include "packager/base/logging.h"
36 #include "packager/file/udp_options.h"
37 
38 namespace shaka {
39 
40 namespace {
41 
// Returns true when |addr| is an IPv4 multicast address, i.e. when the four
// most significant bits of the address are 1110 (224.0.0.0 - 239.255.255.255).
bool IsIpv4MulticastAddress(const struct in_addr& addr) {
  // s_addr is stored in network byte order; convert it to host order so the
  // leading bits can be examined numerically.
  const auto host_order_address = ntohl(addr.s_addr);
  const auto leading_nibble = host_order_address >> 28;
  return leading_nibble == 0xeu;
}
45 
46 } // anonymous namespace
47 
48 UdpFile::UdpFile(const char* file_name)
49  : File(file_name), socket_(INVALID_SOCKET) {}
50 
51 UdpFile::~UdpFile() {}
52 
54  if (socket_ != INVALID_SOCKET) {
55  close(socket_);
56  socket_ = INVALID_SOCKET;
57  }
58  delete this;
59  return true;
60 }
61 
62 int64_t UdpFile::Read(void* buffer, uint64_t length) {
63  DCHECK(buffer);
64  DCHECK_GE(length, 65535u)
65  << "Buffer may be too small to read entire datagram.";
66 
67  if (socket_ == INVALID_SOCKET)
68  return -1;
69 
70  int64_t result;
71  do {
72  result =
73  recvfrom(socket_, reinterpret_cast<char*>(buffer), length, 0, NULL, 0);
74  } while ((result == -1) && (errno == EINTR));
75 
76  return result;
77 }
78 
79 int64_t UdpFile::Write(const void* buffer, uint64_t length) {
80  NOTIMPLEMENTED();
81  return -1;
82 }
83 
84 int64_t UdpFile::Size() {
85  if (socket_ == INVALID_SOCKET)
86  return -1;
87 
88  return std::numeric_limits<int64_t>::max();
89 }
90 
92  NOTIMPLEMENTED();
93  return false;
94 }
95 
96 bool UdpFile::Seek(uint64_t position) {
97  NOTIMPLEMENTED();
98  return false;
99 }
100 
101 bool UdpFile::Tell(uint64_t* position) {
102  NOTIMPLEMENTED();
103  return false;
104 }
105 
106 #if defined(OS_WIN)
107 class LibWinsockInitializer {
108  public:
109  LibWinsockInitializer() {
110  WSADATA wsa_data;
111  error_ = WSAStartup(MAKEWORD(2, 2), &wsa_data);
112  }
113 
114  ~LibWinsockInitializer() {
115  if (error_ == 0)
116  WSACleanup();
117  }
118 
119  int error() const { return error_; }
120 
121  private:
122  int error_;
123 };
124 #endif // defined(OS_WIN)
125 
126 class ScopedSocket {
127  public:
128  explicit ScopedSocket(SOCKET sock_fd) : sock_fd_(sock_fd) {}
129 
130  ~ScopedSocket() {
131  if (sock_fd_ != INVALID_SOCKET)
132  close(sock_fd_);
133  }
134 
135  SOCKET get() { return sock_fd_; }
136 
137  SOCKET release() {
138  SOCKET socket = sock_fd_;
139  sock_fd_ = INVALID_SOCKET;
140  return socket;
141  }
142 
143  private:
144  SOCKET sock_fd_;
145 
146  DISALLOW_COPY_AND_ASSIGN(ScopedSocket);
147 };
148 
150 #if defined(OS_WIN)
151  static LibWinsockInitializer lib_winsock_initializer;
152  if (lib_winsock_initializer.error() != 0) {
153  LOG(ERROR) << "Winsock start up failed with error "
154  << lib_winsock_initializer.error();
155  return false;
156  }
157 #endif // defined(OS_WIN)
158 
159  DCHECK_EQ(INVALID_SOCKET, socket_);
160 
161  std::unique_ptr<UdpOptions> options =
163  if (!options)
164  return false;
165 
166  ScopedSocket new_socket(socket(AF_INET, SOCK_DGRAM, 0));
167  if (new_socket.get() == INVALID_SOCKET) {
168  LOG(ERROR) << "Could not allocate socket.";
169  return false;
170  }
171 
172  struct in_addr local_in_addr = {0};
173  if (inet_pton(AF_INET, options->address().c_str(), &local_in_addr) != 1) {
174  LOG(ERROR) << "Malformed IPv4 address " << options->address();
175  return false;
176  }
177 
178  struct sockaddr_in local_sock_addr = {0};
179  // TODO(kqyang): Support IPv6.
180  local_sock_addr.sin_family = AF_INET;
181  local_sock_addr.sin_port = htons(options->port());
182  const bool is_multicast = IsIpv4MulticastAddress(local_in_addr);
183  if (is_multicast) {
184  local_sock_addr.sin_addr.s_addr = htonl(INADDR_ANY);
185  } else {
186  local_sock_addr.sin_addr = local_in_addr;
187  }
188 
189  if (options->reuse()) {
190  const int optval = 1;
191  if (setsockopt(new_socket.get(), SOL_SOCKET, SO_REUSEADDR,
192  reinterpret_cast<const char*>(&optval),
193  sizeof(optval)) < 0) {
194  LOG(ERROR)
195  << "Could not apply the SO_REUSEADDR property to the UDP socket";
196  return false;
197  }
198  }
199 
200  if (bind(new_socket.get(),
201  reinterpret_cast<struct sockaddr*>(&local_sock_addr),
202  sizeof(local_sock_addr))) {
203  LOG(ERROR) << "Could not bind UDP socket";
204  return false;
205  }
206 
207  if (is_multicast) {
208  if (options->is_source_specific_multicast()) {
209  struct ip_mreq_source source_multicast_group;
210 
211  source_multicast_group.imr_multiaddr = local_in_addr;
212  if (inet_pton(AF_INET,
213  options->interface_address().c_str(),
214  &source_multicast_group.imr_interface) != 1) {
215  LOG(ERROR) << "Malformed IPv4 interface address "
216  << options->interface_address();
217  return false;
218  }
219  if (inet_pton(AF_INET,
220  options->source_address().c_str(),
221  &source_multicast_group.imr_sourceaddr) != 1) {
222  LOG(ERROR) << "Malformed IPv4 source specific multicast address "
223  << options->source_address();
224  return false;
225  }
226 
227  if (setsockopt(new_socket.get(),
228  IPPROTO_IP,
229  IP_ADD_SOURCE_MEMBERSHIP,
230  reinterpret_cast<const char*>(&source_multicast_group),
231  sizeof(source_multicast_group)) < 0) {
232  LOG(ERROR) << "Failed to join multicast group.";
233  return false;
234  }
235  } else {
236  // this is a v2 join without a specific source.
237  struct ip_mreq multicast_group;
238 
239  multicast_group.imr_multiaddr = local_in_addr;
240 
241  if (inet_pton(AF_INET, options->interface_address().c_str(),
242  &multicast_group.imr_interface) != 1) {
243  LOG(ERROR) << "Malformed IPv4 interface address "
244  << options->interface_address();
245  return false;
246  }
247 
248  if (setsockopt(new_socket.get(), IPPROTO_IP, IP_ADD_MEMBERSHIP,
249  reinterpret_cast<const char*>(&multicast_group),
250  sizeof(multicast_group)) < 0) {
251  LOG(ERROR) << "Failed to join multicast group.";
252  return false;
253  }
254 
255  }
256 
257 #if defined(__linux__)
258  // Disable IP_MULTICAST_ALL to avoid interference caused when two sockets
259  // are bound to the same port but joined to different multicast groups.
260  const int optval_zero = 0;
261  if (setsockopt(new_socket.get(), IPPROTO_IP, IP_MULTICAST_ALL,
262  reinterpret_cast<const char*>(&optval_zero),
263  sizeof(optval_zero)) < 0 &&
264  errno != ENOPROTOOPT) {
265  LOG(ERROR) << "Failed to disable IP_MULTICAST_ALL option.";
266  return false;
267  }
268 #endif // #if defined(__linux__)
269  }
270 
271  // Set timeout if needed.
272  if (options->timeout_us() != 0) {
273  struct timeval tv;
274  tv.tv_sec = options->timeout_us() / 1000000;
275  tv.tv_usec = options->timeout_us() % 1000000;
276  if (setsockopt(new_socket.get(), SOL_SOCKET, SO_RCVTIMEO,
277  reinterpret_cast<const char*>(&tv), sizeof(tv)) < 0) {
278  LOG(ERROR) << "Failed to set socket timeout.";
279  return false;
280  }
281  }
282 
283  if (options->buffer_size() > 0) {
284  const int receive_buffer_size = options->buffer_size();
285  if (setsockopt(new_socket.get(), SOL_SOCKET, SO_RCVBUF,
286  reinterpret_cast<const char*>(&receive_buffer_size),
287  sizeof(receive_buffer_size)) < 0) {
288  LOG(ERROR) << "Failed to set the maximum receive buffer size: "
289  << strerror(errno);
290  return false;
291  }
292  }
293 
294  socket_ = new_socket.release();
295  return true;
296 }
297 
298 } // namespace shaka
int64_t Read(void *buffer, uint64_t length) override
Definition: udp_file.cc:62
bool Tell(uint64_t *position) override
Definition: udp_file.cc:101
bool Seek(uint64_t position) override
Definition: udp_file.cc:96
UdpFile(const char *address_and_port)
Definition: udp_file.cc:48
static std::unique_ptr< UdpOptions > ParseFromString(base::StringPiece udp_url)
Definition: udp_options.cc:75
Define an abstract file interface.
Definition: file.h:26
int64_t Write(const void *buffer, uint64_t length) override
Definition: udp_file.cc:79
const std::string & file_name() const
Definition: file.h:94
All the methods that are virtual are virtual for mocking.
bool Close() override
Definition: udp_file.cc:53
bool Open() override
Internal open. Should not be used directly.
Definition: udp_file.cc:149
int64_t Size() override
Definition: udp_file.cc:84
bool Flush() override
Definition: udp_file.cc:91