16 : _id(CppCommon::UUID::Sequential()),
18 _io_service(server->service()->GetAsioService()),
19 _strand(*_io_service),
20 _strand_required(_server->_strand_required),
21 _socket(*_io_service),
29 _send_buffer_flush_offset(0)
35 asio::socket_base::receive_buffer_size option;
36 _socket.get_option(option);
37 return option.value();
42 asio::socket_base::send_buffer_size option;
43 _socket.get_option(option);
44 return option.value();
49 asio::socket_base::receive_buffer_size option((
int)size);
50 _socket.set_option(option);
55 asio::socket_base::send_buffer_size option((
int)size);
56 _socket.set_option(option);
59 void TCPSession::Connect()
62 if (_server->option_keep_alive())
63 _socket.set_option(asio::ip::tcp::socket::keep_alive(
true));
65 if (_server->option_no_delay())
66 _socket.set_option(asio::ip::tcp::no_delay(
true));
89 auto connected_session(this->shared_from_this());
90 _server->onConnected(connected_session);
93 if (_send_buffer_main.empty())
103 auto self(this->shared_from_this());
104 auto disconnect_handler = [
this,
self]()
126 auto disconnected_session(this->shared_from_this());
127 _server->onDisconnected(disconnected_session);
130 auto unregister_session_handler = [
this,
self]()
132 _server->UnregisterSession(
id());
134 if (_server->_strand_required)
135 _server->_strand.dispatch(unregister_session_handler);
137 _server->_io_service->dispatch(unregister_session_handler);
139 if (_strand_required)
142 _strand.dispatch(disconnect_handler);
144 _strand.post(disconnect_handler);
149 _io_service->dispatch(disconnect_handler);
151 _io_service->post(disconnect_handler);
165 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
166 if (buffer ==
nullptr)
172 size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
177 _server->_bytes_sent += sent;
193 size_t TCPSession::Send(
const void* buffer,
size_t size,
const CppCommon::Timespan& timeout)
201 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
202 if (buffer ==
nullptr)
207 std::condition_variable cv;
208 asio::error_code error;
209 asio::system_timer timer(_socket.get_executor());
212 auto async_done_handler = [&](asio::error_code ec)
214 std::unique_lock<std::mutex> lck(mtx);
225 timer.expires_from_now(timeout.chrono());
226 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
230 _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t write) { async_done_handler(ec); sent = write; });
233 std::unique_lock<std::mutex> lck(mtx);
234 cv.wait(lck, [&]() {
return done == 2; });
241 _server->_bytes_sent += sent;
248 if (error && (error != asio::error::timed_out))
265 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
266 if (buffer ==
nullptr)
270 std::scoped_lock locker(_send_lock);
273 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
276 if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
278 SendError(asio::error::no_buffer_space);
283 const uint8_t* bytes = (
const uint8_t*)buffer;
284 _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
287 _bytes_pending = _send_buffer_main.size();
295 auto self(this->shared_from_this());
296 auto send_handler = [
this,
self]()
301 if (_strand_required)
302 _strand.dispatch(send_handler);
304 _io_service->dispatch(send_handler);
317 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
318 if (buffer ==
nullptr)
324 size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
328 _bytes_received += received;
329 _server->_bytes_received += received;
347 std::string text(size, 0);
348 text.resize(
Receive(text.data(), text.size()));
360 assert((buffer !=
nullptr) &&
"Pointer to the buffer should not be null!");
361 if (buffer ==
nullptr)
366 std::condition_variable cv;
367 asio::error_code error;
368 asio::system_timer timer(_socket.get_executor());
371 auto async_done_handler = [&](asio::error_code ec)
373 std::unique_lock<std::mutex> lck(mtx);
384 timer.expires_from_now(timeout.chrono());
385 timer.async_wait([&](
const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
389 _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec,
size_t read) { async_done_handler(ec); received = read; });
392 std::unique_lock<std::mutex> lck(mtx);
393 cv.wait(lck, [&]() {
return done == 2; });
399 _bytes_received += received;
400 _server->_bytes_received += received;
407 if (error && (error != asio::error::timed_out))
418 std::string text(size, 0);
419 text.resize(
Receive(text.data(), text.size(), timeout));
429 void TCPSession::TryReceive()
439 auto self(this->shared_from_this());
440 auto async_receive_handler =
make_alloc_handler(_receive_storage, [
this,
self](std::error_code ec,
size_t size)
451 _bytes_received += size;
452 _server->_bytes_received += size;
458 if (_receive_buffer.size() == size)
461 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
463 SendError(asio::error::no_buffer_space);
468 _receive_buffer.resize(2 * size);
481 if (_strand_required)
482 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
484 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
487 void TCPSession::TrySend()
496 if (_send_buffer_flush.empty())
498 std::scoped_lock locker(_send_lock);
501 _send_buffer_flush.swap(_send_buffer_main);
502 _send_buffer_flush_offset = 0;
506 _bytes_sending += _send_buffer_flush.size();
510 if (_send_buffer_flush.empty())
519 auto self(this->shared_from_this());
520 auto async_write_handler =
make_alloc_handler(_send_storage, [
this,
self](std::error_code ec,
size_t size)
531 _bytes_sending -= size;
533 _server->_bytes_sent += size;
536 _send_buffer_flush_offset += size;
539 if (_send_buffer_flush_offset == _send_buffer_flush.size())
542 _send_buffer_flush.clear();
543 _send_buffer_flush_offset = 0;
559 if (_strand_required)
560 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
562 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
565 void TCPSession::ClearBuffers()
568 std::scoped_lock locker(_send_lock);
571 _send_buffer_main.clear();
572 _send_buffer_flush.clear();
573 _send_buffer_flush_offset = 0;
581 void TCPSession::ResetServer()
587 void TCPSession::SendError(std::error_code ec)
590 if ((ec == asio::error::connection_aborted) ||
591 (ec == asio::error::connection_refused) ||
592 (ec == asio::error::connection_reset) ||
593 (ec == asio::error::eof) ||
594 (ec == asio::error::operation_aborted))
597 onError(ec.value(), ec.category().name(), ec.message());
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
TCPSession(const std::shared_ptr< TCPServer > &server)
Initialize the session with a given server.
virtual void onConnected()
Handle session connected notification.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending sent by the session.
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the client (asynchronous)
virtual size_t Send(const void *buffer, size_t size)
Send data to the client (synchronous)
bool IsConnected() const noexcept
Is the session connected?
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onDisconnected()
Handle session disconnected notification.
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the client (synchronous)
virtual bool Disconnect()
Disconnect the session.
virtual void ReceiveAsync()
Receive data from the client (asynchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
C++ Server project definitions.