15 : _id(CppCommon::UUID::Sequential()),
17 _io_service(_service->GetAsioService()),
18 _strand(*_io_service),
19 _strand_required(_service->IsStrandRequired()),
22 _socket(*_io_service),
29 _datagrams_received(0),
32 _option_reuse_address(false),
33 _option_reuse_port(false),
34 _option_multicast(false)
36 assert((
service !=
nullptr) &&
"Asio service is invalid!");
38 throw CppCommon::ArgumentException(
"Asio service is invalid!");
41 UDPClient::UDPClient(
const std::shared_ptr<Service>& service,
const std::string& address,
const std::string& scheme)
42 : _id(CppCommon::UUID::Sequential()),
44 _io_service(_service->GetAsioService()),
45 _strand(*_io_service),
46 _strand_required(_service->IsStrandRequired()),
50 _socket(*_io_service),
57 _datagrams_received(0),
60 _option_reuse_address(false),
61 _option_reuse_port(false),
62 _option_multicast(false)
64 assert((
service !=
nullptr) &&
"Asio service is invalid!");
66 throw CppCommon::ArgumentException(
"Asio service is invalid!");
70 : _id(CppCommon::UUID::Sequential()),
72 _io_service(_service->GetAsioService()),
73 _strand(*_io_service),
74 _strand_required(_service->IsStrandRequired()),
75 _address(endpoint.address().to_string()),
76 _port(endpoint.port()),
78 _socket(*_io_service),
85 _datagrams_received(0),
88 _option_reuse_address(false),
89 _option_reuse_port(false),
90 _option_multicast(false)
92 assert((
service !=
nullptr) &&
"Asio service is invalid!");
94 throw CppCommon::ArgumentException(
"Asio service is invalid!");
99 asio::socket_base::receive_buffer_size option;
100 _socket.get_option(option);
101 return option.value();
106 asio::socket_base::send_buffer_size option;
107 _socket.get_option(option);
108 return option.value();
113 asio::socket_base::receive_buffer_size option((
int)size);
114 _socket.set_option(option);
119 asio::socket_base::send_buffer_size option((
int)size);
120 _socket.set_option(option);
125 assert(!_address.empty() &&
"Server address must not be empty!");
126 if (_address.empty())
128 assert((_port > 0) &&
"Server port number must be valid!");
136 _endpoint = asio::ip::udp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
139 _socket.open(_endpoint.protocol());
141 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
142 #if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
145 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
146 _socket.set_option(reuse_port(
true));
150 _socket.bind(_endpoint);
152 _socket.bind(asio::ip::udp::endpoint(_endpoint.protocol(), 0));
162 _datagrams_received = 0;
175 assert((resolver !=
nullptr) &&
"UDP resolver is invalid!");
176 if (resolver ==
nullptr)
178 assert(!_address.empty() &&
"Server address must not be empty!");
179 if (_address.empty())
181 assert((_port > 0) &&
"Server port number must be valid!");
191 asio::ip::udp::resolver::query query(_address, (_scheme.empty() ? std::to_string(_port) : _scheme));
192 auto endpoints = resolver->resolver().resolve(query, ec);
205 _endpoint = *endpoints;
208 _socket.open(_endpoint.protocol());
210 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
211 #if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
214 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
215 _socket.set_option(reuse_port(
true));
219 _socket.bind(_endpoint);
221 _socket.bind(asio::ip::udp::endpoint(_endpoint.protocol(), 0));
231 _datagrams_received = 0;
242 bool UDPClient::DisconnectInternal()
281 auto self(this->shared_from_this());
282 auto connect_handler = [
this,
self]() {
Connect(); };
283 if (_strand_required)
284 _strand.post(connect_handler);
286 _io_service->post(connect_handler);
293 assert((resolver !=
nullptr) &&
"UDP resolver is invalid!");
294 if (resolver ==
nullptr)
301 auto self(this->shared_from_this());
302 auto connect_handler = [
this,
self, resolver]()
309 auto async_resolve_handler = [
this,
self](std::error_code ec, asio::ip::udp::resolver::results_type endpoints)
319 _endpoint = *endpoints;
322 _socket.open(_endpoint.protocol());
324 _socket.set_option(asio::ip::udp::socket::reuse_address(
true));
325 #if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
328 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
329 _socket.set_option(reuse_port(
true));
333 _socket.bind(_endpoint);
335 _socket.bind(asio::ip::udp::endpoint(_endpoint.protocol(), 0));
345 _datagrams_received = 0;
363 asio::ip::udp::resolver::query query(_address, (_scheme.empty() ? std::to_string(_port) : _scheme));
364 if (_strand_required)
365 resolver->resolver().async_resolve(query, bind_executor(_strand, async_resolve_handler));
367 resolver->resolver().async_resolve(query, async_resolve_handler);
369 if (_strand_required)
370 _strand.post(connect_handler);
372 _io_service->post(connect_handler);
377 bool UDPClient::DisconnectInternalAsync(
bool dispatch)
388 auto self(this->shared_from_this());
389 auto disconnect_handler = [
this,
self]() { DisconnectInternal(); };
390 if (_strand_required)
393 _strand.dispatch(disconnect_handler);
395 _strand.post(disconnect_handler);
400 _io_service->dispatch(disconnect_handler);
402 _io_service->post(disconnect_handler);
414 CppCommon::Thread::Yield();
424 asio::ip::address muticast_address = asio::ip::make_address(
address);
426 asio::ip::multicast::join_group join(muticast_address);
427 _socket.set_option(join);
438 asio::ip::address muticast_address = asio::ip::make_address(
address);
440 asio::ip::multicast::leave_group leave(muticast_address);
441 _socket.set_option(leave);
453 auto self(this->shared_from_this());
455 if (_strand_required)
456 _strand.dispatch(join_multicast_group_handler);
458 _io_service->dispatch(join_multicast_group_handler);
467 auto self(this->shared_from_this());
469 if (_strand_required)
470 _strand.dispatch(leave_multicast_group_handler);
472 _io_service->dispatch(leave_multicast_group_handler);
478 return Send(_endpoint, buffer, size);
481 size_t UDPClient::Send(
const asio::ip::udp::endpoint& endpoint,
const void* buffer,
size_t size)
489 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
490 if (buffer ==
nullptr)
496 size_t sent = _socket.send_to(asio::const_buffer(buffer, size),
endpoint, 0, ec);
517 size_t UDPClient::Send(
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
520 return Send(_endpoint, buffer, size, timeout);
523 size_t UDPClient::Send(
const asio::ip::udp::endpoint& endpoint,
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
531 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
532 if (buffer ==
nullptr)
537 std::condition_variable cv;
538 asio::error_code error;
539 asio::system_timer timer(_socket.get_executor());
542 auto async_done_handler = [&](asio::error_code ec)
544 std::unique_lock<std::mutex> lck(mtx);
555 timer.expires_from_now(timeout.chrono());
556 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
560 _socket.async_send_to(asio::buffer(buffer, size),
endpoint, [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
563 std::unique_lock<std::mutex> lck(mtx);
564 cv.wait(lck, [&]() {
return done == 2; });
578 if (error && (error != asio::error::timed_out))
590 return SendAsync(_endpoint, buffer, size);
604 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
605 if (buffer ==
nullptr)
609 if ((size > _send_buffer_limit) && (_send_buffer_limit > 0))
611 SendError(asio::error::no_buffer_space);
616 const uint8_t* bytes = (
const uint8_t*)buffer;
617 _send_buffer.assign(bytes, bytes + size);
620 _bytes_sending = _send_buffer.size();
627 auto self(this->shared_from_this());
628 auto async_send_to_handler =
make_alloc_handler(_send_storage, [
this,
self](std::error_code ec,
size_t sent)
639 DisconnectInternalAsync(
true);
651 _send_buffer.clear();
654 onSent(_send_endpoint, sent);
657 if (_strand_required)
658 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, bind_executor(_strand, async_send_to_handler));
660 _socket.async_send_to(asio::buffer(_send_buffer.data(), _send_buffer.size()), _send_endpoint, async_send_to_handler);
673 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
674 if (buffer ==
nullptr)
680 size_t received = _socket.receive_from(asio::buffer(buffer, size),
endpoint, 0, ec);
683 ++_datagrams_received;
684 _bytes_received += received;
701 std::string text(size, 0);
706 size_t UDPClient::Receive(asio::ip::udp::endpoint& endpoint,
void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
714 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
715 if (buffer ==
nullptr)
720 std::condition_variable cv;
721 asio::error_code error;
722 asio::system_timer timer(_socket.get_executor());
725 auto async_done_handler = [&](asio::error_code ec)
727 std::unique_lock<std::mutex> lck(mtx);
738 timer.expires_from_now(timeout.chrono());
739 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
743 _socket.async_receive_from(asio::buffer(buffer, size),
endpoint, [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
746 std::unique_lock<std::mutex> lck(mtx);
747 cv.wait(lck, [&]() {
return done == 2; });
750 ++_datagrams_received;
751 _bytes_received += received;
757 if (error && (error != asio::error::timed_out))
766 std::string
UDPClient::Receive(asio::ip::udp::endpoint& endpoint,
size_t size,
const CppCommon::Timespan& timeout)
768 std::string text(size, 0);
779 void UDPClient::TryReceive()
789 auto self(this->shared_from_this());
790 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this,
self](std::error_code ec,
size_t size)
801 DisconnectInternalAsync(
true);
806 ++_datagrams_received;
807 _bytes_received += size;
810 onReceived(_receive_endpoint, _receive_buffer.data(), size);
813 if (_receive_buffer.size() == size)
816 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
818 SendError(asio::error::no_buffer_space);
819 DisconnectInternalAsync(
true);
823 _receive_buffer.resize(2 * size);
826 if (_strand_required)
827 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, bind_executor(_strand, async_receive_handler));
829 _socket.async_receive_from(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), _receive_endpoint, async_receive_handler);
832 void UDPClient::ClearBuffers()
835 _send_buffer.clear();
841 void UDPClient::SendError(std::error_code ec)
844 if ((ec == asio::error::connection_aborted) ||
845 (ec == asio::error::connection_refused) ||
846 (ec == asio::error::connection_reset) ||
847 (ec == asio::error::eof) ||
848 (ec == asio::error::operation_aborted))
851 onError(ec.value(), ec.category().name(), ec.message());
bool option_reuse_address() const noexcept
Get the option: reuse address.
virtual void LeaveMulticastGroup(const std::string &address)
Leave multicast group with a given address (synchronous)
virtual void onConnected()
Handle client connected notification.
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
virtual size_t Receive(asio::ip::udp::endpoint &endpoint, void *buffer, size_t size)
Receive datagram from the given endpoint (synchronous)
bool option_multicast() const noexcept
Get the option: bind the socket to the multicast UDP server.
virtual bool ReconnectAsync()
Reconnect the client (asynchronous)
bool IsConnected() const noexcept
Is the client connected?
UDPClient(const std::shared_ptr< Service > &service, const std::string &address, int port)
Initialize UDP client with a given Asio service, server address and port number.
const std::string & address() const noexcept
Get the server address.
bool option_reuse_port() const noexcept
Get the option: reuse port.
virtual void onDisconnected()
Handle client disconnected notification.
virtual void LeaveMulticastGroupAsync(const std::string &address)
Leave multicast group with a given address (asynchronous)
virtual void onSent(const asio::ip::udp::endpoint &endpoint, size_t sent)
Handle datagram sent notification.
virtual void JoinMulticastGroupAsync(const std::string &address)
Join multicast group with a given address (asynchronous)
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual void JoinMulticastGroup(const std::string &address)
Join multicast group with a given address (synchronous)
virtual bool Reconnect()
Reconnect the client (synchronous)
virtual void onJoinedMulticastGroup(const std::string &address)
Handle client joined multicast group notification.
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
virtual void ReceiveAsync()
Receive datagram from the server (asynchronous)
virtual bool Connect()
Connect the client (synchronous)
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onReceived(const asio::ip::udp::endpoint &endpoint, const void *buffer, size_t size)
Handle datagram received notification.
virtual bool SendAsync(const void *buffer, size_t size)
Send datagram to the connected server (asynchronous)
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool DisconnectAsync()
Disconnect the client (asynchronous)
virtual void onLeftMulticastGroup(const std::string &address)
Handle client left multicast group notification.
virtual size_t Send(const void *buffer, size_t size)
Send datagram to the connected server (synchronous)
virtual bool Disconnect()
Disconnect the client (synchronous)
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
asio::ip::udp::endpoint & endpoint() noexcept
Get the client endpoint.
virtual bool ConnectAsync()
Connect the client (asynchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.