CppServer 1.0.5.0
C++ Server Library
Loading...
Searching...
No Matches
tcp_server.cpp
Go to the documentation of this file.
1
10
11namespace CppServer {
12namespace Asio {
13
14TCPServer::TCPServer(const std::shared_ptr<Service>& service, int port, InternetProtocol protocol)
15 : _id(CppCommon::UUID::Sequential()),
16 _service(service),
17 _io_service(_service->GetAsioService()),
18 _strand(*_io_service),
19 _strand_required(_service->IsStrandRequired()),
20 _port(port),
21 _acceptor(*_io_service),
22 _started(false),
23 _bytes_pending(0),
24 _bytes_sent(0),
25 _bytes_received(0),
26 _option_keep_alive(false),
27 _option_no_delay(false),
28 _option_reuse_address(false),
29 _option_reuse_port(false)
30{
31 assert((service != nullptr) && "Asio service is invalid!");
32 if (service == nullptr)
33 throw CppCommon::ArgumentException("Asio service is invalid!");
34
35 // Prepare endpoint
36 switch (protocol)
37 {
39 _endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), (unsigned short)port);
40 break;
42 _endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v6(), (unsigned short)port);
43 break;
44 }
45}
46
47TCPServer::TCPServer(const std::shared_ptr<Service>& service, const std::string& address, int port)
48 : _id(CppCommon::UUID::Sequential()),
49 _service(service),
50 _io_service(_service->GetAsioService()),
51 _strand(*_io_service),
52 _strand_required(_service->IsStrandRequired()),
53 _address(address),
54 _port(port),
55 _acceptor(*_io_service),
56 _started(false),
57 _bytes_pending(0),
58 _bytes_sent(0),
59 _bytes_received(0),
60 _option_keep_alive(false),
61 _option_no_delay(false),
62 _option_reuse_address(false),
63 _option_reuse_port(false)
64{
65 assert((service != nullptr) && "Asio service is invalid!");
66 if (service == nullptr)
67 throw CppCommon::ArgumentException("Asio service is invalid!");
68
69 // Prepare endpoint
70 _endpoint = asio::ip::tcp::endpoint(asio::ip::make_address(address), (unsigned short)port);
71}
72
73TCPServer::TCPServer(const std::shared_ptr<Service>& service, const asio::ip::tcp::endpoint& endpoint)
74 : _id(CppCommon::UUID::Sequential()),
75 _service(service),
76 _io_service(_service->GetAsioService()),
77 _strand(*_io_service),
78 _strand_required(_service->IsStrandRequired()),
79 _address(endpoint.address().to_string()),
80 _port(endpoint.port()),
81 _endpoint(endpoint),
82 _acceptor(*_io_service),
83 _started(false),
84 _bytes_pending(0),
85 _bytes_sent(0),
86 _bytes_received(0),
87 _option_keep_alive(false),
88 _option_no_delay(false),
89 _option_reuse_address(false),
90 _option_reuse_port(false)
91{
92 assert((service != nullptr) && "Asio service is invalid!");
93 if (service == nullptr)
94 throw CppCommon::ArgumentException("Asio service is invalid!");
95}
96
98{
99 assert(!IsStarted() && "TCP server is already started!");
100 if (IsStarted())
101 return false;
102
103 // Post the start handler
104 auto self(this->shared_from_this());
105 auto start_handler = [this, self]()
106 {
107 if (IsStarted())
108 return;
109
110 // Create a server acceptor
111 _acceptor = asio::ip::tcp::acceptor(*_io_service);
112 _acceptor.open(_endpoint.protocol());
114 _acceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true));
115#if (defined(unix) || defined(__unix) || defined(__unix__) || defined(__APPLE__)) && !defined(__CYGWIN__)
116 if (option_reuse_port())
117 {
118 typedef asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT> reuse_port;
119 _acceptor.set_option(reuse_port(true));
120 }
121#endif
122 _acceptor.bind(_endpoint);
123 _acceptor.listen();
124
125 // Reset statistic
126 _bytes_pending = 0;
127 _bytes_sent = 0;
128 _bytes_received = 0;
129
130 // Update the started flag
131 _started = true;
132
133 // Call the server started handler
134 onStarted();
135
136 // Perform the first server accept
137 Accept();
138 };
139 if (_strand_required)
140 _strand.post(start_handler);
141 else
142 _io_service->post(start_handler);
143
144 return true;
145}
146
148{
149 assert(IsStarted() && "TCP server is not started!");
150 if (!IsStarted())
151 return false;
152
153 // Post the stop handler
154 auto self(this->shared_from_this());
155 auto stop_handler = [this, self]()
156 {
157 if (!IsStarted())
158 return;
159
160 // Close the server acceptor
161 _acceptor.close();
162
163 // Reset the session
164 _session->ResetServer();
165
166 // Disconnect all sessions
168
169 // Update the started flag
170 _started = false;
171
172 // Clear multicast buffer
173 ClearBuffers();
174
175 // Call the server stopped handler
176 onStopped();
177 };
178 if (_strand_required)
179 _strand.post(stop_handler);
180 else
181 _io_service->post(stop_handler);
182
183 return true;
184}
185
187{
188 if (!Stop())
189 return false;
190
191 while (IsStarted())
192 CppCommon::Thread::Yield();
193
194 return Start();
195}
196
197void TCPServer::Accept()
198{
199 if (!IsStarted())
200 return;
201
202 // Dispatch the accept handler
203 auto self(this->shared_from_this());
204 auto accept_handler = make_alloc_handler(_acceptor_storage, [this, self]()
205 {
206 if (!IsStarted())
207 return;
208
209 // Create a new session to accept
210 _session = CreateSession(self);
211
212 auto async_accept_handler = make_alloc_handler(_acceptor_storage, [this, self](std::error_code ec)
213 {
214 if (!ec)
215 {
216 RegisterSession();
217
218 // Connect a new session
219 _session->Connect();
220 }
221 else
222 SendError(ec);
223
224 // Perform the next server accept
225 Accept();
226 });
227 if (_strand_required)
228 _acceptor.async_accept(_session->socket(), bind_executor(_strand, async_accept_handler));
229 else
230 _acceptor.async_accept(_session->socket(), async_accept_handler);
231 });
232 if (_strand_required)
233 _strand.dispatch(accept_handler);
234 else
235 _io_service->dispatch(accept_handler);
236}
237
238bool TCPServer::Multicast(const void* buffer, size_t size)
239{
240 if (!IsStarted())
241 return false;
242
243 if (size == 0)
244 return true;
245
246 assert((buffer != nullptr) && "Pointer to the buffer should not be null!");
247 if (buffer == nullptr)
248 return false;
249
250 std::shared_lock<std::shared_mutex> locker(_sessions_lock);
251
252 // Multicast all sessions
253 for (auto& session : _sessions)
254 session.second->SendAsync(buffer, size);
255
256 return true;
257}
258
260{
261 if (!IsStarted())
262 return false;
263
264 // Dispatch the disconnect all handler
265 auto self(this->shared_from_this());
266 auto disconnect_all_handler = [this, self]()
267 {
268 if (!IsStarted())
269 return;
270
271 std::shared_lock<std::shared_mutex> locker(_sessions_lock);
272
273 // Disconnect all sessions
274 for (auto& session : _sessions)
275 session.second->Disconnect();
276 };
277 if (_strand_required)
278 _strand.dispatch(disconnect_all_handler);
279 else
280 _io_service->dispatch(disconnect_all_handler);
281
282 return true;
283}
284
285std::shared_ptr<TCPSession> TCPServer::FindSession(const CppCommon::UUID& id)
286{
287 std::shared_lock<std::shared_mutex> locker(_sessions_lock);
288
289 // Try to find the required session
290 auto it = _sessions.find(id);
291 return (it != _sessions.end()) ? it->second : nullptr;
292}
293
294void TCPServer::RegisterSession()
295{
296 std::unique_lock<std::shared_mutex> locker(_sessions_lock);
297
298 // Register a new session
299 _sessions.emplace(_session->id(), _session);
300}
301
302void TCPServer::UnregisterSession(const CppCommon::UUID& id)
303{
304 std::unique_lock<std::shared_mutex> locker(_sessions_lock);
305
306 // Try to find the unregistered session
307 auto it = _sessions.find(id);
308 if (it != _sessions.end())
309 {
310 // Erase the session
311 _sessions.erase(it);
312 }
313}
314
315void TCPServer::ClearBuffers()
316{
317 // Update statistic
318 _bytes_pending = 0;
319}
320
321void TCPServer::SendError(std::error_code ec)
322{
323 // Skip Asio disconnect errors
324 if ((ec == asio::error::connection_aborted) ||
325 (ec == asio::error::connection_refused) ||
326 (ec == asio::error::connection_reset) ||
327 (ec == asio::error::eof) ||
328 (ec == asio::error::operation_aborted))
329 return;
330
331 // Skip Winsock error 995: The I/O operation has been aborted because of either a thread exit or an application request
332 if (ec.value() == 995)
333 return;
334
335 onError(ec.value(), ec.category().name(), ec.message());
336}
337
338} // namespace Asio
339} // namespace CppServer
Asio allocate handler wrapper.
Definition memory.h:133
virtual bool DisconnectAll()
Disconnect all connected sessions.
virtual bool Multicast(const void *buffer, size_t size)
Multicast data to all connected sessions.
const std::string & address() const noexcept
Get the server address.
Definition tcp_server.h:77
virtual bool Start()
Start the server.
std::shared_ptr< Service > & service() noexcept
Get the Asio service.
Definition tcp_server.h:66
std::map< CppCommon::UUID, std::shared_ptr< TCPSession > > _sessions
Definition tcp_server.h:212
std::shared_mutex _sessions_lock
Definition tcp_server.h:211
virtual std::shared_ptr< TCPSession > CreateSession(const std::shared_ptr< TCPServer > &server)
Create TCP session factory method.
Definition tcp_server.h:182
std::shared_ptr< TCPSession > FindSession(const CppCommon::UUID &id)
Find a session with a given Id.
TCPServer(const std::shared_ptr< Service > &service, int port, InternetProtocol protocol=InternetProtocol::IPv4)
Initialize TCP server with a given Asio service and port number.
virtual bool Stop()
Stop the server.
bool option_reuse_port() const noexcept
Get the option: reuse port.
Definition tcp_server.h:97
int port() const noexcept
Get the server port number.
Definition tcp_server.h:79
virtual bool Restart()
Restart the server.
virtual void onStopped()
Handle server stopped notification.
Definition tcp_server.h:188
virtual void onStarted()
Handle server started notification.
Definition tcp_server.h:186
bool IsStarted() const noexcept
Is the server started?
Definition tcp_server.h:100
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
Definition tcp_server.h:207
bool option_reuse_address() const noexcept
Get the option: reuse address.
Definition tcp_server.h:95
AllocateHandler< THandler > make_alloc_handler(HandlerStorage &storage, THandler handler)
Helper function to wrap a handler object to add custom allocation.
Definition memory.inl:39
InternetProtocol
Internet protocol.
Definition asio.h:66
@ IPv4
Internet Protocol version 4.
@ IPv6
Internet Protocol version 6.
C++ Server project definitions.
Definition asio.h:56
TCP server definition.