CppServer  1.0.4.0
C++ Server Library
service.cpp
Go to the documentation of this file.
1 
9 #include "server/asio/service.h"
10 
11 #include "errors/fatal.h"
12 
13 namespace CppServer {
14 namespace Asio {
15 
16 Service::Service(int threads, bool pool)
17  : _strand_required(false),
18  _polling(false),
19  _started(false),
20  _round_robin_index(0)
21 {
22  assert((threads >= 0) && "Working threads counter must not be negative!");
23 
24  if (threads == 0)
25  {
26  // Single Asio IO service without thread pool
27  _services.emplace_back(std::make_shared<asio::io_service>());
28  }
29  else if (!pool)
30  {
31  // Io-service-per-thread design
32  for (int thread = 0; thread < threads; ++thread)
33  {
34  _services.emplace_back(std::make_shared<asio::io_service>());
35  _threads.emplace_back(std::thread());
36  }
37  }
38  else
39  {
40  // Thread-pool design
41  _services.emplace_back(std::make_shared<asio::io_service>());
42  for (int thread = 0; thread < threads; ++thread)
43  _threads.emplace_back(std::thread());
44  _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
45  _strand_required = true;
46  }
47 }
48 
49 Service::Service(const std::shared_ptr<asio::io_service>& service, bool strands)
50  : _strand_required(strands),
51  _polling(false),
52  _started(false),
53  _round_robin_index(0)
54 {
55  assert((service != nullptr) && "Asio IO service is invalid!");
56  if (service == nullptr)
57  throw CppCommon::ArgumentException("Asio IO service is invalid!");
58 
59  _services.emplace_back(service);
60  if (_strand_required)
61  _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
62 }
63 
64 bool Service::Start(bool polling)
65 {
66  assert(!IsStarted() && "Asio service is already started!");
67  if (IsStarted())
68  return false;
69 
70  // Update polling loop mode flag
71  _polling = polling;
72 
73  // Reset round robin index
74  _round_robin_index = 0;
75 
76  // Post the started handler
77  auto self(this->shared_from_this());
78  auto start_handler = [this, self]()
79  {
80  if (IsStarted())
81  return;
82 
83  // Update the started flag
84  _started = true;
85 
86  // Call the service started handler
87  onStarted();
88  };
89  if (_strand_required)
90  _strand->post(start_handler);
91  else
92  _services[0]->post(start_handler);
93 
94  // Start service working threads
95  for (size_t thread = 0; thread < _threads.size(); ++thread)
96  _threads[thread] = CppCommon::Thread::Start([this, self, thread]() { ServiceThread(self, _services[thread % _services.size()]); });
97 
98  // Wait for service is started
99  while (!IsStarted())
100  CppCommon::Thread::Yield();
101 
102  return true;
103 }
104 
106 {
107  assert(IsStarted() && "Asio service is not started!");
108  if (!IsStarted())
109  return false;
110 
111  // Post the stop routine
112  auto self(this->shared_from_this());
113  auto stop_handler = [this, self]()
114  {
115  if (!IsStarted())
116  return;
117 
118  // Stop Asio services
119  for (auto& service : _services)
120  service->stop();
121 
122  // Update the started flag
123  _started = false;
124 
125  // Call the service stopped handler
126  onStopped();
127  };
128  if (_strand_required)
129  _strand->post(stop_handler);
130  else
131  _services[0]->post(stop_handler);
132 
133  // Wait for all service working threads
134  for (auto& thread : _threads)
135  thread.join();
136 
137  // Update polling loop mode flag
138  _polling = false;
139 
140  // Wait for service is stopped
141  while (IsStarted())
142  CppCommon::Thread::Yield();
143 
144  return true;
145 }
146 
148 {
149  bool polling = IsPolling();
150 
151  if (!Stop())
152  return false;
153 
154  // Reinitialize new Asio IO services
155  for (size_t service = 0; service < _services.size(); ++service)
156  _services[service] = std::make_shared<asio::io_service>();
157  if (_strand_required)
158  _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
159 
160  return Start(polling);
161 }
162 
163 void Service::ServiceThread(const std::shared_ptr<Service>& service, const std::shared_ptr<asio::io_service>& io_service)
164 {
165  bool polling = service->IsPolling();
166 
167  // Call the initialize thread handler
168  service->onThreadInitialize();
169 
170  try
171  {
172  // Attach the current working thread to the Asio service
173  asio::io_service::work work(*io_service);
174 
175  // Service loop...
176  do
177  {
178  // ...with handling some specific Asio errors
179  try
180  {
181  if (polling)
182  {
183  // Poll all pending handlers
184  io_service->poll();
185 
186  // Call the idle handler
187  service->onIdle();
188  }
189  else
190  {
191  // Run all pending handlers
192  io_service->run();
193  break;
194  }
195  }
196  catch (const asio::system_error& ex)
197  {
198  std::error_code ec = ex.code();
199 
200  // Skip Asio disconnect errors
201  if (ec == asio::error::not_connected)
202  continue;
203 
204  throw;
205  }
206  } while (service->IsStarted());
207  }
208  catch (const asio::system_error& ex)
209  {
210  service->SendError(ex.code());
211  }
212  catch (const std::exception& ex)
213  {
214  fatality(ex);
215  }
216  catch (...)
217  {
218  fatality("Asio service thread terminated!");
219  }
220 
221  // Call the cleanup thread handler
222  service->onThreadCleanup();
223 
224 #if (OPENSSL_VERSION_NUMBER >= 0x10100000L)
225  // Delete OpenSSL thread state
226  OPENSSL_thread_stop();
227 #endif
228 }
229 
230 void Service::SendError(std::error_code ec)
231 {
232  onError(ec.value(), ec.category().name(), ec.message());
233 }
234 
235 } // namespace Asio
236 } // namespace CppServer
virtual void onStopped()
Handle service stopped notification.
Definition: service.h:149
bool IsPolling() const noexcept
Is the service started with polling loop mode?
Definition: service.h:81
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
Definition: service.h:160
bool IsStarted() const noexcept
Is the service started?
Definition: service.h:83
virtual bool Stop()
Stop the service.
Definition: service.cpp:105
virtual bool Start(bool polling=false)
Start the service.
Definition: service.cpp:64
Service(int threads=1, bool pool=false)
Initialize Asio service with single or multiple working threads.
Definition: service.cpp:16
size_t threads() const noexcept
Get the number of working threads.
Definition: service.h:76
virtual bool Restart()
Restart the service.
Definition: service.cpp:147
virtual void onStarted()
Handle service started notification.
Definition: service.h:147
C++ Server project definitions.
Definition: asio.h:56
Asio service definition.