CppServer 1.0.6.0
C++ Server Library
Loading...
Searching...
No Matches
tcp_session.cpp
Go to the documentation of this file.
1
8
11
12namespace CppServer {
13namespace Asio {
14
15TCPSession::TCPSession(const std::shared_ptr<TCPServer>& server)
16 : _id(CppCommon::UUID::Sequential()),
17 _server(server),
18 _io_context(server->service()->GetAsioContext()),
19 _strand(*_io_context),
20 _strand_required(_server->_strand_required),
21 _socket(*_io_context),
22 _connected(false),
23 _bytes_pending(0),
24 _bytes_sending(0),
25 _bytes_sent(0),
26 _bytes_received(0),
27 _receiving(false),
28 _sending(false),
29 _send_buffer_flush_offset(0)
30{
31}
32
34{
35 asio::socket_base::receive_buffer_size option;
36 _socket.get_option(option);
37 return option.value();
38}
39
41{
42 asio::socket_base::send_buffer_size option;
43 _socket.get_option(option);
44 return option.value();
45}
46
48{
49 asio::socket_base::receive_buffer_size option((int)size);
50 _socket.set_option(option);
51}
52
54{
55 asio::socket_base::send_buffer_size option((int)size);
56 _socket.set_option(option);
57}
58
59void TCPSession::Connect()
60{
61 // Apply the option: keep alive
62 if (_server->option_keep_alive())
63 _socket.set_option(asio::ip::tcp::socket::keep_alive(true));
64 // Apply the option: no delay
65 if (_server->option_no_delay())
66 _socket.set_option(asio::ip::tcp::no_delay(true));
67
68 // Prepare receive & send buffers
69 _receive_buffer.resize(option_receive_buffer_size());
70 _send_buffer_main.reserve(option_send_buffer_size());
71 _send_buffer_flush.reserve(option_send_buffer_size());
72
73 // Reset statistic
74 _bytes_pending = 0;
75 _bytes_sending = 0;
76 _bytes_sent = 0;
77 _bytes_received = 0;
78
79 // Call the session connecting handler
81
82 // Update the connected flag
83 _connected = true;
84
85 // Try to receive something from the client
86 TryReceive();
87
88 // Call the session connected handler
90
91 // Call the session connected handler in the server
92 auto connected_session(this->shared_from_this());
93 _server->onConnected(connected_session);
94
95 // Call the empty send buffer handler
96 if (_send_buffer_main.empty())
97 onEmpty();
98}
99
100bool TCPSession::Disconnect(bool dispatch)
101{
102 if (!IsConnected())
103 return false;
104
105 // Dispatch or post the disconnect handler
106 auto self(this->shared_from_this());
107 auto disconnect_handler = [this, self]()
108 {
109 if (!IsConnected())
110 return;
111
112 // Call the session disconnecting handler
114
115 // Close the session socket
116 _socket.close();
117
118 // Update the connected flag
119 _connected = false;
120
121 // Update sending/receiving flags
122 _receiving = false;
123 _sending = false;
124
125 // Clear send/receive buffers
126 ClearBuffers();
127
128 // Call the session disconnected handler
130
131 // Call the session disconnected handler in the server
132 auto disconnected_session(this->shared_from_this());
133 _server->onDisconnected(disconnected_session);
134
135 // Dispatch the unregister session handler
136 auto unregister_session_handler = [this, self]()
137 {
138 _server->UnregisterSession(id());
139 };
140 if (_server->_strand_required)
141 asio::dispatch(_server->_strand, unregister_session_handler);
142 else
143 asio::dispatch(*_server->_io_context, unregister_session_handler);
144 };
145 if (_strand_required)
146 {
147 if (dispatch)
148 asio::dispatch(_strand, disconnect_handler);
149 else
150 asio::post(_strand, disconnect_handler);
151 }
152 else
153 {
154 if (dispatch)
155 asio::dispatch(*_io_context, disconnect_handler);
156 else
157 asio::post(*_io_context, disconnect_handler);
158 }
159
160 return true;
161}
162
163size_t TCPSession::Send(const void* buffer, size_t size)
164{
165 if (!IsConnected())
166 return 0;
167
168 if (size == 0)
169 return 0;
170
171 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
172 if (buffer == nullptr)
173 return 0;
174
175 asio::error_code ec;
176
177 // Send data to the client
178 size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
179 if (sent > 0)
180 {
181 // Update statistic
182 _bytes_sent += sent;
183 _server->_bytes_sent += sent;
184
185 // Call the buffer sent handler
186 onSent(sent, bytes_pending());
187 }
188
189 // Disconnect on error
190 if (ec)
191 {
192 SendError(ec);
193 Disconnect();
194 }
195
196 return sent;
197}
198
199size_t TCPSession::Send(const void* buffer, size_t size, const CppCommon::Timespan& timeout)
200{
201 if (!IsConnected())
202 return 0;
203
204 if (size == 0)
205 return 0;
206
207 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
208 if (buffer == nullptr)
209 return 0;
210
211 int done = 0;
212 std::mutex mtx;
213 std::condition_variable cv;
214 asio::error_code error;
215 asio::system_timer timer(_socket.get_executor());
216
217 // Prepare done handler
218 auto async_done_handler = [&](asio::error_code ec)
219 {
220 std::unique_lock<std::mutex> lck(mtx);
221 if (done++ == 0)
222 {
223 error = ec;
224 _socket.cancel();
225 timer.cancel();
226 }
227 cv.notify_one();
228 };
229
230 // Async wait for timeout
231 timer.expires_after(timeout.chrono());
232 timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
233
234 // Async write some data to the client
235 size_t sent = 0;
236 _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec, size_t write) { async_done_handler(ec); sent = write; });
237
238 // Wait for complete or timeout
239 std::unique_lock<std::mutex> lck(mtx);
240 cv.wait(lck, [&]() { return done == 2; });
241
242 // Send data to the client
243 if (sent > 0)
244 {
245 // Update statistic
246 _bytes_sent += sent;
247 _server->_bytes_sent += sent;
248
249 // Call the buffer sent handler
250 onSent(sent, bytes_pending());
251 }
252
253 // Disconnect on error
254 if (error && (error != asio::error::timed_out))
255 {
256 SendError(error);
257 Disconnect();
258 }
259
260 return sent;
261}
262
263bool TCPSession::SendAsync(const void* buffer, size_t size)
264{
265 if (!IsConnected())
266 return false;
267
268 if (size == 0)
269 return true;
270
271 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
272 if (buffer == nullptr)
273 return false;
274
275 {
276 std::scoped_lock locker(_send_lock);
277
278 // Detect multiple send handlers
279 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
280
281 // Check the send buffer limit
282 if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
283 {
284 SendError(asio::error::no_buffer_space);
285 return false;
286 }
287
288 // Fill the main send buffer
289 const uint8_t* bytes = (const uint8_t*)buffer;
290 _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
291
292 // Update statistic
293 _bytes_pending = _send_buffer_main.size();
294
295 // Avoid multiple send handlers
296 if (!send_required)
297 return true;
298 }
299
300 // Dispatch the send handler
301 auto self(this->shared_from_this());
302 auto send_handler = [this, self]()
303 {
304 // Try to send the main buffer
305 TrySend();
306 };
307 if (_strand_required)
308 asio::dispatch(_strand, send_handler);
309 else
310 asio::dispatch(*_io_context, send_handler);
311
312 return true;
313}
314
315size_t TCPSession::Receive(void* buffer, size_t size)
316{
317 if (!IsConnected())
318 return 0;
319
320 if (size == 0)
321 return 0;
322
323 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
324 if (buffer == nullptr)
325 return 0;
326
327 asio::error_code ec;
328
329 // Receive data from the client
330 size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
331 if (received > 0)
332 {
333 // Update statistic
334 _bytes_received += received;
335 _server->_bytes_received += received;
336
337 // Call the buffer received handler
338 onReceived(buffer, received);
339 }
340
341 // Disconnect on error
342 if (ec)
343 {
344 SendError(ec);
345 Disconnect();
346 }
347
348 return received;
349}
350
351std::string TCPSession::Receive(size_t size)
352{
353 std::string text(size, 0);
354 text.resize(Receive(text.data(), text.size()));
355 return text;
356}
357
358size_t TCPSession::Receive(void* buffer, size_t size, const CppCommon::Timespan& timeout)
359{
360 if (!IsConnected())
361 return 0;
362
363 if (size == 0)
364 return 0;
365
366 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
367 if (buffer == nullptr)
368 return 0;
369
370 int done = 0;
371 std::mutex mtx;
372 std::condition_variable cv;
373 asio::error_code error;
374 asio::system_timer timer(_socket.get_executor());
375
376 // Prepare done handler
377 auto async_done_handler = [&](asio::error_code ec)
378 {
379 std::unique_lock<std::mutex> lck(mtx);
380 if (done++ == 0)
381 {
382 error = ec;
383 _socket.cancel();
384 timer.cancel();
385 }
386 cv.notify_one();
387 };
388
389 // Async wait for timeout
390 timer.expires_after(timeout.chrono());
391 timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
392
393 // Async read some data from the client
394 size_t received = 0;
395 _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec, size_t read) { async_done_handler(ec); received = read; });
396
397 // Wait for complete or timeout
398 std::unique_lock<std::mutex> lck(mtx);
399 cv.wait(lck, [&]() { return done == 2; });
400
401 // Received some data from the client
402 if (received > 0)
403 {
404 // Update statistic
405 _bytes_received += received;
406 _server->_bytes_received += received;
407
408 // Call the buffer received handler
409 onReceived(buffer, received);
410 }
411
412 // Disconnect on error
413 if (error && (error != asio::error::timed_out))
414 {
415 SendError(error);
416 Disconnect();
417 }
418
419 return received;
420}
421
422std::string TCPSession::Receive(size_t size, const CppCommon::Timespan& timeout)
423{
424 std::string text(size, 0);
425 text.resize(Receive(text.data(), text.size(), timeout));
426 return text;
427}
428
430{
431 // Try to receive data from the client
432 TryReceive();
433}
434
435void TCPSession::TryReceive()
436{
437 if (_receiving)
438 return;
439
440 if (!IsConnected())
441 return;
442
443 // Async receive with the receive handler
444 _receiving = true;
445 auto self(this->shared_from_this());
446 auto async_receive_handler = make_alloc_handler(_receive_storage, [this, self](std::error_code ec, size_t size)
447 {
448 _receiving = false;
449
450 if (!IsConnected())
451 return;
452
453 // Received some data from the client
454 if (size > 0)
455 {
456 // Update statistic
457 _bytes_received += size;
458 _server->_bytes_received += size;
459
460 // Call the buffer received handler
461 onReceived(_receive_buffer.data(), size);
462
463 // If the receive buffer is full increase its size
464 if (_receive_buffer.size() == size)
465 {
466 // Check the receive buffer limit
467 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
468 {
469 SendError(asio::error::no_buffer_space);
470 Disconnect(true);
471 return;
472 }
473
474 _receive_buffer.resize(2 * size);
475 }
476 }
477
478 // Try to receive again if the session is valid
479 if (!ec)
480 TryReceive();
481 else
482 {
483 SendError(ec);
484 Disconnect(true);
485 }
486 });
487 if (_strand_required)
488 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
489 else
490 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
491}
492
493void TCPSession::TrySend()
494{
495 if (_sending)
496 return;
497
498 if (!IsConnected())
499 return;
500
501 // Swap send buffers
502 if (_send_buffer_flush.empty())
503 {
504 std::scoped_lock locker(_send_lock);
505
506 // Swap flush and main buffers
507 _send_buffer_flush.swap(_send_buffer_main);
508 _send_buffer_flush_offset = 0;
509
510 // Update statistic
511 _bytes_pending = 0;
512 _bytes_sending += _send_buffer_flush.size();
513 }
514
515 // Check if the flush buffer is empty
516 if (_send_buffer_flush.empty())
517 {
518 // Call the empty send buffer handler
519 onEmpty();
520 return;
521 }
522
523 // Async write with the write handler
524 _sending = true;
525 auto self(this->shared_from_this());
526 auto async_write_handler = make_alloc_handler(_send_storage, [this, self](std::error_code ec, size_t size)
527 {
528 _sending = false;
529
530 if (!IsConnected())
531 return;
532
533 // Send some data to the client
534 if (size > 0)
535 {
536 // Update statistic
537 _bytes_sending -= size;
538 _bytes_sent += size;
539 _server->_bytes_sent += size;
540
541 // Increase the flush buffer offset
542 _send_buffer_flush_offset += size;
543
544 // Successfully send the whole flush buffer
545 if (_send_buffer_flush_offset == _send_buffer_flush.size())
546 {
547 // Clear the flush buffer
548 _send_buffer_flush.clear();
549 _send_buffer_flush_offset = 0;
550 }
551
552 // Call the buffer sent handler
553 onSent(size, bytes_pending());
554 }
555
556 // Try to send again if the session is valid
557 if (!ec)
558 TrySend();
559 else
560 {
561 SendError(ec);
562 Disconnect(true);
563 }
564 });
565 if (_strand_required)
566 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
567 else
568 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
569}
570
571void TCPSession::ClearBuffers()
572{
573 {
574 std::scoped_lock locker(_send_lock);
575
576 // Clear send buffers
577 _send_buffer_main.clear();
578 _send_buffer_flush.clear();
579 _send_buffer_flush_offset = 0;
580
581 // Update statistic
582 _bytes_pending = 0;
583 _bytes_sending = 0;
584 }
585}
586
587void TCPSession::ResetServer()
588{
589 // Reset cycle-reference to the server
590 _server.reset();
591}
592
593void TCPSession::SendError(std::error_code ec)
594{
595 // Skip Asio disconnect errors
596 if ((ec == asio::error::connection_aborted) ||
597 (ec == asio::error::connection_refused) ||
598 (ec == asio::error::connection_reset) ||
599 (ec == asio::error::eof) ||
600 (ec == asio::error::operation_aborted))
601 return;
602
603 onError(ec.value(), ec.category().name(), ec.message());
604}
605
606} // namespace Asio
607} // namespace CppServer
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
TCPSession(const std::shared_ptr< TCPServer > &server)
Initialize the session with a given server.
virtual void onConnected()
Handle session connected notification.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending sent by the session.
Definition tcp_session.h:57
virtual void onDisconnecting()
Handle session disconnecting notification.
std::shared_ptr< TCPServer > & server() noexcept
Get the server.
Definition tcp_session.h:48
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the client (asynchronous).
virtual size_t Send(const void *buffer, size_t size)
Send data to the client (synchronous).
bool IsConnected() const noexcept
Is the session connected?
Definition tcp_session.h:73
virtual void onConnecting()
Handle session connecting notification.
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onDisconnected()
Handle session disconnected notification.
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the client (synchronous).
virtual bool Disconnect()
Disconnect the session.
Definition tcp_session.h:79
virtual void ReceiveAsync()
Receive data from the client (asynchronous).
Asio definitions.
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition memory.inl:39
C++ Server project definitions.
Definition asio.h:56
TCP server definition.
TCP session definition.