CppLogging 1.0.5.0
C++ Logging Library
Loading...
Searching...
No Matches
async_wait_free_processor.cpp
Go to the documentation of this file.
1
10
11#include "errors/fatal.h"
12#include "threads/thread.h"
13
14#include <cassert>
15
16namespace CppLogging {
17
18AsyncWaitFreeProcessor::AsyncWaitFreeProcessor(const std::shared_ptr<Layout>& layout, bool auto_start, size_t capacity, bool discard, const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
19 : Processor(layout),
20 _discard(discard),
21 _queue(capacity),
22 _on_thread_initialize(on_thread_initialize),
23 _on_thread_clenup(on_thread_clenup)
24{
25 _started = false;
26
27 // Start the logging processor
28 if (auto_start)
29 Start();
30}
31
33{
34 // Stop the logging processor
35 if (IsStarted())
36 Stop();
37}
38
40{
41 bool started = IsStarted();
42
43 if (!Processor::Start())
44 return false;
45
46 if (!started)
47 {
48 // Start processing thread
49 _thread = CppCommon::Thread::Start([this]() { ProcessThread(_on_thread_initialize, _on_thread_clenup); });
50 }
51
52 return true;
53}
54
56{
57 if (IsStarted())
58 {
59 // Thread local stop operation record
60 thread_local Record stop;
61
62 // Enqueue stop operation record
63 stop.timestamp = 0;
64 EnqueueRecord(false, stop);
65
66 // Wait for processing thread
67 _thread.join();
68 }
69
70 return Processor::Stop();
71}
72
74{
75 // Check if the logging processor started
76 if (!IsStarted())
77 return true;
78
79 // Enqueue the given logger record
80 return EnqueueRecord(_discard, record);
81}
82
83bool AsyncWaitFreeProcessor::EnqueueRecord(bool discard, Record& record)
84{
85 // Try to enqueue the given logger record
86 if (!_queue.Enqueue(record))
87 {
88 // If the overflow policy is discard logging record, return immediately
89 if (discard)
90 return false;
91
92 // If the overflow policy is blocking then yield if the queue is full
93 while (!_queue.Enqueue(record))
94 CppCommon::Thread::Yield();
95 }
96
97 return true;
98}
99
100void AsyncWaitFreeProcessor::ProcessThread(const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
101{
102 // Call the thread initialize handler
103 assert((on_thread_initialize) && "Thread initialize handler must be valid!");
104 if (on_thread_initialize)
105 on_thread_initialize();
106
107 try
108 {
109 // Thread local logger record to process
110 thread_local Record record;
111 thread_local uint64_t previous = CppCommon::Timestamp::utc();
112
113 while (_started)
114 {
115 // Try to dequeue the next logging record
116 bool empty = !_queue.Dequeue(record);
117
118 // Current timestamp
119 uint64_t current;
120
121 if (!empty)
122 {
123 // Handle stop operation record
124 if (record.timestamp == 0)
125 return;
126
127 // Handle flush operation record
128 if (record.timestamp == 1)
129 {
130 // Flush the logging processor
132 continue;
133 }
134
135 // Process logging record
137
138 // Update the current timestamp
139 current = record.timestamp;
140 }
141 else
142 {
143 // Update the current timestamp
144 current = CppCommon::Timestamp::utc();
145 }
146
147 // Handle auto-flush period
148 if (CppCommon::Timespan((int64_t)(current - previous)).seconds() > 1)
149 {
150 // Flush the logging processor
152
153 // Update the previous timestamp
154 previous = current;
155 }
156
157 // Sleep for a while if the queue was empty
158 if (empty)
159 CppCommon::Thread::Sleep(100);
160 }
161 }
162 catch (const std::exception& ex)
163 {
164 fatality(ex);
165 }
166 catch (...)
167 {
168 fatality("Asynchronous wait-free logging processor terminated!");
169 }
170
171 // Call the thread cleanup handler
172 assert((on_thread_clenup) && "Thread cleanup handler must be valid!");
173 if (on_thread_clenup)
174 on_thread_clenup();
175}
176
178{
179 // Check if the logging processor started
180 if (!IsStarted())
181 return;
182
183 // Thread local flush operation record
184 thread_local Record flush;
185
186 // Enqueue flush operation record
187 flush.timestamp = 1;
188 EnqueueRecord(false, flush);
189}
190
191} // namespace CppLogging
Asynchronous wait-free logging processor definition.
bool ProcessRecord(Record &record) override
Process the given logging record through all child filters, layouts and appenders.
AsyncWaitFreeProcessor(const std::shared_ptr< Layout > &layout, bool auto_start=true, size_t capacity=8192, bool discard=false, const std::function< void()> &on_thread_initialize=[](){}, const std::function< void()> &on_thread_clenup=[](){})
Initialize asynchronous processor with a given layout interface, overflow policy and buffer capacity.
void Flush() override
Flush the current logging processor.
bool Stop() override
Stop the logging element.
bool Start() override
Start the logging element.
Logging processor interface.
Definition processor.h:31
bool IsStarted() const noexcept override
Is the logging processor started?
Definition processor.h:55
bool Stop() override
Stop the logging processor.
Definition processor.cpp:60
virtual bool ProcessRecord(Record &record)
Process the given logging record through all child filters, layouts and appenders.
bool Start() override
Start the logging processor.
Definition processor.cpp:30
std::atomic< bool > _started
Definition processor.h:98
virtual void Flush()
Flush the current logging processor.
Logging record.
Definition record.h:37
uint64_t timestamp
Timestamp of the logging record.
Definition record.h:40
C++ Logging project definitions.
Definition appender.h:15