CppCommon  1.0.4.1
C++ Common Library
mpsc_ring_queue.inl
Go to the documentation of this file.
1 
9 namespace CppCommon {
10 
11 template<typename T>
12 inline MPSCRingQueue<T>::MPSCRingQueue(size_t capacity, size_t concurrency) : _capacity(capacity - 1), _concurrency(concurrency), _consumer(0)
13 {
14  // Initialize producers' ring queue
15  for (size_t i = 0; i < concurrency; ++i)
16  _producers.push_back(std::make_shared<Producer>(capacity));
17 }
18 
19 template<typename T>
20 inline size_t MPSCRingQueue<T>::size() const noexcept
21 {
22  size_t size = 0;
23  for (const auto& producer : _producers)
24  size += producer->queue.size();
25  return size;
26 }
27 
28 template<typename T>
29 inline bool MPSCRingQueue<T>::Enqueue(const T& item)
30 {
31  T temp = item;
32  return Enqueue(std::forward<T>(temp));
33 }
34 
35 template<typename T>
36 inline bool MPSCRingQueue<T>::Enqueue(T&& item)
37 {
38  // Get producer index for the current thread based on RDTS value
39  size_t index = Timestamp::rdts() % _concurrency;
40 
41  // Lock the chosen producer using its spin-lock
42  Locker<SpinLock> lock(_producers[index]->lock);
43 
44  // Enqueue the item into the producer's ring queue
45  return _producers[index]->queue.Enqueue(std::forward<T>(item));
46 }
47 
48 template<typename T>
49 inline bool MPSCRingQueue<T>::Dequeue(T& item)
50 {
51  // Try to dequeue one item from the one of producer's ring queue
52  for (size_t i = 0; i < _concurrency; ++i)
53  {
54  if (_producers[_consumer++ % _concurrency]->queue.Dequeue(item))
55  return true;
56  }
57 
58  return false;
59 }
60 
61 template<typename T>
62 inline bool MPSCRingQueue<T>::Dequeue(const std::function<void(const T&)>& handler)
63 {
64  assert((handler) && "Batch handler must be valid!");
65 
66  bool result = false;
67 
68  // Consume all available items from producers' ring queues
69  for (auto& producer : _producers)
70  {
71  T item;
72  while (producer->queue.Dequeue(item))
73  {
74  handler(item);
75  result = true;
76  }
77  }
78 
79  return result;
80 }
81 
82 } // namespace CppCommon
Locker synchronization primitive.
Definition: locker.h:23
bool Dequeue(T &item)
Dequeue an item from the ring queue (single consumer thread method)
MPSCRingQueue(size_t capacity, size_t concurrency=std::thread::hardware_concurrency())
Default class constructor.
size_t size() const noexcept
Get ring queue size.
bool Enqueue(const T &item)
Enqueue an item into the ring queue (multiple producer threads method)
size_t capacity() const noexcept
Get ring queue capacity.
size_t concurrency() const noexcept
Get ring queue concurrency.
static uint64_t rdts()
Get the current value of RDTS (Read Time Stamp Counter)
Definition: timestamp.cpp:205
C++ Common project definitions.
Definition: token_bucket.h:15