CppServer  1.0.4.0
C++ Server Library
tcp_server.cpp
Go to the documentation of this file.
1 
10 
11 namespace CppServer {
12 namespace Asio {
13 
14 TCPServer::TCPServer(const std::shared_ptr<Service>& service, int port, InternetProtocol protocol)
15  : _id(CppCommon::UUID::Sequential()),
16  _service(service),
17  _io_service(_service->GetAsioService()),
18  _strand(*_io_service),
19  _strand_required(_service->IsStrandRequired()),
20  _port(port),
21  _acceptor(*_io_service),
22  _started(false),
23  _bytes_pending(0),
24  _bytes_sent(0),
25  _bytes_received(0),
26  _option_keep_alive(false),
27  _option_no_delay(false),
28  _option_reuse_address(false),
29  _option_reuse_port(false)
30 {
31  assert((service != nullptr) && "Asio service is invalid!");
32  if (service == nullptr)
33  throw CppCommon::ArgumentException("Asio service is invalid!");
34 
35  // Prepare endpoint
36  switch (protocol)
37  {
39  _endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), (unsigned short)port);
40  break;
42  _endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v6(), (unsigned short)port);
43  break;
44  }
45 }
46 
47 TCPServer::TCPServer(const std::shared_ptr<Service>& service, const std::string& address, int port)
48  : _id(CppCommon::UUID::Sequential()),
49  _service(service),
50  _io_service(_service->GetAsioService()),
51  _strand(*_io_service),
52  _strand_required(_service->IsStrandRequired()),
53  _address(address),
54  _port(port),
55  _acceptor(*_io_service),
56  _started(false),
57  _bytes_pending(0),
58  _bytes_sent(0),
59  _bytes_received(0),
60  _option_keep_alive(false),
61  _option_no_delay(false),
62  _option_reuse_address(false),
63  _option_reuse_port(false)
64 {
65  assert((service != nullptr) && "Asio service is invalid!");
66  if (service == nullptr)
67  throw CppCommon::ArgumentException("Asio service is invalid!");
68 
69  // Prepare endpoint
70  _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(address), (unsigned short)port);
71 }
72 
73 TCPServer::TCPServer(const std::shared_ptr<Service>& service, const asio::ip::tcp::endpoint& endpoint)
74  : _id(CppCommon::UUID::Sequential()),
75  _service(service),
76  _io_service(_service->GetAsioService()),
77  _strand(*_io_service),
78  _strand_required(_service->IsStrandRequired()),
79  _address(endpoint.address().to_string()),
80  _port(endpoint.port()),
81  _endpoint(endpoint),
82  _acceptor(*_io_service),
83  _started(false),
84  _bytes_pending(0),
85  _bytes_sent(0),
86  _bytes_received(0),
87  _option_keep_alive(false),
88  _option_no_delay(false),
89  _option_reuse_address(false),
90  _option_reuse_port(false)
91 {
92  assert((service != nullptr) && "Asio service is invalid!");
93  if (service == nullptr)
94  throw CppCommon::ArgumentException("Asio service is invalid!");
95 }
96 
98 {
99  assert(!IsStarted() && "TCP server is already started!");
100  if (IsStarted())
101  return false;
102 
103  // Post the start handler
104  auto self(this->shared_from_this());
105  auto start_handler = [this, self]()
106  {
107  if (IsStarted())
108  return;
109 
110  // Create a server acceptor
111  _acceptor = asio::ip::tcp::acceptor(*_io_service);
112  _acceptor.open(_endpoint.protocol());
113  if (option_reuse_address())
114  _acceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true));
115 #if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
116  if (option_reuse_port())
117  {
118  typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
119  _acceptor.set_option(reuse_port(true));
120  }
121 #endif
122  _acceptor.bind(_endpoint);
123  _acceptor.listen();
124 
125  // Reset statistic
126  _bytes_pending = 0;
127  _bytes_sent = 0;
128  _bytes_received = 0;
129 
130  // Update the started flag
131  _started = true;
132 
133  // Call the server started handler
134  onStarted();
135 
136  // Perform the first server accept
137  Accept();
138  };
139  if (_strand_required)
140  _strand.post(start_handler);
141  else
142  _io_service->post(start_handler);
143 
144  return true;
145 }
146 
148 {
149  assert(IsStarted() && "TCP server is not started!");
150  if (!IsStarted())
151  return false;
152 
153  // Post the stop handler
154  auto self(this->shared_from_this());
155  auto stop_handler = [this, self]()
156  {
157  if (!IsStarted())
158  return;
159 
160  // Close the server acceptor
161  _acceptor.close();
162 
163  // Reset the session
164  _session->ResetServer();
165 
166  // Disconnect all sessions
167  DisconnectAll();
168 
169  // Update the started flag
170  _started = false;
171 
172  // Clear multicast buffer
173  ClearBuffers();
174 
175  // Call the server stopped handler
176  onStopped();
177  };
178  if (_strand_required)
179  _strand.post(stop_handler);
180  else
181  _io_service->post(stop_handler);
182 
183  return true;
184 }
185 
187 {
188  if (!Stop())
189  return false;
190 
191  while (IsStarted())
192  CppCommon::Thread::Yield();
193 
194  return Start();
195 }
196 
197 void TCPServer::Accept()
198 {
199  if (!IsStarted())
200  return;
201 
202  // Dispatch the accept handler
203  auto self(this->shared_from_this());
204  auto accept_handler = make_alloc_handler(_acceptor_storage, [this, self]()
205  {
206  if (!IsStarted())
207  return;
208 
209  // Create a new session to accept
210  _session = CreateSession(self);
211 
212  auto async_accept_handler = make_alloc_handler(_acceptor_storage, [this, self](std::error_code ec)
213  {
214  if (!ec)
215  {
216  RegisterSession();
217 
218  // Connect a new session
219  _session->Connect();
220  }
221  else
222  SendError(ec);
223 
224  // Perform the next server accept
225  Accept();
226  });
227  if (_strand_required)
228  _acceptor.async_accept(_session->socket(), bind_executor(_strand, async_accept_handler));
229  else
230  _acceptor.async_accept(_session->socket(), async_accept_handler);
231  });
232  if (_strand_required)
233  _strand.dispatch(accept_handler);
234  else
235  _io_service->dispatch(accept_handler);
236 }
237 
238 bool TCPServer::Multicast(const void* buffer, size_t size)
239 {
240  if (!IsStarted())
241  return false;
242 
243  if (size == 0)
244  return true;
245 
246  assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
247  if (buffer == nullptr)
248  return false;
249 
250  std::shared_lock<std::shared_mutex> locker(_sessions_lock);
251 
252  // Multicast all sessions
253  for (auto& session : _sessions)
254  session.second->SendAsync(buffer, size);
255 
256  return true;
257 }
258 
260 {
261  if (!IsStarted())
262  return false;
263 
264  // Dispatch the disconnect all handler
265  auto self(this->shared_from_this());
266  auto disconnect_all_handler = [this, self]()
267  {
268  if (!IsStarted())
269  return;
270 
271  std::shared_lock<std::shared_mutex> locker(_sessions_lock);
272 
273  // Disconnect all sessions
274  for (auto& session : _sessions)
275  session.second->Disconnect();
276  };
277  if (_strand_required)
278  _strand.dispatch(disconnect_all_handler);
279  else
280  _io_service->dispatch(disconnect_all_handler);
281 
282  return true;
283 }
284 
285 std::shared_ptr<TCPSession> TCPServer::FindSession(const CppCommon::UUID& id)
286 {
287  std::shared_lock<std::shared_mutex> locker(_sessions_lock);
288 
289  // Try to find the required session
290  auto it = _sessions.find(id);
291  return (it != _sessions.end()) ? it->second : nullptr;
292 }
293 
294 void TCPServer::RegisterSession()
295 {
296  std::unique_lock<std::shared_mutex> locker(_sessions_lock);
297 
298  // Register a new session
299  _sessions.emplace(_session->id(), _session);
300 }
301 
302 void TCPServer::UnregisterSession(const CppCommon::UUID& id)
303 {
304  std::unique_lock<std::shared_mutex> locker(_sessions_lock);
305 
306  // Try to find the unregistered session
307  auto it = _sessions.find(id);
308  if (it != _sessions.end())
309  {
310  // Erase the session
311  _sessions.erase(it);
312  }
313 }
314 
315 void TCPServer::ClearBuffers()
316 {
317  // Update statistic
318  _bytes_pending = 0;
319 }
320 
321 void TCPServer::SendError(std::error_code ec)
322 {
323  // Skip Asio disconnect errors
324  if ((ec == asio::error::connection_aborted) ||
325  (ec == asio::error::connection_refused) ||
326  (ec == asio::error::connection_reset) ||
327  (ec == asio::error::eof) ||
328  (ec == asio::error::operation_aborted))
329  return;
330 
331  // Skip Winsock error 995: The I/O operation has been aborted because of either a thread exit or an application request
332  if (ec.value() == 995)
333  return;
334 
335  onError(ec.value(), ec.category().name(), ec.message());
336 }
337 
338 } // namespace Asio
339 } // namespace CppServer
virtual bool DisconnectAll()
Disconnect all connected sessions.
Definition: tcp_server.cpp:259
virtual bool Multicast(const void *buffer, size_t size)
Multicast data to all connected sessions.
Definition: tcp_server.cpp:238
virtual bool Start()
Start the server.
Definition: tcp_server.cpp:97
std::map< CppCommon::UUID, std::shared_ptr< TCPSession > > _sessions
Definition: tcp_server.h:212
std::shared_mutex _sessions_lock
Definition: tcp_server.h:211
std::shared_ptr< TCPSession > FindSession(const CppCommon::UUID &id)
Find a session with a given Id.
Definition: tcp_server.cpp:285
const std::string & address() const noexcept
Get the server address.
Definition: tcp_server.h:77
TCPServer(const std::shared_ptr< Service > &service, int port, InternetProtocol protocol=InternetProtocol::IPv4)
Initialize TCP server with a given Asio service and port number.
Definition: tcp_server.cpp:14
virtual bool Stop()
Stop the server.
Definition: tcp_server.cpp:147
bool option_reuse_port() const noexcept
Get the option: reuse port.
Definition: tcp_server.h:97
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
Definition: tcp_server.h:66
virtual std::shared_ptr< TCPSession > CreateSession(const std::shared_ptr< TCPServer > &server)
Create TCP session factory method.
Definition: tcp_server.h:182
int port() const noexcept
Get the server port number.
Definition: tcp_server.h:79
virtual bool Restart()
Restart the server.
Definition: tcp_server.cpp:186
virtual void onStopped()
Handle server stopped notification.
Definition: tcp_server.h:188
virtual void onStarted()
Handle server started notification.
Definition: tcp_server.h:186
bool IsStarted() const noexcept
Is the server started?
Definition: tcp_server.h:100
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
Definition: tcp_server.h:207
bool option_reuse_address() const noexcept
Get the option: reuse address.
Definition: tcp_server.h:95
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition: memory.inl:39
InternetProtocol
Internet protocol.
Definition: asio.h:66
@ IPv4
Internet Protocol version 4.
@ IPv6
Internet Protocol version 6.
C++ Server project definitions.
Definition: asio.h:56
TCP server definition.