CppLogging 1.0.5.0
C++ Logging Library
Loading...
Searching...
No Matches
async_wait_processor.cpp
Go to the documentation of this file.
1
10
11#include "errors/fatal.h"
12#include "threads/thread.h"
13
14#include <cassert>
15
16namespace CppLogging {
17
18AsyncWaitProcessor::AsyncWaitProcessor(const std::shared_ptr<Layout>& layout, bool auto_start, size_t capacity, size_t initial, const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
19 : Processor(layout),
20 _queue(capacity, initial),
21 _on_thread_initialize(on_thread_initialize),
22 _on_thread_clenup(on_thread_clenup)
23{
24 _started = false;
25
26 // Start the logging processor
27 if (auto_start)
28 Start();
29}
30
32{
33 // Stop the logging processor
34 if (IsStarted())
35 Stop();
36}
37
39{
40 bool started = IsStarted();
41
42 if (!Processor::Start())
43 return false;
44
45 if (!started)
46 {
47 // Start processing thread
48 _thread = CppCommon::Thread::Start([this]() { ProcessThread(_on_thread_initialize, _on_thread_clenup); });
49 }
50
51 return true;
52}
53
55{
56 if (IsStarted())
57 {
58 // Thread local stop operation record
59 thread_local Record stop;
60
61 // Enqueue stop operation record
62 stop.timestamp = 0;
63 EnqueueRecord(stop);
64
65 // Wait for processing thread
66 _thread.join();
67 }
68
69 return Processor::Stop();
70}
71
73{
74 // Check if the logging processor started
75 if (!IsStarted())
76 return true;
77
78 // Enqueue the given logger record
79 return EnqueueRecord(record);
80}
81
82bool AsyncWaitProcessor::EnqueueRecord(Record& record)
83{
84 // Try to enqueue the given logger record
85 return _queue.Enqueue(record);
86}
87
88void AsyncWaitProcessor::ProcessThread(const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
89{
90 // Call the thread initialize handler
91 assert((on_thread_initialize) && "Thread initialize handler must be valid!");
92 if (on_thread_initialize)
93 on_thread_initialize();
94
95 try
96 {
97 // Thread local logger records to process
98 thread_local std::vector<Record> records;
99 thread_local uint64_t previous = 0;
100
101 // Reserve initial space for logging records
102 records.reserve(_queue.capacity());
103
104 while (_started)
105 {
106 // Dequeue the next logging record
107 if (!_queue.Dequeue(records))
108 return;
109
110 // Current timestamp
111 uint64_t current = 0;
112
113 // Process all logging records
114 for (auto& record : records)
115 {
116 // Handle stop operation record
117 if (record.timestamp == 0)
118 return;
119
120 // Handle flush operation record
121 if (record.timestamp == 1)
122 {
123 // Flush the logging processor
125 continue;
126 }
127
128 // Process logging record
130
131 // Find the latest record timestamp
132 if (record.timestamp > current)
133 current = record.timestamp;
134 }
135
136 // Handle auto-flush period
137 if (CppCommon::Timespan((int64_t)(current - previous)).seconds() > 1)
138 {
139 // Flush the logging processor
141
142 // Update the previous timestamp
143 previous = current;
144 }
145 }
146 }
147 catch (const std::exception& ex)
148 {
149 fatality(ex);
150 }
151 catch (...)
152 {
153 fatality("Asynchronous wait logging processor terminated!");
154 }
155
156 // Call the thread cleanup handler
157 assert((on_thread_clenup) && "Thread cleanup handler must be valid!");
158 if (on_thread_clenup)
159 on_thread_clenup();
160}
161
163{
164 // Check if the logging processor started
165 if (!IsStarted())
166 return;
167
168 // Thread local flush operation record
169 thread_local Record flush;
170
171 // Enqueue flush operation record
172 flush.timestamp = 1;
173 EnqueueRecord(flush);
174}
175
176} // namespace CppLogging
Asynchronous wait logging processor definition.
AsyncWaitProcessor(const std::shared_ptr< Layout > &layout, bool auto_start=true, size_t capacity=8192, size_t initial=8192, const std::function< void()> &on_thread_initialize=[](){}, const std::function< void()> &on_thread_clenup=[](){})
Initialize asynchronous processor with a given layout interface.
bool ProcessRecord(Record &record) override
Process the given logging record through all child filters, layouts and appenders.
bool Start() override
Start the logging element.
void Flush() override
Flush the current logging processor.
bool Stop() override
Stop the logging element.
Logging processor interface.
Definition processor.h:31
bool IsStarted() const noexcept override
Is the logging processor started?
Definition processor.h:55
bool Stop() override
Stop the logging processor.
Definition processor.cpp:60
virtual bool ProcessRecord(Record &record)
Process the given logging record through all child filters, layouts and appenders.
bool Start() override
Start the logging processor.
Definition processor.cpp:30
std::atomic< bool > _started
Definition processor.h:98
virtual void Flush()
Flush the current logging processor.
Logging record.
Definition record.h:37
uint64_t timestamp
Timestamp of the logging record.
Definition record.h:40
C++ Logging project definitions.
Definition appender.h:15