CppServer  1.0.4.0
C++ Server Library
udp_server.cpp
Go to the documentation of this file.
1 
10 
11 namespace CppServer {
12 namespace Asio {
13 
14 UDPServer::UDPServer(const std::shared_ptr<Service>& service, int port, InternetProtocol protocol)
15  : _id(CppCommon::UUID::Sequential()),
16  _service(service),
17  _io_service(_service->GetAsioService()),
18  _strand(*_io_service),
19  _strand_required(_service->IsStrandRequired()),
20  _port(port),
21  _socket(*_io_service),
22  _started(false),
23  _bytes_sending(0),
24  _bytes_sent(0),
25  _bytes_received(0),
26  _datagrams_sent(0),
27  _datagrams_received(0),
28  _receiving(false),
29  _sending(false),
30  _option_reuse_address(false),
31  _option_reuse_port(false)
32 {
33  assert((service != nullptr) && "Asio service is invalid!");
34  if (service == nullptr)
35  throw CppCommon::ArgumentException("Asio service is invalid!");
36 
37  // Prepare endpoint
38  switch (protocol)
39  {
41  _endpoint = asio::ip::udp::endpoint(asio::ip::udp::v4(), (unsigned short)port);
42  break;
44  _endpoint = asio::ip::udp::endpoint(asio::ip::udp::v6(), (unsigned short)port);
45  break;
46  }
47 }
48 
49 UDPServer::UDPServer(const std::shared_ptr<Service>& service, const std::string& address, int port)
50  : _id(CppCommon::UUID::Sequential()),
51  _service(service),
52  _io_service(_service->GetAsioService()),
53  _strand(*_io_service),
54  _strand_required(_service->IsStrandRequired()),
55  _address(address),
56  _port(port),
57  _socket(*_io_service),
58  _started(false),
59  _bytes_sending(0),
60  _bytes_sent(0),
61  _bytes_received(0),
62  _datagrams_sent(0),
63  _datagrams_received(0),
64  _receiving(false),
65  _sending(false),
66  _option_reuse_address(false),
67  _option_reuse_port(false)
68 {
69  assert((service != nullptr) && "Asio service is invalid!");
70  if (service == nullptr)
71  throw CppCommon::ArgumentException("Asio service is invalid!");
72 
73  // Prepare endpoint
74  _endpoint = asio::ip::udp::endpoint(asio::ip::make_address(address), (unsigned short)port);
75 }
76 
77 UDPServer::UDPServer(const std::shared_ptr<Service>& service, const asio::ip::udp::endpoint& endpoint)
78  : _id(CppCommon::UUID::Sequential()),
79  _service(service),
80  _io_service(_service->GetAsioService()),
81  _strand(*_io_service),
82  _strand_required(_service->IsStrandRequired()),
83  _address(endpoint.address().to_string()),
84  _port(endpoint.port()),
85  _endpoint(endpoint),
86  _socket(*_io_service),
87  _started(false),
88  _bytes_sending(0),
89  _bytes_sent(0),
90  _bytes_received(0),
91  _datagrams_sent(0),
92  _datagrams_received(0),
93  _receiving(false),
94  _sending(false)
95 {
96  assert((service != nullptr) && "Asio service is invalid!");
97  if (service == nullptr)
98  throw CppCommon::ArgumentException("Asio service is invalid!");
99 }
100 
102 {
103  asio::socket_base::receive_buffer_size option;
104  _socket.get_option(option);
105  return option.value();
106 }
107 
109 {
110  asio::socket_base::send_buffer_size option;
111  _socket.get_option(option);
112  return option.value();
113 }
114 
116 {
117  asio::socket_base::receive_buffer_size option((int)size);
118  _socket.set_option(option);
119 }
120 
122 {
123  asio::socket_base::send_buffer_size option((int)size);
124  _socket.set_option(option);
125 }
126 
128 {
129  assert(!IsStarted() && "UDP server is already started!");
130  if (IsStarted())
131  return false;
132 
133  // Post the start handler
134  auto self(this->shared_from_this());
135  auto start_handler = [this, self]()
136  {
137  if (IsStarted())
138  return;
139 
140  // Open a server socket
141  _socket.open(_endpoint.protocol());
142  if (option_reuse_address())
143  _socket.set_option(asio::ip::udp::socket::reuse_address(true));
144 #if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
145  if (option_reuse_port())
146  {
147  typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
148  _socket.set_option(reuse_port(true));
149  }
150 #endif
151  _socket.bind(_endpoint);
152 
153  // Prepare receive buffer
154  _receive_buffer.resize(option_receive_buffer_size());
155 
156  // Reset statistic
157  _bytes_sending = 0;
158  _bytes_sent = 0;
159  _bytes_received = 0;
160  _datagrams_sent = 0;
161  _datagrams_received = 0;
162 
163  // Update the started flag
164  _started = true;
165 
166  // Call the server started handler
167  onStarted();
168  };
169  if (_strand_required)
170  _strand.post(start_handler);
171  else
172  _io_service->post(start_handler);
173 
174  return true;
175 }
176 
177 bool UDPServer::Start(const std::string& multicast_address, int multicast_port)
178 {
179  _multicast_endpoint = asio::ip::udp::endpoint(asio::ip::make_address(multicast_address), (unsigned short)multicast_port);
180  return Start();
181 }
182 
183 bool UDPServer::Start(const asio::ip::udp::endpoint& multicast_endpoint)
184 {
185  _multicast_endpoint = multicast_endpoint;
186  return Start();
187 }
188 
190 {
191  assert(IsStarted() && "UDP server is not started!");
192  if (!IsStarted())
193  return false;
194 
195  // Post the stop handler
196  auto self(this->shared_from_this());
197  auto stop_handler = [this, self]()
198  {
199  if (!IsStarted())
200  return;
201 
202  // Close the server socket
203  _socket.close();
204 
205  // Update the started flag
206  _started = false;
207 
208  // Update sending/receiving flags
209  _receiving = false;
210  _sending = false;
211 
212  // Clear send/receive buffers
213  ClearBuffers();
214 
215  // Call the server stopped handler
216  onStopped();
217  };
218  if (_strand_required)
219  _strand.post(stop_handler);
220  else
221  _io_service->post(stop_handler);
222 
223  return true;
224 }
225 
227 {
228  if (!Stop())
229  return false;
230 
231  while (IsStarted())
232  CppCommon::Thread::Yield();
233 
234  return Start();
235 }
236 
237 size_t UDPServer::Multicast(const void* buffer, size_t size)
238 {
239  // Send the datagram to the multicast endpoint
240  return Send(_multicast_endpoint, buffer, size);
241 }
242 
243 size_t UDPServer::Multicast(const void* buffer, size_t size, const CppCommon::Timespan& timeout)
244 {
245  // Send the datagram to the multicast endpoint
246  return Send(_multicast_endpoint, buffer, size, timeout);
247 }
248 
249 bool UDPServer::MulticastAsync(const void* buffer, size_t size)
250 {
251  // Send the datagram to the multicast endpoint
252  return SendAsync(_multicast_endpoint, buffer, size);
253 }
254 
255 size_t UDPServer::Send(const asio::ip::udp::endpoint& endpoint, const void* buffer, size_t size)
256 {
257  if (!IsStarted())
258  return 0;
259 
260  if (size == 0)
261  return 0;
262 
263  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
264  if (buffer == nullptr)
265  return 0;
266 
267  asio::error_code ec;
268 
269  // Sent datagram to the client
270  size_t sent = _socket.send_to(asio::const_buffer(buffer, size), endpoint, 0, ec);
271  if (sent > 0)
272  {
273  // Update statistic
274  ++_datagrams_sent;
275  _bytes_sent += sent;
276 
277  // Call the datagram sent handler
278  onSent(endpoint, sent);
279  }
280 
281  // Check for error
282  if (ec)
283  SendError(ec);
284 
285  return sent;
286 }
287 
288 size_t UDPServer::Send(const asio::ip::udp::endpoint& endpoint, const void* buffer, size_t size, const CppCommon::Timespan& timeout)
289 {
290  if (!IsStarted())
291  return 0;
292 
293  if (size == 0)
294  return 0;
295 
296  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
297  if (buffer == nullptr)
298  return 0;
299 
300  int done = 0;
301  std::mutex mtx;
302  std::condition_variable cv;
303  asio::error_code error;
304  asio::system_timer timer(_socket.get_executor());
305 
306  // Prepare done handler
307  auto async_done_handler = [&](asio::error_code ec)
308  {
309  std::unique_lock<std::mutex> lck(mtx);
310  if (done++ == 0)
311  {
312  error = ec;
313  _socket.cancel();
314  timer.cancel();
315  }
316  cv.notify_one();
317  };
318 
319  // Async wait for timeout
320  timer.expires_from_now(timeout.chrono());
321  timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
322 
323  // Async send datagram to the client
324  size_t sent = 0;
325  _socket.async_send_to(asio::buffer(buffer, size), endpoint, [&](std::error_code ec, size_t write) { async_done_handler(ec); sent = write; });
326 
327  // Wait for complete or timeout
328  std::unique_lock<std::mutex> lck(mtx);
329  cv.wait(lck, [&]() { return done == 2; });
330 
331  // Send datagram to the client
332  if (sent > 0)
333  {
334  // Update statistic
335  ++_datagrams_sent;
336  _bytes_sent += sent;
337 
338  // Call the datagram sent handler
339  onSent(endpoint, sent);
340  }
341 
342  // Check for error
343  if (error && (error != asio::error::timed_out))
344  SendError(error);
345 
346  return sent;
347 }
348 
349 bool UDPServer::SendAsync(const asio::ip::udp::endpoint& endpoint, const void* buffer, size_t size)
350 {
351  if (_sending)
352  return false;
353 
354  if (!IsStarted())
355  return false;
356 
357  if (size == 0)
358  return true;
359 
360  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
361  if (buffer == nullptr)
362  return false;
363 
364  // Check the send buffer limit
365  if ((size > _send_buffer_limit) && (_send_buffer_limit > 0))
366  {
367  SendError(asio::error::no_buffer_space);
368 
369  // Call the buffer sent zero handler
370  onSent(_send_endpoint, 0);
371 
372  return false;
373  }
374 
375  // Fill the main send buffer
376  const uint8_t* bytes = (const uint8_t*)buffer;
377  _send_buffer.assign(bytes, bytes + size);
378 
379  // Update statistic
380  _bytes_sending = _send_buffer.size();
381 
382  // Update send endpoint
383  _send_endpoint = endpoint;
384 
385  // Async send-to with the send-to handler
386  _sending = true;
387  auto self(this->shared_from_this());
388  auto async_send_to_handler = make_alloc_handler(_send_storage, [this, self](std::error_code ec, size_t sent)
389  {
390  _sending = false;
391 
392  if (!IsStarted())
393  return;
394 
395  // Check for error
396  if (ec)
397  {
398  SendError(ec);
399 
400  // Call the buffer sent zero handler
401  onSent(_send_endpoint, 0);
402 
403  return;
404  }
405 
406  // Send some data to the client
407  if (sent > 0)
408  {
409  // Update statistic
410  _bytes_sending = 0;
411  _bytes_sent += sent;
412 
413  // Clear the send buffer
414  _send_buffer.clear();
415 
416  // Call the buffer sent handler
417  onSent(_send_endpoint, sent);
418  }
419  });
420  if (_strand_required)
421  _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, bind_executor(_strand, async_send_to_handler));
422  else
423  _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, async_send_to_handler);
424 
425  return true;
426 }
427 
428 size_t UDPServer::Receive(asio::ip::udp::endpoint& endpoint, void* buffer, size_t size)
429 {
430  if (!IsStarted())
431  return 0;
432 
433  if (size == 0)
434  return 0;
435 
436  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
437  if (buffer == nullptr)
438  return 0;
439 
440  asio::error_code ec;
441 
442  // Receive datagram from the client
443  size_t received = _socket.receive_from(asio::buffer(buffer, size), endpoint, 0, ec);
444 
445  // Update statistic
446  ++_datagrams_received;
447  _bytes_received += received;
448 
449  // Call the datagram received handler
450  onReceived(endpoint, buffer, received);
451 
452  // Check for error
453  if (ec)
454  SendError(ec);
455 
456  return received;
457 }
458 
459 std::string UDPServer::Receive(asio::ip::udp::endpoint& endpoint, size_t size)
460 {
461  std::string text(size, 0);
462  text.resize(Receive(endpoint, text.data(), text.size()));
463  return text;
464 }
465 
466 size_t UDPServer::Receive(asio::ip::udp::endpoint& endpoint, void* buffer, size_t size, const CppCommon::Timespan& timeout)
467 {
468  if (!IsStarted())
469  return 0;
470 
471  if (size == 0)
472  return 0;
473 
474  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
475  if (buffer == nullptr)
476  return 0;
477 
478  int done = 0;
479  std::mutex mtx;
480  std::condition_variable cv;
481  asio::error_code error;
482  asio::system_timer timer(_socket.get_executor());
483 
484  // Prepare done handler
485  auto async_done_handler = [&](asio::error_code ec)
486  {
487  std::unique_lock<std::mutex> lck(mtx);
488  if (done++ == 0)
489  {
490  error = ec;
491  _socket.cancel();
492  timer.cancel();
493  }
494  cv.notify_one();
495  };
496 
497  // Async wait for timeout
498  timer.expires_from_now(timeout.chrono());
499  timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
500 
501  // Async receive datagram from the client
502  size_t received = 0;
503  _socket.async_receive_from(asio::buffer(buffer, size), endpoint, [&](std::error_code ec, size_t read) { async_done_handler(ec); received = read; });
504 
505  // Wait for complete or timeout
506  std::unique_lock<std::mutex> lck(mtx);
507  cv.wait(lck, [&]() { return done == 2; });
508 
509  // Update statistic
510  ++_datagrams_received;
511  _bytes_received += received;
512 
513  // Call the datagram received handler
514  onReceived(endpoint, buffer, received);
515 
516  // Check for error
517  if (error && (error != asio::error::timed_out))
518  SendError(error);
519 
520  return received;
521 }
522 
523 std::string UDPServer::Receive(asio::ip::udp::endpoint& endpoint, size_t size, const CppCommon::Timespan& timeout)
524 {
525  std::string text(size, 0);
526  text.resize(Receive(endpoint, text.data(), text.size(), timeout));
527  return text;
528 }
529 
531 {
532  // Try to receive datagrams from clients
533  TryReceive();
534 }
535 
536 void UDPServer::TryReceive()
537 {
538  if (_receiving)
539  return;
540 
541  if (!IsStarted())
542  return;
543 
544  // Async receive with the receive handler
545  _receiving = true;
546  auto self(this->shared_from_this());
547  auto async_receive_handler = make_alloc_handler(_receive_storage, [this, self](std::error_code ec, size_t size)
548  {
549  _receiving = false;
550 
551  if (!IsStarted())
552  return;
553 
554  // Check for error
555  if (ec)
556  {
557  SendError(ec);
558 
559  // Call the datagram received zero handler
560  onReceived(_receive_endpoint, _receive_buffer.data(), 0);
561 
562  return;
563  }
564 
565  // Update statistic
566  ++_datagrams_received;
567  _bytes_received += size;
568 
569  // Call the datagram received handler
570  onReceived(_receive_endpoint, _receive_buffer.data(), size);
571 
572  // If the receive buffer is full increase its size
573  if (_receive_buffer.size() == size)
574  {
575  // Check the receive buffer limit
576  if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
577  {
578  SendError(asio::error::no_buffer_space);
579 
580  // Call the datagram received zero handler
581  onReceived(_receive_endpoint, _receive_buffer.data(), 0);
582 
583  return;
584  }
585 
586  _receive_buffer.resize(2 * size);
587  }
588  });
589  if (_strand_required)
590  _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, bind_executor(_strand, async_receive_handler));
591  else
592  _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, async_receive_handler);
593 }
594 
595 void UDPServer::ClearBuffers()
596 {
597  // Clear send buffers
598  _send_buffer.clear();
599 
600  // Update statistic
601  _bytes_sending = 0;
602 }
603 
604 void UDPServer::SendError(std::error_code ec)
605 {
606  // Skip Asio disconnect errors
607  if ((ec == asio::error::connection_aborted) ||
608  (ec == asio::error::connection_refused) ||
609  (ec == asio::error::connection_reset) ||
610  (ec == asio::error::eof) ||
611  (ec == asio::error::operation_aborted))
612  return;
613 
614  onError(ec.value(), ec.category().name(), ec.message());
615 }
616 
617 } // namespace Asio
618 } // namespace CppServer
asio::ip::udp::endpoint & endpoint() noexcept
Get the server endpoint.
Definition: udp_server.h:65
virtual bool Restart()
Restart the server.
Definition: udp_server.cpp:226
int port() const noexcept
Get the server port number.
Definition: udp_server.h:72
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
Definition: udp_server.h:59
virtual void onStarted()
Handle server started notification.
Definition: udp_server.h:310
asio::ip::udp::endpoint & multicast_endpoint() noexcept
Get the server multicast endpoint.
Definition: udp_server.h:67
bool option_reuse_port() const noexcept
Get the option: reuse port.
Definition: udp_server.h:88
virtual void ReceiveAsync()
Receive datagram from the client (asynchronous)
Definition: udp_server.cpp:530
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
Definition: udp_server.cpp:115
virtual size_t Multicast(const void *buffer, size_t size)
Multicast datagram to the prepared mulicast endpoint (synchronous)
Definition: udp_server.cpp:237
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
Definition: udp_server.h:342
bool option_reuse_address() const noexcept
Get the option: reuse address.
Definition: udp_server.h:86
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
Definition: udp_server.cpp:121
virtual bool MulticastAsync(const void *buffer, size_t size)
Multicast datagram to the prepared mulicast endpoint (asynchronous)
Definition: udp_server.cpp:249
virtual bool Start()
Start the server.
Definition: udp_server.cpp:127
size_t option_send_buffer_size() const
Get the option: send buffer size.
Definition: udp_server.cpp:108
bool IsStarted() const noexcept
Is the server started?
Definition: udp_server.h:99
virtual void onSent(const asio::ip::udp::endpoint &endpoint, size_t sent)
Handle datagram sent notification.
Definition: udp_server.h:334
virtual bool Stop()
Stop the server.
Definition: udp_server.cpp:189
virtual size_t Receive(asio::ip::udp::endpoint &endpoint, void *buffer, size_t size)
Receive datagram from the given endpoint (synchronous)
Definition: udp_server.cpp:428
const std::string & address() const noexcept
Get the server address.
Definition: udp_server.h:70
virtual bool SendAsync(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Send datagram into the given endpoint (asynchronous)
Definition: udp_server.cpp:349
UDPServer(const std::shared_ptr< Service > &service, int port, InternetProtocol protocol=InternetProtocol::IPv4)
Initialize UDP server with a given Asio service and port number.
Definition: udp_server.cpp:14
virtual void onReceived(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Handle datagram received notification.
Definition: udp_server.h:323
virtual size_t Send(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Send datagram into the given endpoint (synchronous)
Definition: udp_server.cpp:255
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
Definition: udp_server.cpp:101
virtual void onStopped()
Handle server stopped notification.
Definition: udp_server.h:312
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition: memory.inl:39
InternetProtocol
Internet protocol.
Definition: asio.h:66
@ IPv4
Internet Protocol version 4.
@ IPv6
Internet Protocol version 6.
C++ Server project definitions.
Definition: asio.h:56
UDP server definition.