15 : _id(CppCommon::UUID::Sequential()),
17 _io_context(_service->GetAsioContext()),
18 _strand(*_io_context),
19 _strand_required(_service->IsStrandRequired()),
22 _socket(*_io_context),
32 _send_buffer_flush_offset(0),
33 _option_keep_alive(false),
34 _option_no_delay(false)
36 assert((
service !=
nullptr) &&
"Asio service is invalid!");
38 throw CppCommon::ArgumentException(
"Asio service is invalid!");
42 : _id(CppCommon::UUID::Sequential()),
44 _io_context(_service->GetAsioContext()),
45 _strand(*_io_context),
46 _strand_required(_service->IsStrandRequired()),
50 _socket(*_io_context),
60 _send_buffer_flush_offset(0),
61 _option_keep_alive(false),
62 _option_no_delay(false)
64 assert((
service !=
nullptr) &&
"Asio service is invalid!");
66 throw CppCommon::ArgumentException(
"Asio service is invalid!");
70 : _id(CppCommon::UUID::Sequential()),
72 _io_context(_service->GetAsioContext()),
73 _strand(*_io_context),
74 _strand_required(_service->IsStrandRequired()),
78 _socket(*_io_context),
88 _send_buffer_flush_offset(0),
89 _option_keep_alive(false),
90 _option_no_delay(false)
92 assert((
service !=
nullptr) &&
"Asio service is invalid!");
94 throw CppCommon::ArgumentException(
"Asio service is invalid!");
99 asio::socket_base::receive_buffer_size option;
100 _socket.get_option(option);
101 return option.value();
106 asio::socket_base::send_buffer_size option;
107 _socket.get_option(option);
108 return option.value();
113 asio::socket_base::receive_buffer_size option((
int)size);
114 _socket.set_option(option);
119 asio::socket_base::send_buffer_size option((
int)size);
120 _socket.set_option(option);
131 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
140 _socket.connect(_endpoint, ec);
158 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
161 _socket.set_option(asio::ip::tcp::no_delay(
true));
181 if (_send_buffer_main.empty())
195 auto endpoints = resolver->resolver().resolve(_address, (_scheme.empty() ? std::to_string(_port) : _scheme), ec);
215 _endpoint = asio::connect(_socket, endpoints, ec);
233 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
236 _socket.set_option(asio::ip::tcp::no_delay(
true));
256 if (_send_buffer_main.empty())
262bool TCPClient::DisconnectInternal()
305 auto self(this->shared_from_this());
306 auto connect_handler = [
this, self]()
318 auto async_connect_handler = [
this, self](std::error_code ec)
330 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
333 _socket.set_option(asio::ip::tcp::no_delay(
true));
356 if (_send_buffer_main.empty())
369 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
371 if (_strand_required)
372 _socket.async_connect(_endpoint, bind_executor(_strand, async_connect_handler));
374 _socket.async_connect(_endpoint, async_connect_handler);
376 if (_strand_required)
377 asio::post(_strand, connect_handler);
379 asio::post(*_io_context, connect_handler);
390 auto self(this->shared_from_this());
391 auto connect_handler = [
this, self, resolver]()
398 auto async_resolve_handler = [
this, self](std::error_code ec1, asio::ip::tcp::resolver::results_type endpoints)
414 auto async_connect_handler = [
this, self](std::error_code ec2,
const asio::ip::tcp::endpoint&
endpoint)
429 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
432 _socket.set_option(asio::ip::tcp::no_delay(
true));
455 if (_send_buffer_main.empty())
466 if (_strand_required)
467 asio::async_connect(_socket, endpoints, bind_executor(_strand, async_connect_handler));
469 asio::async_connect(_socket, endpoints, async_connect_handler);
481 if (_strand_required)
482 resolver->resolver().async_resolve(_address, (_scheme.empty() ? std::to_string(_port) : _scheme), bind_executor(_strand, async_resolve_handler));
484 resolver->resolver().async_resolve(_address, (_scheme.empty() ? std::to_string(_port) : _scheme), async_resolve_handler);
486 if (_strand_required)
487 asio::post(_strand, connect_handler);
489 asio::post(*_io_context, connect_handler);
494bool TCPClient::DisconnectInternalAsync(
bool dispatch)
505 auto self(this->shared_from_this());
506 auto disconnect_handler = [
this, self]() { DisconnectInternal(); };
507 if (_strand_required)
510 asio::dispatch(_strand, disconnect_handler);
512 asio::post(_strand, disconnect_handler);
517 asio::dispatch(*_io_context, disconnect_handler);
519 asio::post(*_io_context, disconnect_handler);
531 CppCommon::Thread::Yield();
544 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
545 if (buffer ==
nullptr)
551 size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
571size_t TCPClient::Send(
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
579 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
580 if (buffer ==
nullptr)
585 std::condition_variable cv;
586 asio::error_code error;
587 asio::system_timer timer(_socket.get_executor());
590 auto async_done_handler = [&](asio::error_code ec)
592 std::unique_lock<std::mutex> lck(mtx);
603 timer.expires_after(timeout.chrono());
604 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
608 _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
611 std::unique_lock<std::mutex> lck(mtx);
612 cv.wait(lck, [&]() {
return done == 2; });
625 if (error && (error != asio::error::timed_out))
642 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
643 if (buffer ==
nullptr)
647 std::scoped_lock locker(_send_lock);
650 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
653 if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
655 SendError(asio::error::no_buffer_space);
660 const uint8_t* bytes = (
const uint8_t*)buffer;
661 _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
664 _bytes_pending = _send_buffer_main.size();
672 auto self(this->shared_from_this());
673 auto send_handler = [
this, self]()
678 if (_strand_required)
679 asio::dispatch(_strand, send_handler);
681 asio::dispatch(*_io_context, send_handler);
694 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
695 if (buffer ==
nullptr)
701 size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
705 _bytes_received += received;
723 std::string text(size, 0);
724 text.resize(
Receive(text.data(), text.size()));
736 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
737 if (buffer ==
nullptr)
742 std::condition_variable cv;
743 asio::error_code error;
744 asio::system_timer timer(_socket.get_executor());
747 auto async_done_handler = [&](asio::error_code ec)
749 std::unique_lock<std::mutex> lck(mtx);
760 timer.expires_after(timeout.chrono());
761 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
765 _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
768 std::unique_lock<std::mutex> lck(mtx);
769 cv.wait(lck, [&]() {
return done == 2; });
775 _bytes_received += received;
782 if (error && (error != asio::error::timed_out))
793 std::string text(size, 0);
794 text.resize(
Receive(text.data(), text.size(), timeout));
804void TCPClient::TryReceive()
814 auto self(this->shared_from_this());
815 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this, self](std::error_code ec,
size_t size)
826 _bytes_received += size;
832 if (_receive_buffer.size() == size)
835 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
837 SendError(asio::error::no_buffer_space);
838 DisconnectInternalAsync(
true);
842 _receive_buffer.resize(2 * size);
852 DisconnectInternalAsync(
true);
855 if (_strand_required)
856 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
858 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
861void TCPClient::TrySend()
870 if (_send_buffer_flush.empty())
872 std::scoped_lock locker(_send_lock);
875 _send_buffer_flush.swap(_send_buffer_main);
876 _send_buffer_flush_offset = 0;
880 _bytes_sending += _send_buffer_flush.size();
884 if (_send_buffer_flush.empty())
893 auto self(this->shared_from_this());
894 auto async_write_handler =
make_alloc_handler(_send_storage, [
this, self](std::error_code ec,
size_t size)
905 _bytes_sending -= size;
909 _send_buffer_flush_offset += size;
912 if (_send_buffer_flush_offset == _send_buffer_flush.size())
915 _send_buffer_flush.clear();
916 _send_buffer_flush_offset = 0;
929 DisconnectInternalAsync(
true);
932 if (_strand_required)
933 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
935 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
938void TCPClient::ClearBuffers()
941 std::scoped_lock locker(_send_lock);
944 _send_buffer_main.clear();
945 _send_buffer_flush.clear();
946 _send_buffer_flush_offset = 0;
954void TCPClient::SendError(std::error_code ec)
957 if ((ec == asio::error::connection_aborted) ||
958 (ec == asio::error::connection_refused) ||
959 (ec == asio::error::connection_reset) ||
960 (ec == asio::error::eof) ||
961 (ec == asio::error::operation_aborted))
964 onError(ec.value(), ec.category().name(), ec.message());
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the server (synchronous).
bool IsConnected() const noexcept
Is the client connected?
virtual bool Connect()
Connect the client (synchronous).
const std::string & address() const noexcept
Get the server address.
virtual size_t Send(const void *buffer, size_t size)
Send data to the server (synchronous).
TCPClient(const std::shared_ptr< Service > &service, const std::string &address, int port)
Initialize TCP client with a given Asio service, server address and port number.
virtual bool Disconnect()
Disconnect the client (synchronous).
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the server (asynchronous).
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual bool ReconnectAsync()
Reconnect the client (asynchronous).
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
virtual void onDisconnected()
Handle client disconnected notification.
virtual void onConnected()
Handle client connected notification.
virtual void onDisconnecting()
Handle client disconnecting notification.
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
int port() const noexcept
Get the server port number.
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending sent by the client.
virtual bool ConnectAsync()
Connect the client (asynchronous).
const std::string & scheme() const noexcept
Get the scheme name.
bool option_no_delay() const noexcept
Get the option: no delay.
asio::ip::tcp::endpoint & endpoint() noexcept
Get the client endpoint.
virtual bool DisconnectAsync()
Disconnect the client (asynchronous).
bool option_keep_alive() const noexcept
Get the option: keep alive.
virtual void ReceiveAsync()
Receive data from the server (asynchronous).
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onConnecting()
Handle client connecting notification.
virtual bool Reconnect()
Reconnect the client (synchronous).
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.