CppLogging  1.0.4.0
C++ Logging Library
async_wait_processor.cpp
Go to the documentation of this file.
1 
10 
11 #include "errors/fatal.h"
12 #include "threads/thread.h"
13 
14 #include <cassert>
15 
16 namespace CppLogging {
17 
18 AsyncWaitProcessor::AsyncWaitProcessor(const std::shared_ptr<Layout>& layout, bool auto_start, size_t capacity, size_t initial, const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
19  : Processor(layout),
20  _queue(capacity, initial),
21  _on_thread_initialize(on_thread_initialize),
22  _on_thread_clenup(on_thread_clenup)
23 {
24  _started = false;
25 
26  // Start the logging processor
27  if (auto_start)
28  Start();
29 }
30 
// NOTE(review): the signature line (listing line 31) is missing from this extract.
// The body directly follows the constructor, so it is presumably the destructor
// AsyncWaitProcessor::~AsyncWaitProcessor() - confirm against the real source file.
32 {
33  // Stop the logging processor, but only if it is still reported as started
34  if (IsStarted())
35  Stop();
36 }
37 
// NOTE(review): the signature line (listing line 38) is missing from this extract.
// The body matches the `bool Start() override` member listed for AsyncWaitProcessor
// in the tooltip section - confirm against the real source file.
//
// Returns false when the base Processor::Start() fails, true otherwise.
39 {
40  bool started = IsStarted(); // state captured BEFORE the base class start is attempted
41 
42  if (!Processor::Start())
43  return false;
44 
45  if (!started) // launch a processing thread only if the processor was not already started
46  {
47  // Start processing thread; it runs ProcessThread() with the stored initialize/cleanup handlers
48  _thread = CppCommon::Thread::Start([this]() { ProcessThread(_on_thread_initialize, _on_thread_clenup); });
49  }
50 
51  return true;
52 }
53 
// NOTE(review): the signature line (listing line 54) is missing from this extract.
// The body matches the `bool Stop() override` member listed for AsyncWaitProcessor
// in the tooltip section - confirm against the real source file.
//
// Returns whatever the base Processor::Stop() returns.
55 {
56  if (IsStarted())
57  {
58  // Thread local stop operation record
59  thread_local Record stop;
60 
61  // Enqueue stop operation record (timestamp 0 is the marker ProcessThread() treats as "stop")
62  stop.timestamp = 0;
63  EnqueueRecord(stop); // NOTE(review): result is ignored; if the enqueue can fail, the join below may never return - confirm
64 
65  // Wait for processing thread
66  _thread.join();
67  }
68 
69  return Processor::Stop();
70 }
71 
// NOTE(review): the signature line (listing line 72) is missing from this extract.
// The body matches the `bool ProcessRecord(Record &record) override` member listed
// for AsyncWaitProcessor in the tooltip section - confirm against the real source file.
//
// Returns true without enqueueing anything when the processor is not started;
// otherwise returns the result of EnqueueRecord(record).
73 {
74  // Check if the logging processor started
75  if (!IsStarted())
76  return true;
77 
78  // Enqueue the given logger record
79  return EnqueueRecord(record);
80 }
81 
82 bool AsyncWaitProcessor::EnqueueRecord(Record& record)
83 {
84  // Try to enqueue the given logger record
85  return _queue.Enqueue(record);
86 }
87 
88 void AsyncWaitProcessor::ProcessThread(const std::function<void ()>& on_thread_initialize, const std::function<void ()>& on_thread_clenup)
89 {
90  // Call the thread initialize handler
91  assert((on_thread_initialize) && "Thread initialize handler must be valid!");
92  if (on_thread_initialize)
93  on_thread_initialize();
94 
95  try
96  {
97  // Thread local logger records to process
98  thread_local std::vector<Record> records;
99  thread_local uint64_t previous = 0;
100 
101  // Reserve initial space for logging records
102  records.reserve(_queue.capacity());
103 
104  while (_started)
105  {
106  // Dequeue the next logging record
107  if (!_queue.Dequeue(records))
108  return;
109 
110  // Current timestamp
111  uint64_t current = 0;
112 
113  // Process all logging records
114  for (auto& record : records)
115  {
116  // Handle stop operation record
117  if (record.timestamp == 0)
118  return;
119 
120  // Handle flush operation record
121  if (record.timestamp == 1)
122  {
123  // Flush the logging processor
125  continue;
126  }
127 
128  // Process logging record
129  Processor::ProcessRecord(record);
130 
131  // Find the latest record timestamp
132  if (record.timestamp > current)
133  current = record.timestamp;
134  }
135 
136  // Handle auto-flush period
137  if (CppCommon::Timespan((int64_t)(current - previous)).seconds() > 1)
138  {
139  // Flush the logging processor
141 
142  // Update the previous timestamp
143  previous = current;
144  }
145  }
146  }
147  catch (const std::exception& ex)
148  {
149  fatality(ex);
150  }
151  catch (...)
152  {
153  fatality("Asynchronous wait logging processor terminated!");
154  }
155 
156  // Call the thread cleanup handler
157  assert((on_thread_clenup) && "Thread cleanup handler must be valid!");
158  if (on_thread_clenup)
159  on_thread_clenup();
160 }
161 
// NOTE(review): the signature line (listing line 162) is missing from this extract.
// The body matches the `void Flush() override` member listed for AsyncWaitProcessor
// in the tooltip section - confirm against the real source file.
//
// Does nothing when the processor is not started. Otherwise it only enqueues a flush
// marker; no flush is performed in this body itself.
163 {
164  // Check if the logging processor started
165  if (!IsStarted())
166  return;
167 
168  // Thread local flush operation record
169  thread_local Record flush;
170 
171  // Enqueue flush operation record (timestamp 1 is the marker ProcessThread() treats as "flush")
172  flush.timestamp = 1;
173  EnqueueRecord(flush); // NOTE(review): result is ignored, so a failed enqueue silently drops the flush request - confirm this is intended
174 }
175 
176 } // namespace CppLogging
Asynchronous wait logging processor definition.
AsyncWaitProcessor(const std::shared_ptr< Layout > &layout, bool auto_start=true, size_t capacity=8192, size_t initial=8192, const std::function< void()> &on_thread_initialize=[](){}, const std::function< void()> &on_thread_clenup=[](){})
Initialize asynchronous processor with a given layout interface.
bool ProcessRecord(Record &record) override
Process the given logging record through all child filters, layouts and appenders.
bool Start() override
Start the logging processor.
void Flush() override
Flush the current logging processor.
bool Stop() override
Stop the logging processor.
Logging processor interface.
Definition: processor.h:31
bool IsStarted() const noexcept override
Is the logging processor started?
Definition: processor.h:55
bool Stop() override
Stop the logging processor.
Definition: processor.cpp:60
virtual bool ProcessRecord(Record &record)
Process the given logging record through all child filters, layouts and appenders.
Definition: processor.cpp:100
bool Start() override
Start the logging processor.
Definition: processor.cpp:30
std::atomic< bool > _started
Definition: processor.h:98
virtual void Flush()
Flush the current logging processor.
Definition: processor.cpp:127
Logging record.
Definition: record.h:37
uint64_t timestamp
Timestamp of the logging record.
Definition: record.h:40
C++ Logging project definitions.
Definition: appender.h:15