CppServer 1.0.5.0
C++ Server Library
Loading...
Searching...
No Matches
udp_server.cpp
Go to the documentation of this file.
1
10
11namespace CppServer {
12namespace Asio {
13
14UDPServer::UDPServer(const std::shared_ptr<Service>& service, int port, InternetProtocol protocol)
15 : _id(CppCommon::UUID::Sequential()),
16 _service(service),
17 _io_service(_service->GetAsioService()),
18 _strand(*_io_service),
19 _strand_required(_service->IsStrandRequired()),
20 _port(port),
21 _socket(*_io_service),
22 _started(false),
23 _bytes_sending(0),
24 _bytes_sent(0),
25 _bytes_received(0),
26 _datagrams_sent(0),
27 _datagrams_received(0),
28 _receiving(false),
29 _sending(false),
30 _option_reuse_address(false),
31 _option_reuse_port(false)
32{
33 assert((service != nullptr) && "Asio service is invalid!");
34 if (service == nullptr)
35 throw CppCommon::ArgumentException("Asio service is invalid!");
36
37 // Prepare endpoint
38 switch (protocol)
39 {
41 _endpoint = asio::ip::udp::endpoint(asio::ip::udp::v4(), (unsigned short)port);
42 break;
44 _endpoint = asio::ip::udp::endpoint(asio::ip::udp::v6(), (unsigned short)port);
45 break;
46 }
47}
48
49UDPServer::UDPServer(const std::shared_ptr<Service>& service, const std::string& address, int port)
50 : _id(CppCommon::UUID::Sequential()),
51 _service(service),
52 _io_service(_service->GetAsioService()),
53 _strand(*_io_service),
54 _strand_required(_service->IsStrandRequired()),
55 _address(address),
56 _port(port),
57 _socket(*_io_service),
58 _started(false),
59 _bytes_sending(0),
60 _bytes_sent(0),
61 _bytes_received(0),
62 _datagrams_sent(0),
63 _datagrams_received(0),
64 _receiving(false),
65 _sending(false),
66 _option_reuse_address(false),
67 _option_reuse_port(false)
68{
69 assert((service != nullptr) && "Asio service is invalid!");
70 if (service == nullptr)
71 throw CppCommon::ArgumentException("Asio service is invalid!");
72
73 // Prepare endpoint
74 _endpoint = asio::ip::udp::endpoint(asio::ip::make_address(address), (unsigned short)port);
75}
76
77UDPServer::UDPServer(const std::shared_ptr<Service>& service, const asio::ip::udp::endpoint& endpoint)
78 : _id(CppCommon::UUID::Sequential()),
79 _service(service),
80 _io_service(_service->GetAsioService()),
81 _strand(*_io_service),
82 _strand_required(_service->IsStrandRequired()),
83 _address(endpoint.address().to_string()),
84 _port(endpoint.port()),
85 _endpoint(endpoint),
86 _socket(*_io_service),
87 _started(false),
88 _bytes_sending(0),
89 _bytes_sent(0),
90 _bytes_received(0),
91 _datagrams_sent(0),
92 _datagrams_received(0),
93 _receiving(false),
94 _sending(false)
95{
96 assert((service != nullptr) && "Asio service is invalid!");
97 if (service == nullptr)
98 throw CppCommon::ArgumentException("Asio service is invalid!");
99}
100
102{
103 asio::socket_base::receive_buffer_size option;
104 _socket.get_option(option);
105 return option.value();
106}
107
109{
110 asio::socket_base::send_buffer_size option;
111 _socket.get_option(option);
112 return option.value();
113}
114
116{
117 asio::socket_base::receive_buffer_size option((int)size);
118 _socket.set_option(option);
119}
120
122{
123 asio::socket_base::send_buffer_size option((int)size);
124 _socket.set_option(option);
125}
126
128{
129 assert(!IsStarted() && "UDP server is already started!");
130 if (IsStarted())
131 return false;
132
133 // Post the start handler
134 auto self(this->shared_from_this());
135 auto start_handler = [this, self]()
136 {
137 if (IsStarted())
138 return;
139
140 // Open a server socket
141 _socket.open(_endpoint.protocol());
143 _socket.set_option(asio::ip::udp::socket::reuse_address(true));
144#if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
145 if (option_reuse_port())
146 {
147 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
148 _socket.set_option(reuse_port(true));
149 }
150#endif
151 _socket.bind(_endpoint);
152
153 // Prepare receive buffer
154 _receive_buffer.resize(option_receive_buffer_size());
155
156 // Reset statistic
157 _bytes_sending = 0;
158 _bytes_sent = 0;
159 _bytes_received = 0;
160 _datagrams_sent = 0;
161 _datagrams_received = 0;
162
163 // Update the started flag
164 _started = true;
165
166 // Call the server started handler
167 onStarted();
168 };
169 if (_strand_required)
170 _strand.post(start_handler);
171 else
172 _io_service->post(start_handler);
173
174 return true;
175}
176
178{
179 _multicast_endpoint = asio::ip::udp::endpoint(asio::ip::make_address(multicast_address), (unsigned short)multicast_port);
180 return Start();
181}
182
183bool UDPServer::Start(const asio::ip::udp::endpoint& multicast_endpoint)
184{
185 _multicast_endpoint = multicast_endpoint;
186 return Start();
187}
188
190{
191 assert(IsStarted() && "UDP server is not started!");
192 if (!IsStarted())
193 return false;
194
195 // Post the stop handler
196 auto self(this->shared_from_this());
197 auto stop_handler = [this, self]()
198 {
199 if (!IsStarted())
200 return;
201
202 // Close the server socket
203 _socket.close();
204
205 // Update the started flag
206 _started = false;
207
208 // Update sending/receiving flags
209 _receiving = false;
210 _sending = false;
211
212 // Clear send/receive buffers
213 ClearBuffers();
214
215 // Call the server stopped handler
216 onStopped();
217 };
218 if (_strand_required)
219 _strand.post(stop_handler);
220 else
221 _io_service->post(stop_handler);
222
223 return true;
224}
225
227{
228 if (!Stop())
229 return false;
230
231 while (IsStarted())
232 CppCommon::Thread::Yield();
233
234 return Start();
235}
236
237size_t UDPServer::Multicast(const void* buffer, size_t size)
238{
239 // Send the datagram to the multicast endpoint
240 return Send(_multicast_endpoint, buffer, size);
241}
242
243size_t UDPServer::Multicast(const void* buffer, size_t size, const CppCommon::Timespan& timeout)
244{
245 // Send the datagram to the multicast endpoint
246 return Send(_multicast_endpoint, buffer, size, timeout);
247}
248
249bool UDPServer::MulticastAsync(const void* buffer, size_t size)
250{
251 // Send the datagram to the multicast endpoint
252 return SendAsync(_multicast_endpoint, buffer, size);
253}
254
255size_t UDPServer::Send(const asio::ip::udp::endpoint& endpoint, const void* buffer, size_t size)
256{
257 if (!IsStarted())
258 return 0;
259
260 if (size == 0)
261 return 0;
262
263 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
264 if (buffer == nullptr)
265 return 0;
266
267 asio::error_code ec;
268
269 // Sent datagram to the client
270 size_t sent = _socket.send_to(asio::const_buffer(buffer, size), endpoint, 0, ec);
271 if (sent > 0)
272 {
273 // Update statistic
274 ++_datagrams_sent;
275 _bytes_sent += sent;
276
277 // Call the datagram sent handler
279 }
280
281 // Check for error
282 if (ec)
283 SendError(ec);
284
285 return sent;
286}
287
288size_t UDPServer::Send(const asio::ip::udp::endpoint& endpoint, const void* buffer, size_t size, const CppCommon::Timespan& timeout)
289{
290 if (!IsStarted())
291 return 0;
292
293 if (size == 0)
294 return 0;
295
296 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
297 if (buffer == nullptr)
298 return 0;
299
300 int done = 0;
301 std::mutex mtx;
302 std::condition_variable cv;
303 asio::error_code error;
304 asio::system_timer timer(_socket.get_executor());
305
306 // Prepare done handler
307 auto async_done_handler = [&](asio::error_code ec)
308 {
309 std::unique_lock<std::mutex> lck(mtx);
310 if (done++ == 0)
311 {
312 error = ec;
313 _socket.cancel();
314 timer.cancel();
315 }
316 cv.notify_one();
317 };
318
319 // Async wait for timeout
320 timer.expires_from_now(timeout.chrono());
321 timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
322
323 // Async send datagram to the client
324 size_t sent = 0;
325 _socket.async_send_to(asio::buffer(buffer, size), endpoint, [&](std::error_code ec, size_t write) { async_done_handler(ec); sent = write; });
326
327 // Wait for complete or timeout
328 std::unique_lock<std::mutex> lck(mtx);
329 cv.wait(lck, [&]() { return done == 2; });
330
331 // Send datagram to the client
332 if (sent > 0)
333 {
334 // Update statistic
335 ++_datagrams_sent;
336 _bytes_sent += sent;
337
338 // Call the datagram sent handler
340 }
341
342 // Check for error
343 if (error && (error != asio::error::timed_out))
344 SendError(error);
345
346 return sent;
347}
348
349bool UDPServer::SendAsync(const asio::ip::udp::endpoint& endpoint, const void* buffer, size_t size)
350{
351 if (_sending)
352 return false;
353
354 if (!IsStarted())
355 return false;
356
357 if (size == 0)
358 return true;
359
360 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
361 if (buffer == nullptr)
362 return false;
363
364 // Check the send buffer limit
365 if ((size > _send_buffer_limit) && (_send_buffer_limit > 0))
366 {
367 SendError(asio::error::no_buffer_space);
368
369 // Call the buffer sent zero handler
370 onSent(_send_endpoint, 0);
371
372 return false;
373 }
374
375 // Fill the main send buffer
376 const uint8_t* bytes = (const uint8_t*)buffer;
377 _send_buffer.assign(bytes, bytes + size);
378
379 // Update statistic
380 _bytes_sending = _send_buffer.size();
381
382 // Update send endpoint
383 _send_endpoint = endpoint;
384
385 // Async send-to with the send-to handler
386 _sending = true;
387 auto self(this->shared_from_this());
388 auto async_send_to_handler = make_alloc_handler(_send_storage, [this, self](std::error_code ec, size_t sent)
389 {
390 _sending = false;
391
392 if (!IsStarted())
393 return;
394
395 // Check for error
396 if (ec)
397 {
398 SendError(ec);
399
400 // Call the buffer sent zero handler
401 onSent(_send_endpoint, 0);
402
403 return;
404 }
405
406 // Send some data to the client
407 if (sent > 0)
408 {
409 // Update statistic
410 _bytes_sending = 0;
411 _bytes_sent += sent;
412
413 // Clear the send buffer
414 _send_buffer.clear();
415
416 // Call the buffer sent handler
417 onSent(_send_endpoint, sent);
418 }
419 });
420 if (_strand_required)
421 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, bind_executor(_strand, async_send_to_handler));
422 else
423 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, async_send_to_handler);
424
425 return true;
426}
427
428size_t UDPServer::Receive(asio::ip::udp::endpoint& endpoint, void* buffer, size_t size)
429{
430 if (!IsStarted())
431 return 0;
432
433 if (size == 0)
434 return 0;
435
436 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
437 if (buffer == nullptr)
438 return 0;
439
440 asio::error_code ec;
441
442 // Receive datagram from the client
443 size_t received = _socket.receive_from(asio::buffer(buffer, size), endpoint, 0, ec);
444
445 // Update statistic
446 ++_datagrams_received;
447 _bytes_received += received;
448
449 // Call the datagram received handler
451
452 // Check for error
453 if (ec)
454 SendError(ec);
455
456 return received;
457}
458
459std::string UDPServer::Receive(asio::ip::udp::endpoint& endpoint, size_t size)
460{
461 std::string text(size, 0);
462 text.resize(Receive(endpoint, text.data(), text.size()));
463 return text;
464}
465
466size_t UDPServer::Receive(asio::ip::udp::endpoint& endpoint, void* buffer, size_t size, const CppCommon::Timespan& timeout)
467{
468 if (!IsStarted())
469 return 0;
470
471 if (size == 0)
472 return 0;
473
474 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
475 if (buffer == nullptr)
476 return 0;
477
478 int done = 0;
479 std::mutex mtx;
480 std::condition_variable cv;
481 asio::error_code error;
482 asio::system_timer timer(_socket.get_executor());
483
484 // Prepare done handler
485 auto async_done_handler = [&](asio::error_code ec)
486 {
487 std::unique_lock<std::mutex> lck(mtx);
488 if (done++ == 0)
489 {
490 error = ec;
491 _socket.cancel();
492 timer.cancel();
493 }
494 cv.notify_one();
495 };
496
497 // Async wait for timeout
498 timer.expires_from_now(timeout.chrono());
499 timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
500
501 // Async receive datagram from the client
502 size_t received = 0;
503 _socket.async_receive_from(asio::buffer(buffer, size), endpoint, [&](std::error_code ec, size_t read) { async_done_handler(ec); received = read; });
504
505 // Wait for complete or timeout
506 std::unique_lock<std::mutex> lck(mtx);
507 cv.wait(lck, [&]() { return done == 2; });
508
509 // Update statistic
510 ++_datagrams_received;
511 _bytes_received += received;
512
513 // Call the datagram received handler
515
516 // Check for error
517 if (error && (error != asio::error::timed_out))
518 SendError(error);
519
520 return received;
521}
522
523std::string UDPServer::Receive(asio::ip::udp::endpoint& endpoint, size_t size, const CppCommon::Timespan& timeout)
524{
525 std::string text(size, 0);
526 text.resize(Receive(endpoint, text.data(), text.size(), timeout));
527 return text;
528}
529
531{
532 // Try to receive datagrams from clients
533 TryReceive();
534}
535
536void UDPServer::TryReceive()
537{
538 if (_receiving)
539 return;
540
541 if (!IsStarted())
542 return;
543
544 // Async receive with the receive handler
545 _receiving = true;
546 auto self(this->shared_from_this());
547 auto async_receive_handler = make_alloc_handler(_receive_storage, [this, self](std::error_code ec, size_t size)
548 {
549 _receiving = false;
550
551 if (!IsStarted())
552 return;
553
554 // Check for error
555 if (ec)
556 {
557 SendError(ec);
558
559 // Call the datagram received zero handler
560 onReceived(_receive_endpoint, _receive_buffer.data(), 0);
561
562 return;
563 }
564
565 // Update statistic
566 ++_datagrams_received;
567 _bytes_received += size;
568
569 // Call the datagram received handler
570 onReceived(_receive_endpoint, _receive_buffer.data(), size);
571
572 // If the receive buffer is full increase its size
573 if (_receive_buffer.size() == size)
574 {
575 // Check the receive buffer limit
576 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
577 {
578 SendError(asio::error::no_buffer_space);
579
580 // Call the datagram received zero handler
581 onReceived(_receive_endpoint, _receive_buffer.data(), 0);
582
583 return;
584 }
585
586 _receive_buffer.resize(2 * size);
587 }
588 });
589 if (_strand_required)
590 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, bind_executor(_strand, async_receive_handler));
591 else
592 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, async_receive_handler);
593}
594
595void UDPServer::ClearBuffers()
596{
597 // Clear send buffers
598 _send_buffer.clear();
599
600 // Update statistic
601 _bytes_sending = 0;
602}
603
604void UDPServer::SendError(std::error_code ec)
605{
606 // Skip Asio disconnect errors
607 if ((ec == asio::error::connection_aborted) ||
608 (ec == asio::error::connection_refused) ||
609 (ec == asio::error::connection_reset) ||
610 (ec == asio::error::eof) ||
611 (ec == asio::error::operation_aborted))
612 return;
613
614 onError(ec.value(), ec.category().name(), ec.message());
615}
616
617} // namespace Asio
618} // namespace CppServer
Asio allocate handler wrapper.
Definition memory.h:133
virtual bool Restart()
Restart the server.
int port() const noexcept
Get the server port number.
Definition udp_server.h:72
virtual void onStarted()
Handle server started notification.
Definition udp_server.h:310
bool option_reuse_port() const noexcept
Get the option: reuse port.
Definition udp_server.h:88
const std::string & address() const noexcept
Get the server address.
Definition udp_server.h:70
virtual void ReceiveAsync()
Receive datagram from the client (asynchronous)
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
virtual size_t Multicast(const void *buffer, size_t size)
Multicast datagram to the prepared multicast endpoint (synchronous)
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
Definition udp_server.h:342
bool option_reuse_address() const noexcept
Get the option: reuse address.
Definition udp_server.h:86
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool MulticastAsync(const void *buffer, size_t size)
Multicast datagram to the prepared multicast endpoint (asynchronous)
virtual bool Start()
Start the server.
size_t option_send_buffer_size() const
Get the option: send buffer size.
bool IsStarted() const noexcept
Is the server started?
Definition udp_server.h:99
virtual void onSent(const asio::ip::udp::endpoint &endpoint, size_t sent)
Handle datagram sent notification.
Definition udp_server.h:334
virtual bool Stop()
Stop the server.
virtual size_t Receive(asio::ip::udp::endpoint &endpoint, void *buffer, size_t size)
Receive datagram from the given endpoint (synchronous)
asio::ip::udp::endpoint & multicast_endpoint() noexcept
Get the server multicast endpoint.
Definition udp_server.h:67
virtual bool SendAsync(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Send datagram into the given endpoint (asynchronous)
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
Definition udp_server.h:59
UDPServer(const std::shared_ptr< Service > &service, int port, InternetProtocol protocol=InternetProtocol::IPv4)
Initialize UDP server with a given Asio service and port number.
asio::ip::udp::endpoint & endpoint() noexcept
Get the server endpoint.
Definition udp_server.h:65
virtual void onReceived(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Handle datagram received notification.
Definition udp_server.h:323
virtual size_t Send(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Send datagram into the given endpoint (synchronous)
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onStopped()
Handle server stopped notification.
Definition udp_server.h:312
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition memory.inl:39
InternetProtocol
Internet protocol.
Definition asio.h:66
@ IPv4
Internet Protocol version 4.
@ IPv6
Internet Protocol version 6.
C++ Server project definitions.
Definition asio.h:56
UDP server definition.