17 _io_service(_service->GetAsioService()),
18 _strand(*_io_service),
19 _strand_required(_service->IsStrandRequired()),
22 _socket(*_io_service),
32 _send_buffer_flush_offset(0),
33 _option_keep_alive(
false),
34 _option_no_delay(
false)
38 throw CppCommon::ArgumentException(
"Asio service is invalid!");
41TCPClient::TCPClient(
const std::shared_ptr<Service>& service,
const std::string& address,
const std::string& scheme)
44 _io_service(_service->GetAsioService()),
45 _strand(*_io_service),
46 _strand_required(_service->IsStrandRequired()),
50 _socket(*_io_service),
60 _send_buffer_flush_offset(0),
61 _option_keep_alive(
false),
62 _option_no_delay(
false)
66 throw CppCommon::ArgumentException(
"Asio service is invalid!");
72 _io_service(_service->GetAsioService()),
73 _strand(*_io_service),
74 _strand_required(_service->IsStrandRequired()),
76 _port(endpoint.port()),
78 _socket(*_io_service),
88 _send_buffer_flush_offset(0),
89 _option_keep_alive(
false),
90 _option_no_delay(
false)
94 throw CppCommon::ArgumentException(
"Asio service is invalid!");
99 asio::socket_base::receive_buffer_size
option;
100 _socket.get_option(
option);
106 asio::socket_base::send_buffer_size
option;
107 _socket.get_option(
option);
113 asio::socket_base::receive_buffer_size
option((
int)
size);
114 _socket.set_option(
option);
119 asio::socket_base::send_buffer_size
option((
int)
size);
120 _socket.set_option(
option);
131 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
134 _socket.connect(_endpoint,
ec);
149 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
152 _socket.set_option(asio::ip::tcp::no_delay(
true));
172 if (_send_buffer_main.empty())
186 asio::ip::tcp::resolver::query
query(_address, (_scheme.empty() ? std::to_string(_port) : _scheme));
216 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
219 _socket.set_option(asio::ip::tcp::no_delay(
true));
239 if (_send_buffer_main.empty())
245bool TCPClient::DisconnectInternal()
304 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
307 _socket.set_option(asio::ip::tcp::no_delay(
true));
330 if (_send_buffer_main.empty())
343 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
345 if (_strand_required)
350 if (_strand_required)
397 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
400 _socket.set_option(asio::ip::tcp::no_delay(
true));
423 if (_send_buffer_main.empty())
434 if (_strand_required)
449 asio::ip::tcp::resolver::query
query(_address, (_scheme.empty() ? std::to_string(_port) : _scheme));
450 if (_strand_required)
455 if (_strand_required)
463bool TCPClient::DisconnectInternalAsync(
bool dispatch)
476 if (_strand_required)
479 _strand.dispatch(disconnect_handler);
481 _strand.post(disconnect_handler);
486 _io_service->dispatch(disconnect_handler);
488 _io_service->post(disconnect_handler);
500 CppCommon::Thread::Yield();
513 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
548 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
554 std::condition_variable
cv;
555 asio::error_code error;
556 asio::system_timer
timer(_socket.get_executor());
561 std::unique_lock<std::mutex>
lck(
mtx);
572 timer.expires_from_now(timeout.chrono());
580 std::unique_lock<std::mutex>
lck(
mtx);
581 cv.wait(
lck, [&]() {
return done == 2; });
594 if (error && (error != asio::error::timed_out))
611 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
616 std::scoped_lock
locker(_send_lock);
619 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
622 if (((_send_buffer_main.size() +
size) > _send_buffer_limit) && (_send_buffer_limit > 0))
624 SendError(asio::error::no_buffer_space);
630 _send_buffer_main.insert(_send_buffer_main.end(),
bytes,
bytes +
size);
633 _bytes_pending = _send_buffer_main.size();
647 if (_strand_required)
663 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
705 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
711 std::condition_variable
cv;
712 asio::error_code error;
713 asio::system_timer
timer(_socket.get_executor());
718 std::unique_lock<std::mutex>
lck(
mtx);
729 timer.expires_from_now(timeout.chrono());
737 std::unique_lock<std::mutex>
lck(
mtx);
738 cv.wait(
lck, [&]() {
return done == 2; });
751 if (error && (error != asio::error::timed_out))
773void TCPClient::TryReceive()
795 _bytes_received +=
size;
801 if (_receive_buffer.size() ==
size)
804 if (((2 *
size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
806 SendError(asio::error::no_buffer_space);
807 DisconnectInternalAsync(
true);
811 _receive_buffer.resize(2 * size);
821 DisconnectInternalAsync(
true);
824 if (_strand_required)
825 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
827 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
830void TCPClient::TrySend()
839 if (_send_buffer_flush.empty())
841 std::scoped_lock locker(_send_lock);
844 _send_buffer_flush.swap(_send_buffer_main);
845 _send_buffer_flush_offset = 0;
849 _bytes_sending += _send_buffer_flush.size();
853 if (_send_buffer_flush.empty())
862 auto self(this->shared_from_this());
863 auto async_write_handler =
make_alloc_handler(_send_storage, [
this, self](std::error_code ec,
size_t size)
874 _bytes_sending -= size;
878 _send_buffer_flush_offset += size;
881 if (_send_buffer_flush_offset == _send_buffer_flush.size())
884 _send_buffer_flush.clear();
885 _send_buffer_flush_offset = 0;
898 DisconnectInternalAsync(
true);
901 if (_strand_required)
902 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
904 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
907void TCPClient::ClearBuffers()
910 std::scoped_lock locker(_send_lock);
913 _send_buffer_main.clear();
914 _send_buffer_flush.clear();
915 _send_buffer_flush_offset = 0;
923void TCPClient::SendError(std::error_code ec)
926 if ((ec == asio::error::connection_aborted) ||
927 (ec == asio::error::connection_refused) ||
928 (ec == asio::error::connection_reset) ||
929 (ec == asio::error::eof) ||
930 (ec == asio::error::operation_aborted))
933 onError(ec.value(), ec.category().name(), ec.message());
Asio allocate handler wrapper.
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the server (synchronous)
bool IsConnected() const noexcept
Is the client connected?
virtual bool Connect()
Connect the client (synchronous)
virtual size_t Send(const void *buffer, size_t size)
Send data to the server (synchronous)
TCPClient(const std::shared_ptr< Service > &service, const std::string &address, int port)
Initialize TCP client with a given Asio service, server address and port number.
virtual bool Disconnect()
Disconnect the client (synchronous)
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the server (asynchronous)
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual bool ReconnectAsync()
Reconnect the client (asynchronous)
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
virtual void onDisconnected()
Handle client disconnected notification.
virtual void onConnected()
Handle client connected notification.
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending sent by the client.
virtual bool ConnectAsync()
Connect the client (asynchronous)
bool option_no_delay() const noexcept
Get the option: no delay.
asio::ip::tcp::endpoint & endpoint() noexcept
Get the client endpoint.
virtual bool DisconnectAsync()
Disconnect the client (asynchronous)
bool option_keep_alive() const noexcept
Get the option: keep alive.
virtual void ReceiveAsync()
Receive data from the server (asynchronous)
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual bool Reconnect()
Reconnect the client (synchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.