11#include "errors/fatal.h"
17 : _strand_required(
false),
22 assert((
threads >= 0) &&
"Working threads counter must not be negative!");
27 _services.emplace_back(std::make_shared<asio::io_service>());
34 _services.emplace_back(std::make_shared<asio::io_service>());
35 _threads.emplace_back(std::thread());
41 _services.emplace_back(std::make_shared<asio::io_service>());
43 _threads.emplace_back(std::thread());
44 _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
45 _strand_required =
true;
55 assert((service !=
nullptr) &&
"Asio IO service is invalid!");
56 if (service ==
nullptr)
57 throw CppCommon::ArgumentException(
"Asio IO service is invalid!");
59 _services.emplace_back(service);
61 _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
74 _round_robin_index = 0;
96 _threads[
thread] = CppCommon::Thread::Start([
this,
self,
thread]() { ServiceThread(
self, _services[
thread % _services.size()]); });
100 CppCommon::Thread::Yield();
119 for (
auto& service : _services)
128 if (_strand_required)
134 for (
auto&
thread : _threads)
142 CppCommon::Thread::Yield();
155 for (
size_t service = 0; service < _services.size(); ++service)
156 _services[service] = std::make_shared<asio::io_service>();
157 if (_strand_required)
158 _strand = std::make_shared<asio::io_service::strand>(*_services[0]);
163void Service::ServiceThread(
const std::shared_ptr<Service>& service,
const std::shared_ptr<asio::io_service>& io_service)
165 bool polling = service->IsPolling();
168 service->onThreadInitialize();
173 asio::io_service::work
work(*io_service);
196 catch (
const asio::system_error& ex)
198 std::error_code ec = ex.code();
201 if (ec == asio::error::not_connected)
206 }
while (service->IsStarted());
208 catch (
const asio::system_error& ex)
210 service->SendError(ex.code());
212 catch (
const std::exception& ex)
218 fatality(
"Asio service thread terminated!");
222 service->onThreadCleanup();
224#if (OPENSSL_VERSION_NUMBER >= 0x10100000L)
226 OPENSSL_thread_stop();
230void Service::SendError(std::error_code ec)
232 onError(ec.value(), ec.category().name(), ec.message());
Asio allocate handler wrapper.
virtual void onStopped()
Handle service stopped notification.
bool IsPolling() const noexcept
Is the service started with polling loop mode?
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
bool IsStarted() const noexcept
Is the service started?
virtual bool Stop()
Stop the service.
virtual bool Start(bool polling=false)
Start the service.
Service(int threads=1, bool pool=false)
Initialize Asio service with single or multiple working threads.
size_t threads() const noexcept
Get the number of working threads.
virtual bool Restart()
Restart the service.
virtual void onStarted()
Handle service started notification.
C++ Server project definitions.