11 #include "errors/fatal.h"
17 : _strand_required(false),
22 assert((
threads >= 0) &&
"Working threads counter must not be negative!");
27 _services.emplace_back(std::make_shared<asio::io_service>());
32 for (
int thread = 0; thread <
threads; ++thread)
34 _services.emplace_back(std::make_shared<asio::io_service>());
35 _threads.emplace_back(std::thread());
41 _services.emplace_back(std::make_shared<asio::io_service>());
42 for (
int thread = 0; thread <
threads; ++thread)
43 _threads.emplace_back(std::thread());
44 _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
45 _strand_required =
true;
50 : _strand_required(strands),
55 assert((service !=
nullptr) &&
"Asio IO service is invalid!");
56 if (service ==
nullptr)
57 throw CppCommon::ArgumentException(
"Asio IO service is invalid!");
59 _services.emplace_back(service);
61 _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
66 assert(!
IsStarted() &&
"Asio service is already started!");
74 _round_robin_index = 0;
77 auto self(this->shared_from_this());
78 auto start_handler = [
this,
self]()
90 _strand->post(start_handler);
92 _services[0]->post(start_handler);
95 for (
size_t thread = 0; thread < _threads.size(); ++thread)
96 _threads[thread] = CppCommon::Thread::Start([
this,
self, thread]() { ServiceThread(
self, _services[thread % _services.size()]); });
100 CppCommon::Thread::Yield();
107 assert(
IsStarted() &&
"Asio service is not started!");
112 auto self(this->shared_from_this());
113 auto stop_handler = [
this,
self]()
119 for (
auto& service : _services)
128 if (_strand_required)
129 _strand->post(stop_handler);
131 _services[0]->post(stop_handler);
134 for (
auto& thread : _threads)
142 CppCommon::Thread::Yield();
155 for (
size_t service = 0; service < _services.size(); ++service)
156 _services[service] = std::make_shared<asio::io_service>();
157 if (_strand_required)
158 _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
160 return Start(polling);
163 void Service::ServiceThread(
const std::shared_ptr<Service>& service,
const std::shared_ptr<asio::io_service>& io_service)
165 bool polling = service->IsPolling();
168 service->onThreadInitialize();
173 asio::io_service::work work(*io_service);
196 catch (
const asio::system_error& ex)
198 std::error_code ec = ex.code();
201 if (ec == asio::error::not_connected)
206 }
while (service->IsStarted());
208 catch (
const asio::system_error& ex)
210 service->SendError(ex.code());
212 catch (
const std::exception& ex)
218 fatality(
"Asio service thread terminated!");
222 service->onThreadCleanup();
224 #if (OPENSSL_VERSION_NUMBER >= 0x10100000L)
226 OPENSSL_thread_stop();
230 void Service::SendError(std::error_code ec)
232 onError(ec.value(), ec.category().name(), ec.message());
virtual void onStopped()
Handle service stopped notification.
bool IsPolling() const noexcept
Is the service started with polling loop mode?
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
bool IsStarted() const noexcept
Is the service started?
virtual bool Stop()
Stop the service.
virtual bool Start(bool polling=false)
Start the service.
Service(int threads=1, bool pool=false)
Initialize Asio service with single or multiple working threads.
size_t threads() const noexcept
Get the number of working threads.
virtual bool Restart()
Restart the service.
virtual void onStarted()
Handle service started notification.
C++ Server project definitions.