11#include "errors/fatal.h"
17 : _strand_required(false),
22 assert((
threads >= 0) &&
"Working threads counter must not be negative!");
27 _io_contexts.emplace_back(std::make_shared<asio::io_context>());
32 for (
int thread = 0; thread <
threads; ++thread)
34 _io_contexts.emplace_back(std::make_shared<asio::io_context>());
35 _threads.emplace_back(std::thread());
41 _io_contexts.emplace_back(std::make_shared<asio::io_context>());
42 for (
int thread = 0; thread <
threads; ++thread)
43 _threads.emplace_back(std::thread());
44 _strand = std::make_shared<asio::io_context::strand>(*_io_contexts[0]);
45 _strand_required =
true;
50 : _strand_required(strands),
55 assert((io_context !=
nullptr) &&
"Asio IO context is invalid!");
56 if (io_context ==
nullptr)
57 throw CppCommon::ArgumentException(
"Asio IO context is invalid!");
59 _io_contexts.emplace_back(io_context);
61 _strand = std::make_shared<asio::io_context::strand>(*_io_contexts[0]);
66 assert(!
IsStarted() &&
"Asio service is already started!");
74 _round_robin_index = 0;
77 auto self(this->shared_from_this());
78 auto start_handler = [
this, self]()
90 asio::post(*_strand, start_handler);
92 asio::post(*_io_contexts[0], start_handler);
95 for (
size_t thread = 0; thread < _threads.size(); ++thread)
96 _threads[thread] = CppCommon::Thread::Start([
this, self, thread]() { ServiceThread(self, _io_contexts[thread % _io_contexts.size()]); });
100 CppCommon::Thread::Yield();
107 assert(
IsStarted() &&
"Asio service is not started!");
112 auto self(this->shared_from_this());
113 auto stop_handler = [
this, self]()
119 for (
auto& io_context : _io_contexts)
128 if (_strand_required)
129 asio::post(*_strand, stop_handler);
131 asio::post(*_io_contexts[0], stop_handler);
134 for (
auto& thread : _threads)
142 CppCommon::Thread::Yield();
155 for (
size_t i = 0; i < _io_contexts.size(); ++i)
156 _io_contexts[i] = std::make_shared<asio::io_context>();
157 if (_strand_required)
158 _strand = std::make_shared<asio::io_context::strand>(*_io_contexts[0]);
160 return Start(polling);
163void Service::ServiceThread(
const std::shared_ptr<Service>& service,
const std::shared_ptr<asio::io_context>& io_context)
165 bool polling = service->IsPolling();
168 service->onThreadInitialize();
173 auto work = asio::make_work_guard(*io_context);
196 catch (
const asio::system_error& ex)
198 std::error_code ec = ex.code();
201 if (ec == asio::error::not_connected)
206 }
while (service->IsStarted());
208 catch (
const asio::system_error& ex)
210 service->SendError(ex.code());
212 catch (
const std::exception& ex)
218 fatality(
"Asio service thread terminated!");
222 service->onThreadCleanup();
224#if (OPENSSL_VERSION_NUMBER >= 0x10100000L)
226 OPENSSL_thread_stop();
230void Service::SendError(std::error_code ec)
232 onError(ec.value(), ec.category().name(), ec.message());
virtual void onStopped()
Handle service stopped notification.
bool IsPolling() const noexcept
Is the service started with polling loop mode?
virtual void onError(int error, const std::string &category, const std::string &message)
Handle error notification.
bool IsStarted() const noexcept
Is the service started?
virtual bool Stop()
Stop the service.
virtual bool Start(bool polling=false)
Start the service.
Service(int threads=1, bool pool=false)
Initialize Asio service with single or multiple working threads.
size_t threads() const noexcept
Get the number of working threads.
virtual bool Restart()
Restart the service.
virtual void onStarted()
Handle service started notification.
C++ Server project definitions.