18 _io_service(server->service()->GetAsioService()),
19 _strand(*_io_service),
20 _strand_required(_server->_strand_required),
21 _socket(*_io_service),
29 _send_buffer_flush_offset(0)
35 asio::socket_base::receive_buffer_size
option;
36 _socket.get_option(
option);
42 asio::socket_base::send_buffer_size
option;
43 _socket.get_option(
option);
49 asio::socket_base::receive_buffer_size
option((
int)
size);
50 _socket.set_option(
option);
55 asio::socket_base::send_buffer_size
option((
int)
size);
56 _socket.set_option(
option);
59void TCPSession::Connect()
62 if (_server->option_keep_alive())
63 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
65 if (_server->option_no_delay())
66 _socket.set_option(asio::ip::tcp::no_delay(
true));
93 if (_send_buffer_main.empty())
103 auto self(this->shared_from_this());
104 auto disconnect_handler = [
this, self]()
126 auto disconnected_session(this->shared_from_this());
127 _server->onDisconnected(disconnected_session);
130 auto unregister_session_handler = [
this, self]()
132 _server->UnregisterSession(
id());
134 if (_server->_strand_required)
135 _server->_strand.dispatch(unregister_session_handler);
137 _server->_io_service->dispatch(unregister_session_handler);
139 if (_strand_required)
142 _strand.dispatch(disconnect_handler);
144 _strand.post(disconnect_handler);
149 _io_service->dispatch(disconnect_handler);
151 _io_service->post(disconnect_handler);
165 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
177 _server->_bytes_sent +=
sent;
201 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
207 std::condition_variable
cv;
208 asio::error_code error;
209 asio::system_timer
timer(_socket.get_executor());
214 std::unique_lock<std::mutex>
lck(
mtx);
225 timer.expires_from_now(timeout.chrono());
233 std::unique_lock<std::mutex>
lck(
mtx);
234 cv.wait(
lck, [&]() {
return done == 2; });
241 _server->_bytes_sent +=
sent;
248 if (error && (error != asio::error::timed_out))
265 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
270 std::scoped_lock
locker(_send_lock);
273 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
276 if (((_send_buffer_main.size() +
size) > _send_buffer_limit) && (_send_buffer_limit > 0))
278 SendError(asio::error::no_buffer_space);
284 _send_buffer_main.insert(_send_buffer_main.end(),
bytes,
bytes +
size);
287 _bytes_pending = _send_buffer_main.size();
301 if (_strand_required)
317 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
329 _server->_bytes_received +=
received;
360 assert((
buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
366 std::condition_variable
cv;
367 asio::error_code error;
368 asio::system_timer
timer(_socket.get_executor());
373 std::unique_lock<std::mutex>
lck(
mtx);
384 timer.expires_from_now(timeout.chrono());
392 std::unique_lock<std::mutex>
lck(
mtx);
393 cv.wait(
lck, [&]() {
return done == 2; });
400 _server->_bytes_received +=
received;
407 if (error && (error != asio::error::timed_out))
429void TCPSession::TryReceive()
451 _bytes_received +=
size;
452 _server->_bytes_received +=
size;
458 if (_receive_buffer.size() ==
size)
461 if (((2 *
size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
463 SendError(asio::error::no_buffer_space);
468 _receive_buffer.resize(2 * size);
481 if (_strand_required)
482 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
484 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
487void TCPSession::TrySend()
496 if (_send_buffer_flush.empty())
498 std::scoped_lock locker(_send_lock);
501 _send_buffer_flush.swap(_send_buffer_main);
502 _send_buffer_flush_offset = 0;
506 _bytes_sending += _send_buffer_flush.size();
510 if (_send_buffer_flush.empty())
519 auto self(this->shared_from_this());
520 auto async_write_handler =
make_alloc_handler(_send_storage, [
this, self](std::error_code ec,
size_t size)
531 _bytes_sending -= size;
533 _server->_bytes_sent += size;
536 _send_buffer_flush_offset += size;
539 if (_send_buffer_flush_offset == _send_buffer_flush.size())
542 _send_buffer_flush.clear();
543 _send_buffer_flush_offset = 0;
559 if (_strand_required)
560 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
562 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
565void TCPSession::ClearBuffers()
568 std::scoped_lock locker(_send_lock);
571 _send_buffer_main.clear();
572 _send_buffer_flush.clear();
573 _send_buffer_flush_offset = 0;
581void TCPSession::ResetServer()
587void TCPSession::SendError(std::error_code ec)
590 if ((ec == asio::error::connection_aborted) ||
591 (ec == asio::error::connection_refused) ||
592 (ec == asio::error::connection_reset) ||
593 (ec == asio::error::eof) ||
594 (ec == asio::error::operation_aborted))
597 onError(ec.value(), ec.category().name(), ec.message());
Asio allocate handler wrapper.
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
TCPSession(const std::shared_ptr< TCPServer > &server)
Initialize the session with a given server.
virtual void onConnected()
Handle session connected notification.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending sent by the session.
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the client (asynchronous)
virtual size_t Send(const void *buffer, size_t size)
Send data to the client (synchronous)
bool IsConnected() const noexcept
Is the session connected?
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onDisconnected()
Handle session disconnected notification.
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the client (synchronous)
virtual bool Disconnect()
Disconnect the session.
virtual void ReceiveAsync()
Receive data from the client (asynchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.