15 : _id(CppCommon::UUID::Sequential()),
17 _io_context(_service->GetAsioContext()),
18 _strand(*_io_context),
19 _strand_required(_service->IsStrandRequired()),
22 _socket(*_io_context),
29 _datagrams_received(0),
32 _option_reuse_address(false),
33 _option_reuse_port(false),
34 _option_multicast(false)
36 assert((
service !=
nullptr) &&
"Asio service is invalid!");
38 throw CppCommon::ArgumentException(
"Asio service is invalid!");
42 : _id(CppCommon::UUID::Sequential()),
44 _io_context(_service->GetAsioContext()),
45 _strand(*_io_context),
46 _strand_required(_service->IsStrandRequired()),
50 _socket(*_io_context),
57 _datagrams_received(0),
60 _option_reuse_address(false),
61 _option_reuse_port(false),
62 _option_multicast(false)
64 assert((
service !=
nullptr) &&
"Asio service is invalid!");
66 throw CppCommon::ArgumentException(
"Asio service is invalid!");
70 : _id(CppCommon::UUID::Sequential()),
72 _io_context(_service->GetAsioContext()),
73 _strand(*_io_context),
74 _strand_required(_service->IsStrandRequired()),
78 _socket(*_io_context),
85 _datagrams_received(0),
88 _option_reuse_address(false),
89 _option_reuse_port(false),
90 _option_multicast(false)
92 assert((
service !=
nullptr) &&
"Asio service is invalid!");
94 throw CppCommon::ArgumentException(
"Asio service is invalid!");
99 asio::socket_base::receive_buffer_size option;
100 _socket.get_option(option);
101 return option.value();
106 asio::socket_base::send_buffer_size option;
107 _socket.get_option(option);
108 return option.value();
113 asio::socket_base::receive_buffer_size option((
int)size);
114 _socket.set_option(option);
119 asio::socket_base::send_buffer_size option((
int)size);
120 _socket.set_option(option);
125 assert(!_address.empty() &&
"Server address must not be empty!");
126 if (_address.empty())
128 assert((_port > 0) &&
"Server port number must be valid!");
136 _endpoint = asio::ip::udp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
142 _socket.open(_endpoint.protocol());
144 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
145#if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
148 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
149 _socket.set_option(reuse_port(
true));
153 _socket.bind(_endpoint);
155 _socket.bind(asio::ip::udp::endpoint(_endpoint.protocol(), 0));
165 _datagrams_received = 0;
178 assert((resolver !=
nullptr) &&
"UDP resolver is invalid!");
179 if (resolver ==
nullptr)
181 assert(!_address.empty() &&
"Server address must not be empty!");
182 if (_address.empty())
184 assert((_port > 0) &&
"Server port number must be valid!");
194 auto endpoints = resolver->resolver().resolve(_address, (_scheme.empty() ? std::to_string(_port) : _scheme), ec);
207 _endpoint = endpoints.begin()->endpoint();
213 _socket.open(_endpoint.protocol());
215 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
216#if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
219 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
220 _socket.set_option(reuse_port(
true));
224 _socket.bind(_endpoint);
226 _socket.bind(asio::ip::udp::endpoint(_endpoint.protocol(), 0));
236 _datagrams_received = 0;
247bool UDPClient::DisconnectInternal()
289 auto self(this->shared_from_this());
290 auto connect_handler = [
this, self]() {
Connect(); };
291 if (_strand_required)
292 asio::post(_strand, connect_handler);
294 asio::post(*_io_context, connect_handler);
301 assert((resolver !=
nullptr) &&
"UDP resolver is invalid!");
302 if (resolver ==
nullptr)
309 auto self(this->shared_from_this());
310 auto connect_handler = [
this, self, resolver]()
317 auto async_resolve_handler = [
this, self](std::error_code ec, asio::ip::udp::resolver::results_type endpoints)
327 _endpoint = endpoints.begin()->endpoint();
333 _socket.open(_endpoint.protocol());
335 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
336#if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
339 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
340 _socket.set_option(reuse_port(
true));
344 _socket.bind(_endpoint);
346 _socket.bind(asio::ip::udp::endpoint(_endpoint.protocol(), 0));
356 _datagrams_received = 0;
374 if (_strand_required)
375 resolver->resolver().async_resolve(_address, (_scheme.empty() ? std::to_string(_port) : _scheme), bind_executor(_strand, async_resolve_handler));
377 resolver->resolver().async_resolve(_address, (_scheme.empty() ? std::to_string(_port) : _scheme), async_resolve_handler);
379 if (_strand_required)
380 asio::post(_strand, connect_handler);
382 asio::post(*_io_context, connect_handler);
387bool UDPClient::DisconnectInternalAsync(
bool dispatch)
398 auto self(this->shared_from_this());
399 auto disconnect_handler = [
this, self]() { DisconnectInternal(); };
400 if (_strand_required)
403 asio::dispatch(_strand, disconnect_handler);
405 asio::post(_strand, disconnect_handler);
410 asio::dispatch(*_io_context, disconnect_handler);
412 asio::post(*_io_context, disconnect_handler);
424 CppCommon::Thread::Yield();
434 asio::ip::address muticast_address = asio::ip::make_address(
address);
436 asio::ip::multicast::join_group join(muticast_address);
437 _socket.set_option(join);
448 asio::ip::address muticast_address = asio::ip::make_address(
address);
450 asio::ip::multicast::leave_group leave(muticast_address);
451 _socket.set_option(leave);
463 auto self(this->shared_from_this());
465 if (_strand_required)
466 asio::dispatch(_strand, join_multicast_group_handler);
468 asio::dispatch(*_io_context, join_multicast_group_handler);
477 auto self(this->shared_from_this());
479 if (_strand_required)
480 asio::dispatch(_strand, leave_multicast_group_handler);
482 asio::dispatch(*_io_context, leave_multicast_group_handler);
488 return Send(_endpoint, buffer, size);
499 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
500 if (buffer ==
nullptr)
506 size_t sent = _socket.send_to(asio::const_buffer(buffer, size),
endpoint, 0, ec);
527size_t UDPClient::Send(
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
530 return Send(_endpoint, buffer, size, timeout);
533size_t UDPClient::Send(
const asio::ip::udp::endpoint&
endpoint,
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
541 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
542 if (buffer ==
nullptr)
547 std::condition_variable cv;
548 asio::error_code error;
549 asio::system_timer timer(_socket.get_executor());
552 auto async_done_handler = [&](asio::error_code ec)
554 std::unique_lock<std::mutex> lck(mtx);
565 timer.expires_after(timeout.chrono());
566 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
570 _socket.async_send_to(asio::buffer(buffer, size),
endpoint, [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
573 std::unique_lock<std::mutex> lck(mtx);
574 cv.wait(lck, [&]() {
return done == 2; });
588 if (error && (error != asio::error::timed_out))
600 return SendAsync(_endpoint, buffer, size);
614 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
615 if (buffer ==
nullptr)
619 if ((size > _send_buffer_limit) && (_send_buffer_limit > 0))
621 SendError(asio::error::no_buffer_space);
626 const uint8_t* bytes = (
const uint8_t*)buffer;
627 _send_buffer.assign(bytes, bytes + size);
630 _bytes_sending = _send_buffer.size();
637 auto self(this->shared_from_this());
638 auto async_send_to_handler =
make_alloc_handler(_send_storage, [
this, self](std::error_code ec,
size_t sent)
649 DisconnectInternalAsync(
true);
661 _send_buffer.clear();
664 onSent(_send_endpoint, sent);
667 if (_strand_required)
668 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, bind_executor(_strand, async_send_to_handler));
670 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, async_send_to_handler);
683 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
684 if (buffer ==
nullptr)
690 size_t received = _socket.receive_from(asio::buffer(buffer, size),
endpoint, 0, ec);
693 ++_datagrams_received;
694 _bytes_received += received;
711 std::string text(size, 0);
724 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
725 if (buffer ==
nullptr)
730 std::condition_variable cv;
731 asio::error_code error;
732 asio::system_timer timer(_socket.get_executor());
735 auto async_done_handler = [&](asio::error_code ec)
737 std::unique_lock<std::mutex> lck(mtx);
748 timer.expires_after(timeout.chrono());
749 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
753 _socket.async_receive_from(asio::buffer(buffer, size),
endpoint, [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
756 std::unique_lock<std::mutex> lck(mtx);
757 cv.wait(lck, [&]() {
return done == 2; });
760 ++_datagrams_received;
761 _bytes_received += received;
767 if (error && (error != asio::error::timed_out))
778 std::string text(size, 0);
789void UDPClient::TryReceive()
799 auto self(this->shared_from_this());
800 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this, self](std::error_code ec,
size_t size)
811 DisconnectInternalAsync(
true);
816 ++_datagrams_received;
817 _bytes_received += size;
820 onReceived(_receive_endpoint, _receive_buffer.data(), size);
823 if (_receive_buffer.size() == size)
826 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
828 SendError(asio::error::no_buffer_space);
829 DisconnectInternalAsync(
true);
833 _receive_buffer.resize(2 * size);
836 if (_strand_required)
837 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, bind_executor(_strand, async_receive_handler));
839 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, async_receive_handler);
842void UDPClient::ClearBuffers()
845 _send_buffer.clear();
851void UDPClient::SendError(std::error_code ec)
854 if ((ec == asio::error::connection_aborted) ||
855 (ec == asio::error::connection_refused) ||
856 (ec == asio::error::connection_reset) ||
857 (ec == asio::error::eof) ||
858 (ec == asio::error::operation_aborted))
861 onError(ec.value(), ec.category().name(), ec.message());
bool option_reuse_address() const noexcept
Get the option: reuse address.
virtual void LeaveMulticastGroup(const std::string &address)
Leave multicast group with a given address (synchronous).
virtual void onConnecting()
Handle client connecting notification.
virtual void onConnected()
Handle client connected notification.
int port() const noexcept
Get the server port number.
asio::ip::udp::endpoint & endpoint() noexcept
Get the client endpoint.
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
virtual size_t Receive(asio::ip::udp::endpoint &endpoint, void *buffer, size_t size)
Receive datagram from the given endpoint (synchronous).
bool option_multicast() const noexcept
Get the option: bind the socket to the multicast UDP server.
virtual bool ReconnectAsync()
Reconnect the client (asynchronous).
virtual void onDisconnecting()
Handle client disconnecting notification.
bool IsConnected() const noexcept
Is the client connected?
UDPClient(const std::shared_ptr< Service > &service, const std::string &address, int port)
Initialize UDP client with a given Asio service, server address and port number.
bool option_reuse_port() const noexcept
Get the option: reuse port.
virtual void onDisconnected()
Handle client disconnected notification.
virtual void LeaveMulticastGroupAsync(const std::string &address)
Leave multicast group with a given address (asynchronous).
virtual void onSent(const asio::ip::udp::endpoint &endpoint, size_t sent)
Handle datagram sent notification.
virtual void JoinMulticastGroupAsync(const std::string &address)
Join multicast group with a given address (asynchronous).
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual void JoinMulticastGroup(const std::string &address)
Join multicast group with a given address (synchronous).
virtual bool Reconnect()
Reconnect the client (synchronous).
virtual void onJoinedMulticastGroup(const std::string &address)
Handle client joined multicast group notification.
virtual void ReceiveAsync()
Receive datagram from the server (asynchronous).
virtual bool Connect()
Connect the client (synchronous).
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onReceived(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Handle datagram received notification.
virtual bool SendAsync(const void *buffer, size_t size)
Send datagram to the connected server (asynchronous).
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool DisconnectAsync()
Disconnect the client (asynchronous).
virtual void onLeftMulticastGroup(const std::string &address)
Handle client left multicast group notification.
virtual size_t Send(const void *buffer, size_t size)
Send datagram to the connected server (synchronous).
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
const std::string & address() const noexcept
Get the server address.
virtual bool Disconnect()
Disconnect the client (synchronous).
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
const std::string & scheme() const noexcept
Get the scheme name.
virtual bool ConnectAsync()
Connect the client (asynchronous).
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.