CppCommon  1.0.4.1
C++ Common Library
mpmc_ring_queue.inl
Go to the documentation of this file.
1 
9 namespace CppCommon {
10 
11 template<typename T>
12 inline MPMCRingQueue<T>::MPMCRingQueue(size_t capacity) : _capacity(capacity), _mask(capacity - 1), _buffer(new Node[capacity]), _head(0), _tail(0)
13 {
14  assert((capacity > 1) && "Ring queue capacity must be greater than one!");
15  assert(((capacity & (capacity - 1)) == 0) && "Ring queue capacity must be a power of two!");
16 
17  memset(_pad0, 0, sizeof(cache_line_pad));
18  memset(_pad1, 0, sizeof(cache_line_pad));
19  memset(_pad2, 0, sizeof(cache_line_pad));
20  memset(_pad3, 0, sizeof(cache_line_pad));
21 
22  // Populate the sequence initial values
23  for (size_t i = 0; i < capacity; ++i)
24  _buffer[i].sequence.store(i, std::memory_order_relaxed);
25 }
26 
27 template<typename T>
28 inline size_t MPMCRingQueue<T>::size() const noexcept
29 {
30  const size_t head = _head.load(std::memory_order_acquire);
31  const size_t tail = _tail.load(std::memory_order_acquire);
32 
33  return head - tail;
34 }
35 
36 template<typename T>
37 inline bool MPMCRingQueue<T>::Enqueue(const T& item)
38 {
39  T temp = item;
40  return Enqueue(std::forward<T>(temp));
41 }
42 
43 template<typename T>
44 inline bool MPMCRingQueue<T>::Enqueue(T&& item)
45 {
46  size_t head_sequence = _head.load(std::memory_order_relaxed);
47 
48  for (;;)
49  {
50  Node* node = &_buffer[head_sequence & _mask];
51  size_t node_sequence = node->sequence.load(std::memory_order_acquire);
52 
53  // If node sequence and head sequence are the same then it means this slot is empty
54  int64_t diff = (int64_t)node_sequence - (int64_t)head_sequence;
55  if (diff == 0)
56  {
57  // Claim our spot by moving head. If head isn't the same
58  // as we last checked then that means someone beat us to
59  // the punch weak compare is faster, but can return spurious
60  // results which in this instance is OK, because it's in the loop
61  if (_head.compare_exchange_weak(head_sequence, head_sequence + 1, std::memory_order_relaxed))
62  {
63  // Store the item value
64  node->value = std::move(item);
65 
66  // Increment the sequence so that the tail knows it's accessible
67  node->sequence.store(head_sequence + 1, std::memory_order_release);
68  return true;
69  }
70  }
71  else if (diff < 0)
72  {
73  // If node sequence is less than head sequence then it means this slot is full
74  // and therefore buffer is full
75  return false;
76  }
77  else
78  {
79  // Under normal circumstances this branch should never be taken
80  head_sequence = _head.load(std::memory_order_relaxed);
81  }
82  }
83 
84  // Never happens...
85  return false;
86 }
87 
88 template<typename T>
89 inline bool MPMCRingQueue<T>::Dequeue(T& item)
90 {
91  size_t tail_sequence = _tail.load(std::memory_order_relaxed);
92 
93  for (;;)
94  {
95  Node* node = &_buffer[tail_sequence & _mask];
96  size_t node_sequence = node->sequence.load(std::memory_order_acquire);
97 
98  // If node sequence and head sequence are the same then it means this slot is empty
99  int64_t diff = (int64_t)node_sequence - (int64_t)(tail_sequence + 1);
100  if (diff == 0)
101  {
102  // Claim our spot by moving head. If head isn't the same
103  // as we last checked then that means someone beat us to
104  // the punch weak compare is faster, but can return spurious
105  // results which in this instance is OK, because it's in the loop
106  if (_tail.compare_exchange_weak(tail_sequence, tail_sequence + 1, std::memory_order_relaxed))
107  {
108  // Get the item value
109  item = std::move(node->value);
110 
111  // Set the sequence to what the head sequence should be next time around
112  node->sequence.store(tail_sequence + _mask + 1, std::memory_order_release);
113  return true;
114  }
115  }
116  else if (diff < 0)
117  {
118  // If seq is less than head seq then it means this slot is full and therefore the buffer is full
119  return false;
120  }
121  else
122  {
123  // Under normal circumstances this branch should never be taken
124  tail_sequence = _tail.load(std::memory_order_relaxed);
125  }
126  }
127 
128  // Never happens...
129  return false;
130 }
131 
132 } // namespace CppCommon
size_t size() const noexcept
Get ring queue size.
size_t capacity() const noexcept
Get ring queue capacity.
bool Enqueue(const T &item)
Enqueue an item into the ring queue (multiple producer threads method)
bool Dequeue(T &item)
Dequeue an item from the ring queue (multiple consumer threads method)
MPMCRingQueue(size_t capacity)
Construct a ring queue with the given capacity (must be a power of two greater than one).
C++ Common project definitions.
Definition: token_bucket.h:15