CppServer  1.0.4.0
C++ Server Library
tcp_session.cpp
Go to the documentation of this file.
1 
10 #include "server/asio/tcp_server.h"
11 
12 namespace CppServer {
13 namespace Asio {
14 
15 TCPSession::TCPSession(const std::shared_ptr<TCPServer>& server)
16  : _id(CppCommon::UUID::Sequential()),
17  _server(server),
18  _io_service(server->service()->GetAsioService()),
19  _strand(*_io_service),
20  _strand_required(_server->_strand_required),
21  _socket(*_io_service),
22  _connected(false),
23  _bytes_pending(0),
24  _bytes_sending(0),
25  _bytes_sent(0),
26  _bytes_received(0),
27  _receiving(false),
28  _sending(false),
29  _send_buffer_flush_offset(0)
30 {
31 }
32 
34 {
35  asio::socket_base::receive_buffer_size option;
36  _socket.get_option(option);
37  return option.value();
38 }
39 
41 {
42  asio::socket_base::send_buffer_size option;
43  _socket.get_option(option);
44  return option.value();
45 }
46 
48 {
49  asio::socket_base::receive_buffer_size option((int)size);
50  _socket.set_option(option);
51 }
52 
54 {
55  asio::socket_base::send_buffer_size option((int)size);
56  _socket.set_option(option);
57 }
58 
59 void TCPSession::Connect()
60 {
61  // Apply the option: keep alive
62  if (_server->option_keep_alive())
63  _socket.set_option(asio::ip::tcp::socket::keep_alive(true));
64  // Apply the option: no delay
65  if (_server->option_no_delay())
66  _socket.set_option(asio::ip::tcp::no_delay(true));
67 
68  // Prepare receive & send buffers
69  _receive_buffer.resize(option_receive_buffer_size());
70  _send_buffer_main.reserve(option_send_buffer_size());
71  _send_buffer_flush.reserve(option_send_buffer_size());
72 
73  // Reset statistic
74  _bytes_pending = 0;
75  _bytes_sending = 0;
76  _bytes_sent = 0;
77  _bytes_received = 0;
78 
79  // Update the connected flag
80  _connected = true;
81 
82  // Try to receive something from the client
83  TryReceive();
84 
85  // Call the session connected handler
86  onConnected();
87 
88  // Call the session connected handler in the server
89  auto connected_session(this->shared_from_this());
90  _server->onConnected(connected_session);
91 
92  // Call the empty send buffer handler
93  if (_send_buffer_main.empty())
94  onEmpty();
95 }
96 
97 bool TCPSession::Disconnect(bool dispatch)
98 {
99  if (!IsConnected())
100  return false;
101 
102  // Dispatch or post the disconnect handler
103  auto self(this->shared_from_this());
104  auto disconnect_handler = [this, self]()
105  {
106  if (!IsConnected())
107  return;
108 
109  // Close the session socket
110  _socket.close();
111 
112  // Update the connected flag
113  _connected = false;
114 
115  // Update sending/receiving flags
116  _receiving = false;
117  _sending = false;
118 
119  // Clear send/receive buffers
120  ClearBuffers();
121 
122  // Call the session disconnected handler
123  onDisconnected();
124 
125  // Call the session disconnected handler in the server
126  auto disconnected_session(this->shared_from_this());
127  _server->onDisconnected(disconnected_session);
128 
129  // Dispatch the unregister session handler
130  auto unregister_session_handler = [this, self]()
131  {
132  _server->UnregisterSession(id());
133  };
134  if (_server->_strand_required)
135  _server->_strand.dispatch(unregister_session_handler);
136  else
137  _server->_io_service->dispatch(unregister_session_handler);
138  };
139  if (_strand_required)
140  {
141  if (dispatch)
142  _strand.dispatch(disconnect_handler);
143  else
144  _strand.post(disconnect_handler);
145  }
146  else
147  {
148  if (dispatch)
149  _io_service->dispatch(disconnect_handler);
150  else
151  _io_service->post(disconnect_handler);
152  }
153 
154  return true;
155 }
156 
157 size_t TCPSession::Send(const void* buffer, size_t size)
158 {
159  if (!IsConnected())
160  return 0;
161 
162  if (size == 0)
163  return 0;
164 
165  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
166  if (buffer == nullptr)
167  return 0;
168 
169  asio::error_code ec;
170 
171  // Send data to the client
172  size_t sent = asio::write(_socket, asio::buffer(buffer, size), ec);
173  if (sent > 0)
174  {
175  // Update statistic
176  _bytes_sent += sent;
177  _server->_bytes_sent += sent;
178 
179  // Call the buffer sent handler
180  onSent(sent, bytes_pending());
181  }
182 
183  // Disconnect on error
184  if (ec)
185  {
186  SendError(ec);
187  Disconnect();
188  }
189 
190  return sent;
191 }
192 
193 size_t TCPSession::Send(const void* buffer, size_t size, const CppCommon::Timespan& timeout)
194 {
195  if (!IsConnected())
196  return 0;
197 
198  if (size == 0)
199  return 0;
200 
201  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
202  if (buffer == nullptr)
203  return 0;
204 
205  int done = 0;
206  std::mutex mtx;
207  std::condition_variable cv;
208  asio::error_code error;
209  asio::system_timer timer(_socket.get_executor());
210 
211  // Prepare done handler
212  auto async_done_handler = [&](asio::error_code ec)
213  {
214  std::unique_lock<std::mutex> lck(mtx);
215  if (done++ == 0)
216  {
217  error = ec;
218  _socket.cancel();
219  timer.cancel();
220  }
221  cv.notify_one();
222  };
223 
224  // Async wait for timeout
225  timer.expires_from_now(timeout.chrono());
226  timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
227 
228  // Async write some data to the client
229  size_t sent = 0;
230  _socket.async_write_some(asio::buffer(buffer, size), [&](std::error_code ec, size_t write) { async_done_handler(ec); sent = write; });
231 
232  // Wait for complete or timeout
233  std::unique_lock<std::mutex> lck(mtx);
234  cv.wait(lck, [&]() { return done == 2; });
235 
236  // Send data to the client
237  if (sent > 0)
238  {
239  // Update statistic
240  _bytes_sent += sent;
241  _server->_bytes_sent += sent;
242 
243  // Call the buffer sent handler
244  onSent(sent, bytes_pending());
245  }
246 
247  // Disconnect on error
248  if (error && (error != asio::error::timed_out))
249  {
250  SendError(error);
251  Disconnect();
252  }
253 
254  return sent;
255 }
256 
257 bool TCPSession::SendAsync(const void* buffer, size_t size)
258 {
259  if (!IsConnected())
260  return false;
261 
262  if (size == 0)
263  return true;
264 
265  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
266  if (buffer == nullptr)
267  return false;
268 
269  {
270  std::scoped_lock locker(_send_lock);
271 
272  // Detect multiple send handlers
273  bool send_required = _send_buffer_main.empty() || _send_buffer_flush.empty();
274 
275  // Check the send buffer limit
276  if (((_send_buffer_main.size() + size) > _send_buffer_limit) && (_send_buffer_limit > 0))
277  {
278  SendError(asio::error::no_buffer_space);
279  return false;
280  }
281 
282  // Fill the main send buffer
283  const uint8_t* bytes = (const uint8_t*)buffer;
284  _send_buffer_main.insert(_send_buffer_main.end(), bytes, bytes + size);
285 
286  // Update statistic
287  _bytes_pending = _send_buffer_main.size();
288 
289  // Avoid multiple send handlers
290  if (!send_required)
291  return true;
292  }
293 
294  // Dispatch the send handler
295  auto self(this->shared_from_this());
296  auto send_handler = [this, self]()
297  {
298  // Try to send the main buffer
299  TrySend();
300  };
301  if (_strand_required)
302  _strand.dispatch(send_handler);
303  else
304  _io_service->dispatch(send_handler);
305 
306  return true;
307 }
308 
309 size_t TCPSession::Receive(void* buffer, size_t size)
310 {
311  if (!IsConnected())
312  return 0;
313 
314  if (size == 0)
315  return 0;
316 
317  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
318  if (buffer == nullptr)
319  return 0;
320 
321  asio::error_code ec;
322 
323  // Receive data from the client
324  size_t received = _socket.read_some(asio::buffer(buffer, size), ec);
325  if (received > 0)
326  {
327  // Update statistic
328  _bytes_received += received;
329  _server->_bytes_received += received;
330 
331  // Call the buffer received handler
332  onReceived(buffer, received);
333  }
334 
335  // Disconnect on error
336  if (ec)
337  {
338  SendError(ec);
339  Disconnect();
340  }
341 
342  return received;
343 }
344 
345 std::string TCPSession::Receive(size_t size)
346 {
347  std::string text(size, 0);
348  text.resize(Receive(text.data(), text.size()));
349  return text;
350 }
351 
352 size_t TCPSession::Receive(void* buffer, size_t size, const CppCommon::Timespan& timeout)
353 {
354  if (!IsConnected())
355  return 0;
356 
357  if (size == 0)
358  return 0;
359 
360  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
361  if (buffer == nullptr)
362  return 0;
363 
364  int done = 0;
365  std::mutex mtx;
366  std::condition_variable cv;
367  asio::error_code error;
368  asio::system_timer timer(_socket.get_executor());
369 
370  // Prepare done handler
371  auto async_done_handler = [&](asio::error_code ec)
372  {
373  std::unique_lock<std::mutex> lck(mtx);
374  if (done++ == 0)
375  {
376  error = ec;
377  _socket.cancel();
378  timer.cancel();
379  }
380  cv.notify_one();
381  };
382 
383  // Async wait for timeout
384  timer.expires_from_now(timeout.chrono());
385  timer.async_wait([&](const asio::error_code& ec) { async_done_handler(ec ? ec : asio::error::timed_out); });
386 
387  // Async read some data from the client
388  size_t received = 0;
389  _socket.async_read_some(asio::buffer(buffer, size), [&](std::error_code ec, size_t read) { async_done_handler(ec); received = read; });
390 
391  // Wait for complete or timeout
392  std::unique_lock<std::mutex> lck(mtx);
393  cv.wait(lck, [&]() { return done == 2; });
394 
395  // Received some data from the client
396  if (received > 0)
397  {
398  // Update statistic
399  _bytes_received += received;
400  _server->_bytes_received += received;
401 
402  // Call the buffer received handler
403  onReceived(buffer, received);
404  }
405 
406  // Disconnect on error
407  if (error && (error != asio::error::timed_out))
408  {
409  SendError(error);
410  Disconnect();
411  }
412 
413  return received;
414 }
415 
416 std::string TCPSession::Receive(size_t size, const CppCommon::Timespan& timeout)
417 {
418  std::string text(size, 0);
419  text.resize(Receive(text.data(), text.size(), timeout));
420  return text;
421 }
422 
424 {
425  // Try to receive data from the client
426  TryReceive();
427 }
428 
429 void TCPSession::TryReceive()
430 {
431  if (_receiving)
432  return;
433 
434  if (!IsConnected())
435  return;
436 
437  // Async receive with the receive handler
438  _receiving = true;
439  auto self(this->shared_from_this());
440  auto async_receive_handler = make_alloc_handler(_receive_storage, [this, self](std::error_code ec, size_t size)
441  {
442  _receiving = false;
443 
444  if (!IsConnected())
445  return;
446 
447  // Received some data from the client
448  if (size > 0)
449  {
450  // Update statistic
451  _bytes_received += size;
452  _server->_bytes_received += size;
453 
454  // Call the buffer received handler
455  onReceived(_receive_buffer.data(), size);
456 
457  // If the receive buffer is full increase its size
458  if (_receive_buffer.size() == size)
459  {
460  // Check the receive buffer limit
461  if (((2 * size) > _receive_buffer_limit) && (_receive_buffer_limit > 0))
462  {
463  SendError(asio::error::no_buffer_space);
464  Disconnect(true);
465  return;
466  }
467 
468  _receive_buffer.resize(2 * size);
469  }
470  }
471 
472  // Try to receive again if the session is valid
473  if (!ec)
474  TryReceive();
475  else
476  {
477  SendError(ec);
478  Disconnect(true);
479  }
480  });
481  if (_strand_required)
482  _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), bind_executor(_strand, async_receive_handler));
483  else
484  _socket.async_read_some(asio::buffer(_receive_buffer.data(), _receive_buffer.size()), async_receive_handler);
485 }
486 
487 void TCPSession::TrySend()
488 {
489  if (_sending)
490  return;
491 
492  if (!IsConnected())
493  return;
494 
495  // Swap send buffers
496  if (_send_buffer_flush.empty())
497  {
498  std::scoped_lock locker(_send_lock);
499 
500  // Swap flush and main buffers
501  _send_buffer_flush.swap(_send_buffer_main);
502  _send_buffer_flush_offset = 0;
503 
504  // Update statistic
505  _bytes_pending = 0;
506  _bytes_sending += _send_buffer_flush.size();
507  }
508 
509  // Check if the flush buffer is empty
510  if (_send_buffer_flush.empty())
511  {
512  // Call the empty send buffer handler
513  onEmpty();
514  return;
515  }
516 
517  // Async write with the write handler
518  _sending = true;
519  auto self(this->shared_from_this());
520  auto async_write_handler = make_alloc_handler(_send_storage, [this, self](std::error_code ec, size_t size)
521  {
522  _sending = false;
523 
524  if (!IsConnected())
525  return;
526 
527  // Send some data to the client
528  if (size > 0)
529  {
530  // Update statistic
531  _bytes_sending -= size;
532  _bytes_sent += size;
533  _server->_bytes_sent += size;
534 
535  // Increase the flush buffer offset
536  _send_buffer_flush_offset += size;
537 
538  // Successfully send the whole flush buffer
539  if (_send_buffer_flush_offset == _send_buffer_flush.size())
540  {
541  // Clear the flush buffer
542  _send_buffer_flush.clear();
543  _send_buffer_flush_offset = 0;
544  }
545 
546  // Call the buffer sent handler
547  onSent(size, bytes_pending());
548  }
549 
550  // Try to send again if the session is valid
551  if (!ec)
552  TrySend();
553  else
554  {
555  SendError(ec);
556  Disconnect(true);
557  }
558  });
559  if (_strand_required)
560  _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), bind_executor(_strand, async_write_handler));
561  else
562  _socket.async_write_some(asio::buffer(_send_buffer_flush.data() + _send_buffer_flush_offset, _send_buffer_flush.size() - _send_buffer_flush_offset), async_write_handler);
563 }
564 
565 void TCPSession::ClearBuffers()
566 {
567  {
568  std::scoped_lock locker(_send_lock);
569 
570  // Clear send buffers
571  _send_buffer_main.clear();
572  _send_buffer_flush.clear();
573  _send_buffer_flush_offset = 0;
574 
575  // Update statistic
576  _bytes_pending = 0;
577  _bytes_sending = 0;
578  }
579 }
580 
581 void TCPSession::ResetServer()
582 {
583  // Reset cycle-reference to the server
584  _server.reset();
585 }
586 
587 void TCPSession::SendError(std::error_code ec)
588 {
589  // Skip Asio disconnect errors
590  if ((ec == asio::error::connection_aborted) ||
591  (ec == asio::error::connection_refused) ||
592  (ec == asio::error::connection_reset) ||
593  (ec == asio::error::eof) ||
594  (ec == asio::error::operation_aborted))
595  return;
596 
597  onError(ec.value(), ec.category().name(), ec.message());
598 }
599 
600 } // namespace Asio
601 } // namespace CppServer
void SetupReceiveBufferSize(size_t size)
Setup option: receive buffer size.
Definition: tcp_session.cpp:47
TCPSession(const std::shared_ptr< TCPServer > &server)
Initialize the session with a given server.
Definition: tcp_session.cpp:15
virtual void onConnected()
Handle session connected notification.
Definition: tcp_session.h:191
uint64_t bytes_pending() const noexcept
Get the number of bytes pending to be sent by the session.
Definition: tcp_session.h:57
void SetupSendBufferSize(size_t size)
Setup option: send buffer size.
Definition: tcp_session.cpp:53
virtual bool SendAsync(const void *buffer, size_t size)
Send data to the client (asynchronous)
virtual size_t Send(const void *buffer, size_t size)
Send data to the client (synchronous)
bool IsConnected() const noexcept
Is the session connected?
Definition: tcp_session.h:73
virtual void onEmpty()
Handle empty send buffer notification.
Definition: tcp_session.h:224
virtual void onReceived(const void *buffer, size_t size)
Handle buffer received notification.
Definition: tcp_session.h:203
size_t option_receive_buffer_size() const
Get the option: receive buffer size.
Definition: tcp_session.cpp:33
virtual void onDisconnected()
Handle session disconnected notification.
Definition: tcp_session.h:193
size_t option_send_buffer_size() const
Get the option: send buffer size.
Definition: tcp_session.cpp:40
virtual void onSent(size_t sent, size_t pending)
Handle buffer sent notification.
Definition: tcp_session.h:215
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
Definition: tcp_session.h:232
virtual size_t Receive(void *buffer, size_t size)
Receive data from the client (synchronous)
virtual bool Disconnect()
Disconnect the session.
Definition: tcp_session.h:79
virtual void ReceiveAsync()
Receive data from the client (asynchronous)
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition: memory.inl:39
C++ Server project definitions.
Definition: asio.h:56
TCP server definition.
TCP session definition.