15 : _id(CppCommon::UUID::Sequential()),
17 _io_service(_service->GetAsioService()),
18 _strand(*_io_service),
19 _strand_required(_service->IsStrandRequired()),
22 _socket(*_io_service),
32 _send_buffer_flush_offset(0),
33 _option_keep_alive(false),
34 _option_no_delay(false)
36 assert((
service !=
nullptr) &&
"Asio service is invalid!");
38 throw CppCommon::ArgumentException(
"Asio service is invalid!");
41 TCPClient::TCPClient(
const std::shared_ptr<Service>& service,
const std::string& address,
const std::string& scheme)
42 : _id(CppCommon::UUID::Sequential()),
44 _io_service(_service->GetAsioService()),
45 _strand(*_io_service),
46 _strand_required(_service->IsStrandRequired()),
50 _socket(*_io_service),
60 _send_buffer_flush_offset(0),
61 _option_keep_alive(false),
62 _option_no_delay(false)
64 assert((
service !=
nullptr) &&
"Asio service is invalid!");
66 throw CppCommon::ArgumentException(
"Asio service is invalid!");
70 : _id(CppCommon::UUID::Sequential()),
72 _io_service(_service->GetAsioService()),
73 _strand(*_io_service),
74 _strand_required(_service->IsStrandRequired()),
75 _address(endpoint.address().to_string()),
76 _port(endpoint.port()),
78 _socket(*_io_service),
88 _send_buffer_flush_offset(0),
89 _option_keep_alive(false),
90 _option_no_delay(false)
92 assert((
service !=
nullptr) &&
"Asio service is invalid!");
94 throw CppCommon::ArgumentException(
"Asio service is invalid!");
99 asio::socket_base::receive_buffer_size option;
100 _socket.get_option(option);
101 return option.value();
106 asio::socket_base::send_buffer_size option;
107 _socket.get_option(option);
108 return option.value();
113 asio::socket_base::receive_buffer_size option((
int)size);
114 _socket.set_option(option);
119 asio::socket_base::send_buffer_size option((
int)size);
120 _socket.set_option(option);
131 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
134 _socket.connect(_endpoint, ec);
149 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
152 _socket.set_option(asio::ip::tcp::no_delay(
true));
172 if (_send_buffer_main.empty())
186 asio::ip::tcp::resolver::query query(_address, (_scheme.empty() ? std::to_string(_port) : _scheme));
187 auto endpoints = resolver->resolver().resolve(query, ec);
201 _endpoint = asio::connect(_socket, endpoints, ec);
216 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
219 _socket.set_option(asio::ip::tcp::no_delay(
true));
239 if (_send_buffer_main.empty())
245 bool TCPClient::DisconnectInternal()
285 auto self(this->shared_from_this());
286 auto connect_handler = [
this,
self]()
293 auto async_connect_handler = [
this,
self](std::error_code ec)
304 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
307 _socket.set_option(asio::ip::tcp::no_delay(
true));
330 if (_send_buffer_main.empty())
343 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(_address), (
unsigned short)_port);
345 if (_strand_required)
346 _socket.async_connect(_endpoint, bind_executor(_strand, async_connect_handler));
348 _socket.async_connect(_endpoint, async_connect_handler);
350 if (_strand_required)
351 _strand.post(connect_handler);
353 _io_service->post(connect_handler);
364 auto self(this->shared_from_this());
365 auto connect_handler = [
this,
self, resolver]()
372 auto async_resolve_handler = [
this,
self](std::error_code ec1, asio::ip::tcp::resolver::results_type endpoints)
383 auto async_connect_handler = [
this,
self](std::error_code ec2,
const asio::ip::tcp::endpoint&
endpoint)
397 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
400 _socket.set_option(asio::ip::tcp::no_delay(
true));
423 if (_send_buffer_main.empty())
434 if (_strand_required)
435 asio::async_connect(_socket, endpoints, bind_executor(_strand, async_connect_handler));
437 asio::async_connect(_socket, endpoints, async_connect_handler);
449 asio::ip::tcp::resolver::query query(_address, (_scheme.empty() ? std::to_string(_port) : _scheme));
450 if (_strand_required)
451 resolver->resolver().async_resolve(query, bind_executor(_strand, async_resolve_handler));
453 resolver->resolver().async_resolve(query, async_resolve_handler);
455 if (_strand_required)
456 _strand.post(connect_handler);
458 _io_service->post(connect_handler);
463 bool TCPClient::DisconnectInternalAsync(
bool dispatch)
474 auto self(this->shared_from_this());
475 auto disconnect_handler = [
this,
self]() { DisconnectInternal(); };
476 if (_strand_required)
479 _strand.dispatch(disconnect_handler);
481 _strand.post(disconnect_handler);
486 _io_service->dispatch(disconnect_handler);
488 _io_service->post(disconnect_handler);
500 CppCommon::Thread::Yield();
513 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
514 if (buffer ==
nullptr)
520 size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
540 size_t TCPClient::Send(
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
548 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
549 if (buffer ==
nullptr)
554 std::condition_variable cv;
555 asio::error_code error;
556 asio::system_timer timer(_socket.get_executor());
559 auto async_done_handler = [&](asio::error_code ec)
561 std::unique_lock<std::mutex> lck(mtx);
572 timer.expires_from_now(timeout.chrono());
573 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
577 _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
580 std::unique_lock<std::mutex> lck(mtx);
581 cv.wait(lck, [&]() {
return done == 2; });
594 if (error && (error != asio::error::timed_out))
611 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
612 if (buffer ==
nullptr)
616 std::scoped_lock locker(_send_lock);
619 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
622 if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
624 SendError(asio::error::no_buffer_space);
629 const uint8_t* bytes = (
const uint8_t*)buffer;
630 _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
633 _bytes_pending = _send_buffer_main.size();
641 auto self(this->shared_from_this());
642 auto send_handler = [
this,
self]()
647 if (_strand_required)
648 _strand.dispatch(send_handler);
650 _io_service->dispatch(send_handler);
663 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
664 if (buffer ==
nullptr)
670 size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
674 _bytes_received += received;
692 std::string text(size, 0);
693 text.resize(
Receive(text.data(), text.size()));
705 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
706 if (buffer ==
nullptr)
711 std::condition_variable cv;
712 asio::error_code error;
713 asio::system_timer timer(_socket.get_executor());
716 auto async_done_handler = [&](asio::error_code ec)
718 std::unique_lock<std::mutex> lck(mtx);
729 timer.expires_from_now(timeout.chrono());
730 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
734 _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
737 std::unique_lock<std::mutex> lck(mtx);
738 cv.wait(lck, [&]() {
return done == 2; });
744 _bytes_received += received;
751 if (error && (error != asio::error::timed_out))
762 std::string text(size, 0);
763 text.resize(
Receive(text.data(), text.size(), timeout));
773 void TCPClient::TryReceive()
783 auto self(this->shared_from_this());
784 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this,
self](std::error_code ec,
size_t size)
795 _bytes_received += size;
801 if (_receive_buffer.size() == size)
804 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
806 SendError(asio::error::no_buffer_space);
807 DisconnectInternalAsync(
true);
811 _receive_buffer.resize(2 * size);
821 DisconnectInternalAsync(
true);
824 if (_strand_required)
825 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
827 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
830 void TCPClient::TrySend()
839 if (_send_buffer_flush.empty())
841 std::scoped_lock locker(_send_lock);
844 _send_buffer_flush.swap(_send_buffer_main);
845 _send_buffer_flush_offset = 0;
849 _bytes_sending += _send_buffer_flush.size();
853 if (_send_buffer_flush.empty())
862 auto self(this->shared_from_this());
863 auto async_write_handler =
make_alloc_handler(_send_storage, [
this,
self](std::error_code ec,
size_t size)
874 _bytes_sending -= size;
878 _send_buffer_flush_offset += size;
881 if (_send_buffer_flush_offset == _send_buffer_flush.size())
884 _send_buffer_flush.clear();
885 _send_buffer_flush_offset = 0;
898 DisconnectInternalAsync(
true);
901 if (_strand_required)
902 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
904 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
907 void TCPClient::ClearBuffers()
910 std::scoped_lock locker(_send_lock);
913 _send_buffer_main.clear();
914 _send_buffer_flush.clear();
915 _send_buffer_flush_offset = 0;
923 void TCPClient::SendError(std::error_code ec)
926 if ((ec == asio::error::connection_aborted) ||
927 (ec == asio::error::connection_refused) ||
928 (ec == asio::error::connection_reset) ||
929 (ec == asio::error::eof) ||
930 (ec == asio::error::operation_aborted))
933 onError(ec.value(), ec.category().name(), ec.message());
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the server (synchronous)
bool IsConnected() const noexcept
Is the client connected?
virtual bool Connect()
Connect the client (synchronous)
virtual size_t Send(const void *buffer, size_t size)
Send data to the server (synchronous)
TCPClient(const std::shared_ptr< Service > &service, const std::string &address, int port)
Initialize TCP client with a given Asio service, server address and port number.
virtual bool Disconnect()
Disconnect the client (synchronous)
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the server (asynchronous)
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual bool ReconnectAsync()
Reconnect the client (asynchronous)
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
virtual void onDisconnected()
Handle client disconnected notification.
virtual void onConnected()
Handle client connected notification.
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
asio::ip::tcp::endpoint & endpoint() noexcept
Get the client endpoint.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending to be sent by the client.
virtual bool ConnectAsync()
Connect the client (asynchronous)
bool option_no_delay() const noexcept
Get the option: no delay.
virtual bool DisconnectAsync()
Disconnect the client (asynchronous)
bool option_keep_alive() const noexcept
Get the option: keep alive.
virtual void ReceiveAsync()
Receive data from the server (asynchronous)
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual bool Reconnect()
Reconnect the client (synchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.