CppServer 1.0.5.0
C++ Server Library
Loading...
Searching...
No Matches
tcp_session.cpp
Go to the documentation of this file.
1
11
12namespace CppServer {
13namespace Asio {
14
15TCPSession::TCPSession(const std::shared_ptr<TCPServer>& server)
16 : _id(CppCommon::UUID::Sequential()),
17 _server(server),
18 _io_service(server->service()->GetAsioService()),
19 _strand(*_io_service),
20 _strand_required(_server->_strand_required),
21 _socket(*_io_service),
22 _connected(false),
23 _bytes_pending(0),
24 _bytes_sending(0),
25 _bytes_sent(0),
26 _bytes_received(0),
27 _receiving(false),
28 _sending(false),
29 _send_buffer_flush_offset(0)
30{
31}
32
34{
35 asio::socket_base::receive_buffer_size option;
36 _socket.get_option(option);
37 return option.value();
38}
39
41{
42 asio::socket_base::send_buffer_size option;
43 _socket.get_option(option);
44 return option.value();
45}
46
48{
49 asio::socket_base::receive_buffer_size option((int)size);
50 _socket.set_option(option);
51}
52
54{
55 asio::socket_base::send_buffer_size option((int)size);
56 _socket.set_option(option);
57}
58
59void TCPSession::Connect()
60{
61 // Apply the option: keep alive
62 if (_server->option_keep_alive())
63 _socket.set_option(asio::ip::tcp::socket::keep_alive(true));
64 // Apply the option: no delay
65 if (_server->option_no_delay())
66 _socket.set_option(asio::ip::tcp::no_delay(true));
67
68 // Prepare receive & send buffers
69 _receive_buffer.resize(option_receive_buffer_size());
70 _send_buffer_main.reserve(option_send_buffer_size());
71 _send_buffer_flush.reserve(option_send_buffer_size());
72
73 // Reset statistic
74 _bytes_pending = 0;
75 _bytes_sending = 0;
76 _bytes_sent = 0;
77 _bytes_received = 0;
78
79 // Update the connected flag
80 _connected = true;
81
82 // Try to receive something from the client
83 TryReceive();
84
85 // Call the session connected handler
87
88 // Call the session connected handler in the server
90 _server->onConnected(connected_session);
91
92 // Call the empty send buffer handler
93 if (_send_buffer_main.empty())
94 onEmpty();
95}
96
97bool TCPSession::Disconnect(bool dispatch)
98{
99 if (!IsConnected())
100 return false;
101
102 // Dispatch or post the disconnect handler
103 auto self(this->shared_from_this());
104 auto disconnect_handler = [this, self]()
105 {
106 if (!IsConnected())
107 return;
108
109 // Close the session socket
110 _socket.close();
111
112 // Update the connected flag
113 _connected = false;
114
115 // Update sending/receiving flags
116 _receiving = false;
117 _sending = false;
118
119 // Clear send/receive buffers
120 ClearBuffers();
121
122 // Call the session disconnected handler
124
125 // Call the session disconnected handler in the server
126 auto disconnected_session(this->shared_from_this());
127 _server->onDisconnected(disconnected_session);
128
129 // Dispatch the unregister session handler
130 auto unregister_session_handler = [this, self]()
131 {
132 _server->UnregisterSession(id());
133 };
134 if (_server->_strand_required)
135 _server->_strand.dispatch(unregister_session_handler);
136 else
137 _server->_io_service->dispatch(unregister_session_handler);
138 };
139 if (_strand_required)
140 {
141 if (dispatch)
142 _strand.dispatch(disconnect_handler);
143 else
144 _strand.post(disconnect_handler);
145 }
146 else
147 {
148 if (dispatch)
149 _io_service->dispatch(disconnect_handler);
150 else
151 _io_service->post(disconnect_handler);
152 }
153
154 return true;
155}
156
157size_t TCPSession::Send(const void* buffer, size_t size)
158{
159 if (!IsConnected())
160 return 0;
161
162 if (size == 0)
163 return 0;
164
165 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
166 if (buffer == nullptr)
167 return 0;
168
169 asio::error_code ec;
170
171 // Send data to the client
172 size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
173 if (sent > 0)
174 {
175 // Update statistic
176 _bytes_sent += sent;
177 _server->_bytes_sent += sent;
178
179 // Call the buffer sent handler
181 }
182
183 // Disconnect on error
184 if (ec)
185 {
186 SendError(ec);
187 Disconnect();
188 }
189
190 return sent;
191}
192
193size_t TCPSession::Send(const void* buffer, size_t size, const CppCommon::Timespan& timeout)
194{
195 if (!IsConnected())
196 return 0;
197
198 if (size == 0)
199 return 0;
200
201 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
202 if (buffer == nullptr)
203 return 0;
204
205 int done = 0;
206 std::mutex mtx;
207 std::condition_variable cv;
208 asio::error_code error;
209 asio::system_timer timer(_socket.get_executor());
210
211 // Prepare done handler
212 auto async_done_handler = [&](asio::error_code ec)
213 {
214 std::unique_lock<std::mutex> lck(mtx);
215 if (done++ == 0)
216 {
217 error = ec;
218 _socket.cancel();
219 timer.cancel();
220 }
221 cv.notify_one();
222 };
223
224 // Async wait for timeout
225 timer.expires_from_now(timeout.chrono());
226 timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
227
228 // Async write some data to the client
229 size_t sent = 0;
230 _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec, size_t write) { async_done_handler(ec); sent = write; });
231
232 // Wait for complete or timeout
233 std::unique_lock<std::mutex> lck(mtx);
234 cv.wait(lck, [&]() { return done == 2; });
235
236 // Send data to the client
237 if (sent > 0)
238 {
239 // Update statistic
240 _bytes_sent += sent;
241 _server->_bytes_sent += sent;
242
243 // Call the buffer sent handler
245 }
246
247 // Disconnect on error
248 if (error && (error != asio::error::timed_out))
249 {
250 SendError(error);
251 Disconnect();
252 }
253
254 return sent;
255}
256
257bool TCPSession::SendAsync(const void* buffer, size_t size)
258{
259 if (!IsConnected())
260 return false;
261
262 if (size == 0)
263 return true;
264
265 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
266 if (buffer == nullptr)
267 return false;
268
269 {
270 std::scoped_lock locker(_send_lock);
271
272 // Detect multiple send handlers
273 bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
274
275 // Check the send buffer limit
276 if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
277 {
278 SendError(asio::error::no_buffer_space);
279 return false;
280 }
281
282 // Fill the main send buffer
283 const uint8_t* bytes = (const uint8_t*)buffer;
284 _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
285
286 // Update statistic
287 _bytes_pending = _send_buffer_main.size();
288
289 // Avoid multiple send handlers
290 if (!send_required)
291 return true;
292 }
293
294 // Dispatch the send handler
295 auto self(this->shared_from_this());
296 auto send_handler = [this, self]()
297 {
298 // Try to send the main buffer
299 TrySend();
300 };
301 if (_strand_required)
302 _strand.dispatch(send_handler);
303 else
304 _io_service->dispatch(send_handler);
305
306 return true;
307}
308
309size_t TCPSession::Receive(void* buffer, size_t size)
310{
311 if (!IsConnected())
312 return 0;
313
314 if (size == 0)
315 return 0;
316
317 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
318 if (buffer == nullptr)
319 return 0;
320
321 asio::error_code ec;
322
323 // Receive data from the client
324 size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
325 if (received > 0)
326 {
327 // Update statistic
328 _bytes_received += received;
329 _server->_bytes_received += received;
330
331 // Call the buffer received handler
333 }
334
335 // Disconnect on error
336 if (ec)
337 {
338 SendError(ec);
339 Disconnect();
340 }
341
342 return received;
343}
344
345std::string TCPSession::Receive(size_t size)
346{
347 std::string text(size, 0);
348 text.resize(Receive(text.data(), text.size()));
349 return text;
350}
351
352size_t TCPSession::Receive(void* buffer, size_t size, const CppCommon::Timespan& timeout)
353{
354 if (!IsConnected())
355 return 0;
356
357 if (size == 0)
358 return 0;
359
360 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
361 if (buffer == nullptr)
362 return 0;
363
364 int done = 0;
365 std::mutex mtx;
366 std::condition_variable cv;
367 asio::error_code error;
368 asio::system_timer timer(_socket.get_executor());
369
370 // Prepare done handler
371 auto async_done_handler = [&](asio::error_code ec)
372 {
373 std::unique_lock<std::mutex> lck(mtx);
374 if (done++ == 0)
375 {
376 error = ec;
377 _socket.cancel();
378 timer.cancel();
379 }
380 cv.notify_one();
381 };
382
383 // Async wait for timeout
384 timer.expires_from_now(timeout.chrono());
385 timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
386
387 // Async read some data from the client
388 size_t received = 0;
389 _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec, size_t read) { async_done_handler(ec); received = read; });
390
391 // Wait for complete or timeout
392 std::unique_lock<std::mutex> lck(mtx);
393 cv.wait(lck, [&]() { return done == 2; });
394
395 // Received some data from the client
396 if (received > 0)
397 {
398 // Update statistic
399 _bytes_received += received;
400 _server->_bytes_received += received;
401
402 // Call the buffer received handler
404 }
405
406 // Disconnect on error
407 if (error && (error != asio::error::timed_out))
408 {
409 SendError(error);
410 Disconnect();
411 }
412
413 return received;
414}
415
416std::string TCPSession::Receive(size_t size, const CppCommon::Timespan& timeout)
417{
418 std::string text(size, 0);
419 text.resize(Receive(text.data(), text.size(), timeout));
420 return text;
421}
422
424{
425 // Try to receive data from the client
426 TryReceive();
427}
428
429void TCPSession::TryReceive()
430{
431 if (_receiving)
432 return;
433
434 if (!IsConnected())
435 return;
436
437 // Async receive with the receive handler
438 _receiving = true;
439 auto self(this->shared_from_this());
440 auto async_receive_handler = make_alloc_handler(_receive_storage, [this, self](std::error_code ec, size_t size)
441 {
442 _receiving = false;
443
444 if (!IsConnected())
445 return;
446
447 // Received some data from the client
448 if (size > 0)
449 {
450 // Update statistic
451 _bytes_received += size;
452 _server->_bytes_received += size;
453
454 // Call the buffer received handler
455 onReceived(_receive_buffer.data(), size);
456
457 // If the receive buffer is full increase its size
458 if (_receive_buffer.size() == size)
459 {
460 // Check the receive buffer limit
461 if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
462 {
463 SendError(asio::error::no_buffer_space);
464 Disconnect(true);
465 return;
466 }
467
468 _receive_buffer.resize(2 * size);
469 }
470 }
471
472 // Try to receive again if the session is valid
473 if (!ec)
474 TryReceive();
475 else
476 {
477 SendError(ec);
478 Disconnect(true);
479 }
480 });
481 if (_strand_required)
482 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
483 else
484 _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
485}
486
487void TCPSession::TrySend()
488{
489 if (_sending)
490 return;
491
492 if (!IsConnected())
493 return;
494
495 // Swap send buffers
496 if (_send_buffer_flush.empty())
497 {
498 std::scoped_lock locker(_send_lock);
499
500 // Swap flush and main buffers
501 _send_buffer_flush.swap(_send_buffer_main);
502 _send_buffer_flush_offset = 0;
503
504 // Update statistic
505 _bytes_pending = 0;
506 _bytes_sending += _send_buffer_flush.size();
507 }
508
509 // Check if the flush buffer is empty
510 if (_send_buffer_flush.empty())
511 {
512 // Call the empty send buffer handler
513 onEmpty();
514 return;
515 }
516
517 // Async write with the write handler
518 _sending = true;
519 auto self(this->shared_from_this());
520 auto async_write_handler = make_alloc_handler(_send_storage, [this, self](std::error_code ec, size_t size)
521 {
522 _sending = false;
523
524 if (!IsConnected())
525 return;
526
527 // Send some data to the client
528 if (size > 0)
529 {
530 // Update statistic
531 _bytes_sending -= size;
532 _bytes_sent += size;
533 _server->_bytes_sent += size;
534
535 // Increase the flush buffer offset
536 _send_buffer_flush_offset += size;
537
538 // Successfully send the whole flush buffer
539 if (_send_buffer_flush_offset == _send_buffer_flush.size())
540 {
541 // Clear the flush buffer
542 _send_buffer_flush.clear();
543 _send_buffer_flush_offset = 0;
544 }
545
546 // Call the buffer sent handler
547 onSent(size, bytes_pending());
548 }
549
550 // Try to send again if the session is valid
551 if (!ec)
552 TrySend();
553 else
554 {
555 SendError(ec);
556 Disconnect(true);
557 }
558 });
559 if (_strand_required)
560 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
561 else
562 _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
563}
564
565void TCPSession::ClearBuffers()
566{
567 {
568 std::scoped_lock locker(_send_lock);
569
570 // Clear send buffers
571 _send_buffer_main.clear();
572 _send_buffer_flush.clear();
573 _send_buffer_flush_offset = 0;
574
575 // Update statistic
576 _bytes_pending = 0;
577 _bytes_sending = 0;
578 }
579}
580
581void TCPSession::ResetServer()
582{
583 // Reset cycle-reference to the server
584 _server.reset();
585}
586
587void TCPSession::SendError(std::error_code ec)
588{
589 // Skip Asio disconnect errors
590 if ((ec == asio::error::connection_aborted) ||
591 (ec == asio::error::connection_refused) ||
592 (ec == asio::error::connection_reset) ||
593 (ec == asio::error::eof) ||
594 (ec == asio::error::operation_aborted))
595 return;
596
597 onError(ec.value(), ec.category().name(), ec.message());
598}
599
600} // namespace Asio
601} // namespace CppServer
Asio allocate handler wrapper.
Definition memory.h:133
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
TCPSession(const std::shared_ptr< TCPServer > &server)
Initialize the session with a given server.
virtual void onConnected()
Handle session connected notification.
uint64_t bytes_pending() const noexcept
Get the number of bytes pending sent by the session.
Definition tcp_session.h:57
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the client (asynchronous)
virtual size_t Send(const void *buffer, size_t size)
Send data to the client (synchronous)
bool IsConnected() const noexcept
Is the session connected?
Definition tcp_session.h:73
virtual void onEmpty()
Handle empty send buffer notification.
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
virtual void onDisconnected()
Handle session disconnected notification.
size_t option_send_buffer_size() const
Get the option: send buffer size.
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
virtual size_t Receive(void *buffer, size_t size)
Receive data from the client (synchronous)
virtual bool Disconnect()
Disconnect the session.
Definition tcp_session.h:79
virtual void ReceiveAsync()
Receive data from the client (asynchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition memory.inl:39
C++ Server project definitions.
Definition asio.h:56
TCP server definition.
TCP session definition.