CppLogging  1.0.4.0
C++ Logging Library
async_wait_free_processor.cpp
Go to the documentation of this file.
1 
10 
11 #include "errors/fatal.h"
12 #include "threads/thread.h"
13 
14 #include <cassert>
15 
16 namespace CppLogging {
17 
18 AsyncWaitFreeProcessor::AsyncWaitFreeProcessor(const std::shared_ptr<Layout>& layout, bool auto_start, size_t capacity, bool discard, const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
19  : Processor(layout),
20  _discard(discard),
21  _queue(capacity),
22  _on_thread_initialize(on_thread_initialize),
23  _on_thread_clenup(on_thread_clenup)
24 {
25  _started = false;
26 
27  // Start the logging processor
28  if (auto_start)
29  Start();
30 }
31 
// NOTE(review): the definition header for this body (listing line 32) is absent from this
// listing. Presumably this is the AsyncWaitFreeProcessor destructor - confirm against the
// original source file.
// The body stops the processor when it is still started, so Stop() runs before the
// object goes away.
33 {
34  // Stop the logging processor if it is still running
35  if (IsStarted())
36  Stop();
37 }
38 
// NOTE(review): the definition header for this body (listing line 39) is absent from this
// listing. Presumably `bool AsyncWaitFreeProcessor::Start()` - the reference section of
// this listing names `bool Start() override`; confirm against the original source file.
// Starts the base Processor and, if the processor was not already started, spawns the
// processing thread. Returns false when Processor::Start() fails, true otherwise.
40 {
41  bool started = IsStarted(); // state sampled before Processor::Start() runs
42 
43  if (!Processor::Start())
44  return false;
45 
46  if (!started)
47  {
48  // Start the processing thread; it runs ProcessThread() with the stored thread handlers
49  _thread = CppCommon::Thread::Start([this]() { ProcessThread(_on_thread_initialize, _on_thread_clenup); });
50  }
51 
52  return true;
53 }
54 
// NOTE(review): the definition header for this body (listing line 55) is absent from this
// listing. Presumably `bool AsyncWaitFreeProcessor::Stop()` - the reference section of
// this listing names `bool Stop() override`; confirm against the original source file.
// When the processor is started, enqueues a stop marker record and joins the processing
// thread. Always finishes by returning the result of Processor::Stop().
56 {
57  if (IsStarted())
58  {
59  // Thread local stop operation record
60  thread_local Record stop;
61 
62  // Enqueue the stop marker: ProcessThread() treats a record with timestamp 0 as the stop request.
63  stop.timestamp = 0;
64  EnqueueRecord(false, stop); // discard == false: EnqueueRecord() retries until the marker is queued
65 
66  // Wait for the processing thread to finish
67  _thread.join();
68  }
69 
70  return Processor::Stop();
71 }
72 
// NOTE(review): the definition header for this body (listing line 73) is absent from this
// listing. Presumably `bool AsyncWaitFreeProcessor::ProcessRecord(Record& record)` - the
// reference section of this listing names `bool ProcessRecord(Record &record) override`;
// confirm against the original source file.
// Hands the record to the queue instead of processing it on the calling thread.
// Returns true when the processor is not started (the record is not queued in that case),
// otherwise the result of EnqueueRecord() with the configured overflow policy.
74 {
75  // A processor that is not started does not queue the record and reports true
76  if (!IsStarted())
77  return true;
78 
79  // Enqueue the given logger record using the configured overflow policy (_discard)
80  return EnqueueRecord(_discard, record);
81 }
82 
83 bool AsyncWaitFreeProcessor::EnqueueRecord(bool discard, Record& record)
84 {
85  // Try to enqueue the given logger record
86  if (!_queue.Enqueue(record))
87  {
88  // If the overflow policy is discard logging record, return immediately
89  if (discard)
90  return false;
91 
92  // If the overflow policy is blocking then yield if the queue is full
93  while (!_queue.Enqueue(record))
94  CppCommon::Thread::Yield();
95  }
96 
97  return true;
98 }
99 
100 void AsyncWaitFreeProcessor::ProcessThread(const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
101 {
102  // Call the thread initialize handler
103  assert((on_thread_initialize) && "Thread initialize handler must be valid!");
104  if (on_thread_initialize)
105  on_thread_initialize();
106 
107  try
108  {
109  // Thread local logger record to process
110  thread_local Record record;
111  thread_local uint64_t previous = CppCommon::Timestamp::utc();
112 
113  while (_started)
114  {
115  // Try to dequeue the next logging record
116  bool empty = !_queue.Dequeue(record);
117 
118  // Current timestamp
119  uint64_t current;
120 
121  if (!empty)
122  {
123  // Handle stop operation record
124  if (record.timestamp == 0)
125  return;
126 
127  // Handle flush operation record
128  if (record.timestamp == 1)
129  {
130  // Flush the logging processor
132  continue;
133  }
134 
135  // Process logging record
136  Processor::ProcessRecord(record);
137 
138  // Update the current timestamp
139  current = record.timestamp;
140  }
141  else
142  {
143  // Update the current timestamp
144  current = CppCommon::Timestamp::utc();
145  }
146 
147  // Handle auto-flush period
148  if (CppCommon::Timespan((int64_t)(current - previous)).seconds() > 1)
149  {
150  // Flush the logging processor
152 
153  // Update the previous timestamp
154  previous = current;
155  }
156 
157  // Sleep for a while if the queue was empty
158  if (empty)
159  CppCommon::Thread::Sleep(100);
160  }
161  }
162  catch (const std::exception& ex)
163  {
164  fatality(ex);
165  }
166  catch (...)
167  {
168  fatality("Asynchronous wait-free logging processor terminated!");
169  }
170 
171  // Call the thread cleanup handler
172  assert((on_thread_clenup) && "Thread cleanup handler must be valid!");
173  if (on_thread_clenup)
174  on_thread_clenup();
175 }
176 
// NOTE(review): the definition header for this body (listing line 177) is absent from this
// listing. Presumably `void AsyncWaitFreeProcessor::Flush()` - the reference section of
// this listing names `void Flush() override`; confirm against the original source file.
// Requests a flush by enqueuing a flush marker record; this body does not wait for the
// marker to be consumed. Does nothing when the processor is not started.
178 {
179  // Nothing to flush through the queue when the logging processor is not started
180  if (!IsStarted())
181  return;
182 
183  // Thread local flush operation record
184  thread_local Record flush;
185 
186  // Enqueue the flush marker: ProcessThread() treats a record with timestamp 1 as a flush request.
187  flush.timestamp = 1;
188  EnqueueRecord(false, flush); // discard == false: EnqueueRecord() retries until the marker is queued
189 }
190 
191 } // namespace CppLogging
Asynchronous wait-free logging processor definition.
bool ProcessRecord(Record &record) override
Process the given logging record through all child filters, layouts and appenders.
AsyncWaitFreeProcessor(const std::shared_ptr< Layout > &layout, bool auto_start=true, size_t capacity=8192, bool discard=false, const std::function< void()> &on_thread_initialize=[](){}, const std::function< void()> &on_thread_clenup=[](){})
Initialize asynchronous processor with a given layout interface, overflow policy and buffer capacity.
void Flush() override
Flush the current logging processor.
bool Stop() override
Stop the logging processor.
bool Start() override
Start the logging processor.
Logging processor interface.
Definition: processor.h:31
bool IsStarted() const noexcept override
Is the logging processor started?
Definition: processor.h:55
bool Stop() override
Stop the logging processor.
Definition: processor.cpp:60
virtual bool ProcessRecord(Record &record)
Process the given logging record through all child filters, layouts and appenders.
Definition: processor.cpp:100
bool Start() override
Start the logging processor.
Definition: processor.cpp:30
std::atomic< bool > _started
Definition: processor.h:98
virtual void Flush()
Flush the current logging processor.
Definition: processor.cpp:127
Logging record.
Definition: record.h:37
uint64_t timestamp
Timestamp of the logging record.
Definition: record.h:40
C++ Logging project definitions.
Definition: appender.h:15