CppCommon  1.0.4.1
C++ Common Library
mpsc_ring_buffer.inl
Go to the documentation of this file.
1 
9 namespace CppCommon {
10 
11 inline MPSCRingBuffer::MPSCRingBuffer(size_t capacity, size_t concurrency) : _capacity(capacity - 1), _concurrency(concurrency), _consumer(0)
12 {
13  // Initialize producers' ring buffer
14  for (size_t i = 0; i < concurrency; ++i)
15  _producers.push_back(std::make_shared<Producer>(capacity));
16 }
17 
18 inline size_t MPSCRingBuffer::size() const noexcept
19 {
20  size_t size = 0;
21  for (const auto& producer : _producers)
22  size += producer->buffer.size();
23  return size;
24 }
25 
26 inline bool MPSCRingBuffer::Enqueue(const void* data, size_t size)
27 {
28  // Get producer index for the current thread based on RDTS value
29  size_t index = Timestamp::rdts() % _concurrency;
30 
31  // Lock the chosen producer using its spin-lock
32  Locker<SpinLock> lock(_producers[index]->lock);
33 
34  // Enqueue the item into the producer's ring buffer
35  return _producers[index]->buffer.Enqueue(data, size);
36 }
37 
38 inline bool MPSCRingBuffer::Dequeue(void* data, size_t& size)
39 {
40  // Try to dequeue one item from the one of producer's ring buffers
41  for (size_t i = 0; i < _concurrency; ++i)
42  {
43  size_t temp = size;
44  if (_producers[_consumer++ % _concurrency]->buffer.Dequeue(data, temp))
45  {
46  size = temp;
47  return true;
48  }
49  }
50 
51  size = 0;
52  return false;
53 }
54 
55 } // namespace CppCommon
Locker synchronization primitive.
Definition: locker.h:23
size_t capacity() const noexcept
Get ring buffer capacity.
MPSCRingBuffer(size_t capacity, size_t concurrency=std::thread::hardware_concurrency())
Default class constructor.
size_t size() const noexcept
Get ring buffer size.
bool Enqueue(const void *data, size_t size)
Enqueue data into the ring buffer (multiple producer threads method)
size_t concurrency() const noexcept
Get ring buffer concurrency.
bool Dequeue(void *data, size_t &size)
Dequeue a data from the ring buffer (single consumer thread method)
static uint64_t rdts()
Get the current value of RDTS (Read Time Stamp Counter)
Definition: timestamp.cpp:205
C++ Common project definitions.
Definition: token_bucket.h:15