CppLogging 1.0.5.0
C++ Logging Library
Loading...
Searching...
No Matches
async_wait_free_queue.inl
Go to the documentation of this file.
1
9#if defined(_MSC_VER)
10#pragma warning(push)
11#pragma warning(disable: 4702) // C4702: unreachable code
12#endif
13
14namespace CppLogging {
15
16template<typename T>
17inline AsyncWaitFreeQueue<T>::AsyncWaitFreeQueue(size_t capacity) : _capacity(capacity), _mask(capacity - 1), _buffer(new Node[capacity]), _head(0), _tail(0)
18{
19 assert((capacity > 1) && "Ring queue capacity must be greater than one!");
20 assert(((capacity & (capacity - 1)) == 0) && "Ring queue capacity must be a power of two!");
21
22 memset(_pad0, 0, sizeof(cache_line_pad));
23 memset(_pad1, 0, sizeof(cache_line_pad));
24 memset(_pad2, 0, sizeof(cache_line_pad));
25 memset(_pad3, 0, sizeof(cache_line_pad));
26
27 // Populate the sequence initial values
28 for (size_t i = 0; i < capacity; ++i)
29 _buffer[i].sequence.store(i, std::memory_order_relaxed);
30}
31
32template<typename T>
33inline size_t AsyncWaitFreeQueue<T>::size() const noexcept
34{
35 const size_t head = _head.load(std::memory_order_acquire);
36 const size_t tail = _tail.load(std::memory_order_acquire);
37
38 return head - tail;
39}
40
41template<typename T>
43{
44 size_t head_sequence = _head.load(std::memory_order_relaxed);
45
46 for (;;)
47 {
48 Node* node = &_buffer[head_sequence & _mask];
49 size_t node_sequence = node->sequence.load(std::memory_order_acquire);
50
51 // If node sequence and head sequence are the same then it means this slot is empty
52 int64_t diff = (int64_t)node_sequence - (int64_t)head_sequence;
53 if (diff == 0)
54 {
55 // Claim our spot by moving head. If head isn't the same
56 // as we last checked then that means someone beat us to
57 // the punch weak compare is faster, but can return spurious
58 // results which in this instance is OK, because it's in the loop
59 if (_head.compare_exchange_weak(head_sequence, head_sequence + 1, std::memory_order_relaxed))
60 {
61 // Store and swap the item value
62 swap(node->value, record);
63
64 // Increment the sequence so that the tail knows it's accessible
65 node->sequence.store(head_sequence + 1, std::memory_order_release);
66 return true;
67 }
68 }
69 else if (diff < 0)
70 {
71 // If node sequence is less than head sequence then it means this slot is full
72 // and therefore buffer is full
73 return false;
74 }
75 else
76 {
77 // Under normal circumstances this branch should never be taken
78 head_sequence = _head.load(std::memory_order_relaxed);
79 }
80 }
81
82 // Never happens...
83 return false;
84}
85
86template<typename T>
88{
89 size_t tail_sequence = _tail.load(std::memory_order_relaxed);
90
91 for (;;)
92 {
93 Node* node = &_buffer[tail_sequence & _mask];
94 size_t node_sequence = node->sequence.load(std::memory_order_acquire);
95
96 // If node sequence and head sequence are the same then it means this slot is empty
97 int64_t diff = (int64_t)node_sequence - (int64_t)(tail_sequence + 1);
98 if (diff == 0)
99 {
100 // Claim our spot by moving head. If head isn't the same
101 // as we last checked then that means someone beat us to
102 // the punch weak compare is faster, but can return spurious
103 // results which in this instance is OK, because it's in the loop
104 if (_tail.compare_exchange_weak(tail_sequence, tail_sequence + 1, std::memory_order_relaxed))
105 {
106 // Swap and get the item value
107 swap(record, node->value);
108
109 // Set the sequence to what the head sequence should be next time around
110 node->sequence.store(tail_sequence + _mask + 1, std::memory_order_release);
111 return true;
112 }
113 }
114 else if (diff < 0)
115 {
116 // If seq is less than head seq then it means this slot is full and therefore the buffer is full
117 return false;
118 }
119 else
120 {
121 // Under normal circumstances this branch should never be taken
122 tail_sequence = _tail.load(std::memory_order_relaxed);
123 }
124 }
125
126 // Never happens...
127 return false;
128}
129
130} // namespace CppLogging
131
132#if defined(_MSC_VER)
133#pragma warning(pop)
134#endif
AsyncWaitFreeQueue(size_t capacity)
Class constructor: creates the ring queue with the given capacity (must be a power of two greater than one).
size_t size() const noexcept
Get ring queue size.
bool Enqueue(Record &record)
Enqueue and swap the logging record into the ring queue (method for multiple producer threads)
bool Dequeue(Record &record)
Dequeue and swap the logging record from the ring queue (method for multiple consumer threads)
size_t capacity() const noexcept
Get ring queue capacity.
Logging record.
Definition record.h:37
C++ Logging project definitions.
Definition appender.h:15
void swap(Record &record1, Record &record2) noexcept
Definition record.inl:472