16 : _id(CppCommon::UUID::Sequential()),
18 _io_context(
server->service()->GetAsioContext()),
19 _strand(*_io_context),
20 _strand_required(_server->_strand_required),
21 _socket(*_io_context),
29 _send_buffer_flush_offset(0)
35 asio::socket_base::receive_buffer_size option;
36 _socket.get_option(option);
37 return option.value();
42 asio::socket_base::send_buffer_size option;
43 _socket.get_option(option);
44 return option.value();
49 asio::socket_base::receive_buffer_size option((
int)size);
50 _socket.set_option(option);
55 asio::socket_base::send_buffer_size option((
int)size);
56 _socket.set_option(option);
59void TCPSession::Connect()
62 if (_server->option_keep_alive())
63 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
65 if (_server->option_no_delay())
66 _socket.set_option(asio::ip::tcp::no_delay(
true));
92 auto connected_session(this->shared_from_this());
93 _server->onConnected(connected_session);
96 if (_send_buffer_main.empty())
106 auto self(this->shared_from_this());
107 auto disconnect_handler = [
this, self]()
132 auto disconnected_session(this->shared_from_this());
133 _server->onDisconnected(disconnected_session);
136 auto unregister_session_handler = [
this, self]()
138 _server->UnregisterSession(
id());
140 if (_server->_strand_required)
141 asio::dispatch(_server->_strand, unregister_session_handler);
143 asio::dispatch(*_server->_io_context, unregister_session_handler);
145 if (_strand_required)
148 asio::dispatch(_strand, disconnect_handler);
150 asio::post(_strand, disconnect_handler);
155 asio::dispatch(*_io_context, disconnect_handler);
157 asio::post(*_io_context, disconnect_handler);
171 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
172 if (buffer ==
nullptr)
178 size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
183 _server->_bytes_sent += sent;
199size_t TCPSession::Send(
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
207 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
208 if (buffer ==
nullptr)
213 std::condition_variable cv;
214 asio::error_code error;
215 asio::system_timer timer(_socket.get_executor());
218 auto async_done_handler = [&](asio::error_code ec)
220 std::unique_lock<std::mutex> lck(mtx);
231 timer.expires_after(timeout.chrono());
232 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
236 _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
239 std::unique_lock<std::mutex> lck(mtx);
240 cv.wait(lck, [&]() {
return done == 2; });
247 _server->_bytes_sent += sent;
254 if (error && (error != asio::error::timed_out))
271 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
272 if (buffer ==
nullptr)
276 std::scoped_lock locker(_send_lock);
279 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
282 if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
284 SendError(asio::error::no_buffer_space);
289 const uint8_t* bytes = (
const uint8_t*)buffer;
290 _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
293 _bytes_pending = _send_buffer_main.size();
301 auto self(this->shared_from_this());
302 auto send_handler = [
this, self]()
307 if (_strand_required)
308 asio::dispatch(_strand, send_handler);
310 asio::dispatch(*_io_context, send_handler);
323 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
324 if (buffer ==
nullptr)
330 size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
334 _bytes_received += received;
335 _server->_bytes_received += received;
353 std::string text(size, 0);
354 text.resize(
Receive(text.data(), text.size()));
366 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
367 if (buffer ==
nullptr)
372 std::condition_variable cv;
373 asio::error_code error;
374 asio::system_timer timer(_socket.get_executor());
377 auto async_done_handler = [&](asio::error_code ec)
379 std::unique_lock<std::mutex> lck(mtx);
390 timer.expires_after(timeout.chrono());
391 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
395 _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
398 std::unique_lock<std::mutex> lck(mtx);
399 cv.wait(lck, [&]() {
return done == 2; });
405 _bytes_received += received;
406 _server->_bytes_received += received;
413 if (error && (error != asio::error::timed_out))
424 std::string text(size, 0);
425 text.resize(
Receive(text.data(), text.size(), timeout));
435void TCPSession::TryReceive()
445 auto self(this->shared_from_this());
446 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this, self](std::error_code ec,
size_t size)
457 _bytes_received += size;
458 _server->_bytes_received += size;
464 if (_receive_buffer.size() == size)
467 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
469 SendError(asio::error::no_buffer_space);
474 _receive_buffer.resize(2 * size);
487 if (_strand_required)
488 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
490 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
493void TCPSession::TrySend()
502 if (_send_buffer_flush.empty())
504 std::scoped_lock locker(_send_lock);
507 _send_buffer_flush.swap(_send_buffer_main);
508 _send_buffer_flush_offset = 0;
512 _bytes_sending += _send_buffer_flush.size();
516 if (_send_buffer_flush.empty())
525 auto self(this->shared_from_this());
526 auto async_write_handler =
make_alloc_handler(_send_storage, [
this, self](std::error_code ec,
size_t size)
537 _bytes_sending -= size;
539 _server->_bytes_sent += size;
542 _send_buffer_flush_offset += size;
545 if (_send_buffer_flush_offset == _send_buffer_flush.size())
548 _send_buffer_flush.clear();
549 _send_buffer_flush_offset = 0;
565 if (_strand_required)
566 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
568 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
571void TCPSession::ClearBuffers()
574 std::scoped_lock locker(_send_lock);
577 _send_buffer_main.clear();
578 _send_buffer_flush.clear();
579 _send_buffer_flush_offset = 0;
587void TCPSession::ResetServer()
593void TCPSession::SendError(std::error_code ec)
596 if ((ec == asio::error::connection_aborted) ||
597 (ec == asio::error::connection_refused) ||
598 (ec == asio::error::connection_reset) ||
599 (ec == asio::error::eof) ||
600 (ec == asio::error::operation_aborted))
603 onError(ec.value(), ec.category().name(), ec.message());
void SetupReceiveBufferSize(size_t size)
Set up option: receive buffer size.
TCPSession(const std::shared_ptr< TCPServer > &server)
Initialize the session with a given server.
virtual void onConnected()
Handle session connected notification.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending to be sent by the session.
virtual void onDisconnecting()
Handle session disconnecting notification.
std::shared_ptr< TCPServer > & server() noexcept
Get the server.
void SetupSendBufferSize(size_t size)
Set up option: send buffer size.
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the client (asynchronous).
virtual size_t Send(const void *buffer, size_t size)
Send data to the client (synchronous).
bool IsConnected() const noexcept
Is the session connected?
virtual void onConnecting()
Handle session connecting notification.
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onDisconnected()
Handle session disconnected notification.
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the client (synchronous).
virtual bool Disconnect()
Disconnect the session.
virtual void ReceiveAsync()
Receive data from the client (asynchronous).
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.