CppCommon 1.0.5.0
C++ Common Library
Loading...
Searching...
No Matches
mpmc_ring_queue.inl
Go to the documentation of this file.
1
9namespace CppCommon {
10
11template<typename T>
12inline MPMCRingQueue<T>::MPMCRingQueue(size_t capacity) : _capacity(capacity), _mask(capacity - 1), _buffer(new Node[capacity]), _head(0), _tail(0)
13{
14 assert((capacity > 1) && "Ring queue capacity must be greater than one!");
15 assert(((capacity & (capacity - 1)) == 0) && "Ring queue capacity must be a power of two!");
16
17 memset(_pad0, 0, sizeof(cache_line_pad));
18 memset(_pad1, 0, sizeof(cache_line_pad));
19 memset(_pad2, 0, sizeof(cache_line_pad));
20 memset(_pad3, 0, sizeof(cache_line_pad));
21
22 // Populate the sequence initial values
23 for (size_t i = 0; i < capacity; ++i)
24 _buffer[i].sequence.store(i, std::memory_order_relaxed);
25}
26
27template<typename T>
28inline size_t MPMCRingQueue<T>::size() const noexcept
29{
30 const size_t head = _head.load(std::memory_order_acquire);
31 const size_t tail = _tail.load(std::memory_order_acquire);
32
33 return head - tail;
34}
35
36template<typename T>
37inline bool MPMCRingQueue<T>::Enqueue(const T& item)
38{
39 T temp = item;
40 return Enqueue(std::forward<T>(temp));
41}
42
43template<typename T>
44inline bool MPMCRingQueue<T>::Enqueue(T&& item)
45{
46 size_t head_sequence = _head.load(std::memory_order_relaxed);
47
48 for (;;)
49 {
50 Node* node = &_buffer[head_sequence & _mask];
51 size_t node_sequence = node->sequence.load(std::memory_order_acquire);
52
53 // If node sequence and head sequence are the same then it means this slot is empty
54 int64_t diff = (int64_t)node_sequence - (int64_t)head_sequence;
55 if (diff == 0)
56 {
57 // Claim our spot by moving head. If head isn't the same
58 // as we last checked then that means someone beat us to
59 // the punch weak compare is faster, but can return spurious
60 // results which in this instance is OK, because it's in the loop
61 if (_head.compare_exchange_weak(head_sequence, head_sequence + 1, std::memory_order_relaxed))
62 {
63 // Store the item value
64 node->value = std::move(item);
65
66 // Increment the sequence so that the tail knows it's accessible
67 node->sequence.store(head_sequence + 1, std::memory_order_release);
68 return true;
69 }
70 }
71 else if (diff < 0)
72 {
73 // If node sequence is less than head sequence then it means this slot is full
74 // and therefore buffer is full
75 return false;
76 }
77 else
78 {
79 // Under normal circumstances this branch should never be taken
80 head_sequence = _head.load(std::memory_order_relaxed);
81 }
82 }
83
84 // Never happens...
85 return false;
86}
87
88template<typename T>
89inline bool MPMCRingQueue<T>::Dequeue(T& item)
90{
91 size_t tail_sequence = _tail.load(std::memory_order_relaxed);
92
93 for (;;)
94 {
95 Node* node = &_buffer[tail_sequence & _mask];
96 size_t node_sequence = node->sequence.load(std::memory_order_acquire);
97
98 // If node sequence and head sequence are the same then it means this slot is empty
99 int64_t diff = (int64_t)node_sequence - (int64_t)(tail_sequence + 1);
100 if (diff == 0)
101 {
102 // Claim our spot by moving head. If head isn't the same
103 // as we last checked then that means someone beat us to
104 // the punch weak compare is faster, but can return spurious
105 // results which in this instance is OK, because it's in the loop
106 if (_tail.compare_exchange_weak(tail_sequence, tail_sequence + 1, std::memory_order_relaxed))
107 {
108 // Get the item value
109 item = std::move(node->value);
110
111 // Set the sequence to what the head sequence should be next time around
112 node->sequence.store(tail_sequence + _mask + 1, std::memory_order_release);
113 return true;
114 }
115 }
116 else if (diff < 0)
117 {
118 // If seq is less than head seq then it means this slot is full and therefore the buffer is full
119 return false;
120 }
121 else
122 {
123 // Under normal circumstances this branch should never be taken
124 tail_sequence = _tail.load(std::memory_order_relaxed);
125 }
126 }
127
128 // Never happens...
129 return false;
130}
131
132} // namespace CppCommon
size_t size() const noexcept
Get ring queue size.
size_t capacity() const noexcept
Get ring queue capacity.
bool Enqueue(const T &item)
Enqueue an item into the ring queue (method for multiple producer threads)
bool Dequeue(T &item)
Dequeue an item from the ring queue (method for multiple consumer threads)
MPMCRingQueue(size_t capacity)
Default class constructor.
C++ Common project definitions.