15 : _id(CppCommon::UUID::Sequential()),
17 _io_service(_service->GetAsioService()),
18 _strand(*_io_service),
19 _strand_required(_service->IsStrandRequired()),
21 _socket(*_io_service),
27 _datagrams_received(0),
30 _option_reuse_address(false),
31 _option_reuse_port(false)
33 assert((
service !=
nullptr) &&
"Asio service is invalid!");
35 throw CppCommon::ArgumentException(
"Asio service is invalid!");
41 _endpoint = asio::ip::udp::endpoint(asio::ip::udp::v4(), (
unsigned short)
port);
44 _endpoint = asio::ip::udp::endpoint(asio::ip::udp::v6(), (
unsigned short)
port);
50 : _id(CppCommon::UUID::Sequential()),
52 _io_service(_service->GetAsioService()),
53 _strand(*_io_service),
54 _strand_required(_service->IsStrandRequired()),
57 _socket(*_io_service),
63 _datagrams_received(0),
66 _option_reuse_address(false),
67 _option_reuse_port(false)
69 assert((
service !=
nullptr) &&
"Asio service is invalid!");
71 throw CppCommon::ArgumentException(
"Asio service is invalid!");
74 _endpoint = asio::ip::udp::endpoint(asio::ip::make_address(
address), (
unsigned short)
port);
78 : _id(CppCommon::UUID::Sequential()),
80 _io_service(_service->GetAsioService()),
81 _strand(*_io_service),
82 _strand_required(_service->IsStrandRequired()),
83 _address(endpoint.address().to_string()),
84 _port(endpoint.port()),
86 _socket(*_io_service),
92 _datagrams_received(0),
96 assert((
service !=
nullptr) &&
"Asio service is invalid!");
98 throw CppCommon::ArgumentException(
"Asio service is invalid!");
103 asio::socket_base::receive_buffer_size option;
104 _socket.get_option(option);
105 return option.value();
110 asio::socket_base::send_buffer_size option;
111 _socket.get_option(option);
112 return option.value();
117 asio::socket_base::receive_buffer_size option((
int)size);
118 _socket.set_option(option);
123 asio::socket_base::send_buffer_size option((
int)size);
124 _socket.set_option(option);
129 assert(!
IsStarted() &&
"UDP server is already started!");
134 auto self(this->shared_from_this());
135 auto start_handler = [
this,
self]()
141 _socket.open(_endpoint.protocol());
143 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
144 #if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
147 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
148 _socket.set_option(reuse_port(
true));
151 _socket.bind(_endpoint);
161 _datagrams_received = 0;
169 if (_strand_required)
170 _strand.post(start_handler);
172 _io_service->post(start_handler);
179 _multicast_endpoint = asio::ip::udp::endpoint(asio::ip::make_address(multicast_address), (
unsigned short)multicast_port);
191 assert(
IsStarted() &&
"UDP server is not started!");
196 auto self(this->shared_from_this());
197 auto stop_handler = [
this,
self]()
218 if (_strand_required)
219 _strand.post(stop_handler);
221 _io_service->post(stop_handler);
232 CppCommon::Thread::Yield();
240 return Send(_multicast_endpoint, buffer, size);
246 return Send(_multicast_endpoint, buffer, size, timeout);
252 return SendAsync(_multicast_endpoint, buffer, size);
255 size_t UDPServer::Send(
const asio::ip::udp::endpoint& endpoint,
const void* buffer,
size_t size)
263 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
264 if (buffer ==
nullptr)
270 size_t sent = _socket.send_to(asio::const_buffer(buffer, size),
endpoint, 0, ec);
288 size_t UDPServer::Send(
const asio::ip::udp::endpoint& endpoint,
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
296 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
297 if (buffer ==
nullptr)
302 std::condition_variable cv;
303 asio::error_code error;
304 asio::system_timer timer(_socket.get_executor());
307 auto async_done_handler = [&](asio::error_code ec)
309 std::unique_lock<std::mutex> lck(mtx);
320 timer.expires_from_now(timeout.chrono());
321 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
325 _socket.async_send_to(asio::buffer(buffer, size),
endpoint, [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
328 std::unique_lock<std::mutex> lck(mtx);
329 cv.wait(lck, [&]() {
return done == 2; });
343 if (error && (error != asio::error::timed_out))
360 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
361 if (buffer ==
nullptr)
365 if ((size > _send_buffer_limit) && (_send_buffer_limit > 0))
367 SendError(asio::error::no_buffer_space);
370 onSent(_send_endpoint, 0);
376 const uint8_t* bytes = (
const uint8_t*)buffer;
377 _send_buffer.assign(bytes, bytes + size);
380 _bytes_sending = _send_buffer.size();
387 auto self(this->shared_from_this());
388 auto async_send_to_handler =
make_alloc_handler(_send_storage, [
this,
self](std::error_code ec,
size_t sent)
401 onSent(_send_endpoint, 0);
414 _send_buffer.clear();
417 onSent(_send_endpoint, sent);
420 if (_strand_required)
421 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, bind_executor(_strand, async_send_to_handler));
423 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, async_send_to_handler);
436 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
437 if (buffer ==
nullptr)
443 size_t received = _socket.receive_from(asio::buffer(buffer, size),
endpoint, 0, ec);
446 ++_datagrams_received;
447 _bytes_received += received;
461 std::string text(size, 0);
466 size_t UDPServer::Receive(asio::ip::udp::endpoint& endpoint,
void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
474 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
475 if (buffer ==
nullptr)
480 std::condition_variable cv;
481 asio::error_code error;
482 asio::system_timer timer(_socket.get_executor());
485 auto async_done_handler = [&](asio::error_code ec)
487 std::unique_lock<std::mutex> lck(mtx);
498 timer.expires_from_now(timeout.chrono());
499 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
503 _socket.async_receive_from(asio::buffer(buffer, size),
endpoint, [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
506 std::unique_lock<std::mutex> lck(mtx);
507 cv.wait(lck, [&]() {
return done == 2; });
510 ++_datagrams_received;
511 _bytes_received += received;
517 if (error && (error != asio::error::timed_out))
523 std::string
UDPServer::Receive(asio::ip::udp::endpoint& endpoint,
size_t size,
const CppCommon::Timespan& timeout)
525 std::string text(size, 0);
536 void UDPServer::TryReceive()
546 auto self(this->shared_from_this());
547 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this,
self](std::error_code ec,
size_t size)
560 onReceived(_receive_endpoint, _receive_buffer.data(), 0);
566 ++_datagrams_received;
567 _bytes_received += size;
570 onReceived(_receive_endpoint, _receive_buffer.data(), size);
573 if (_receive_buffer.size() == size)
576 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
578 SendError(asio::error::no_buffer_space);
581 onReceived(_receive_endpoint, _receive_buffer.data(), 0);
586 _receive_buffer.resize(2 * size);
589 if (_strand_required)
590 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, bind_executor(_strand, async_receive_handler));
592 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, async_receive_handler);
595 void UDPServer::ClearBuffers()
598 _send_buffer.clear();
604 void UDPServer::SendError(std::error_code ec)
607 if ((ec == asio::error::connection_aborted) ||
608 (ec == asio::error::connection_refused) ||
609 (ec == asio::error::connection_reset) ||
610 (ec == asio::error::eof) ||
611 (ec == asio::error::operation_aborted))
614 onError(ec.value(), ec.category().name(), ec.message());
asio::ip::udp::endpoint & endpoint() noexcept
Get the server endpoint.
virtual bool Restart()
Restart the server.
int port() const noexcept
Get the server port number.
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
virtual void onStarted()
Handle server started notification.
asio::ip::udp::endpoint & multicast_endpoint() noexcept
Get the server multicast endpoint.
bool option_reuse_port() const noexcept
Get the option: reuse port.
virtual void ReceiveAsync()
Receive datagram from the client (asynchronous)
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
virtual size_t Multicast(const void *buffer, size_t size)
Multicast datagram to the prepared multicast endpoint (synchronous)
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
bool option_reuse_address() const noexcept
Get the option: reuse address.
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool MulticastAsync(const void *buffer, size_t size)
Multicast datagram to the prepared multicast endpoint (asynchronous)
virtual bool Start()
Start the server.
size_t option_send_buffer_size() const
Get the option: send buffer size.
bool IsStarted() const noexcept
Is the server started?
virtual void onSent(const asio::ip::udp::endpoint &endpoint, size_t sent)
Handle datagram sent notification.
virtual bool Stop()
Stop the server.
virtual size_t Receive(asio::ip::udp::endpoint &endpoint, void *buffer, size_t size)
Receive datagram from the given endpoint (synchronous)
const std::string & address() const noexcept
Get the server address.
virtual bool SendAsync(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Send datagram to the given endpoint (asynchronous)
UDPServer(const std::shared_ptr< Service > &service, int port, InternetProtocol protocol=InternetProtocol::IPv4)
Initialize UDP server with a given Asio service and port number.
virtual void onReceived(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Handle datagram received notification.
virtual size_t Send(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Send datagram to the given endpoint (synchronous)
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onStopped()
Handle server stopped notification.
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
InternetProtocol
Internet protocol.
@ IPv4
Internet Protocol version 4.
@ IPv6
Internet Protocol version 6.
C++ Server project definitions.