CppCommon 1.0.5.0
C++ Common Library
Loading...
Searching...
No Matches
mpsc_ring_queue.inl
Go to the documentation of this file.
1
9namespace CppCommon {
10
11template<typename T>
12inline MPSCRingQueue<T>::MPSCRingQueue(size_t capacity, size_t concurrency) : _capacity(capacity - 1), _concurrency(concurrency), _consumer(0)
13{
14 // Initialize producers' ring queue
15 for (size_t i = 0; i < concurrency; ++i)
16 _producers.push_back(std::make_shared<Producer>(capacity));
17}
18
19template<typename T>
20inline size_t MPSCRingQueue<T>::size() const noexcept
21{
22 size_t size = 0;
23 for (const auto& producer : _producers)
24 size += producer->queue.size();
25 return size;
26}
27
28template<typename T>
29inline bool MPSCRingQueue<T>::Enqueue(const T& item)
30{
31 T temp = item;
32 return Enqueue(std::forward<T>(temp));
33}
34
35template<typename T>
36inline bool MPSCRingQueue<T>::Enqueue(T&& item)
37{
38 // Get producer index for the current thread based on RDTS value
39 size_t index = Timestamp::rdts() % _concurrency;
40
41 // Lock the chosen producer using its spin-lock
42 Locker<SpinLock> lock(_producers[index]->lock);
43
44 // Enqueue the item into the producer's ring queue
45 return _producers[index]->queue.Enqueue(std::forward<T>(item));
46}
47
48template<typename T>
49inline bool MPSCRingQueue<T>::Dequeue(T& item)
50{
51 // Try to dequeue one item from the one of producer's ring queue
52 for (size_t i = 0; i < _concurrency; ++i)
53 {
54 if (_producers[_consumer++ % _concurrency]->queue.Dequeue(item))
55 return true;
56 }
57
58 return false;
59}
60
61template<typename T>
62inline bool MPSCRingQueue<T>::Dequeue(const std::function<void(const T&)>& handler)
63{
64 assert((handler) && "Batch handler must be valid!");
65
66 bool result = false;
67
68 // Consume all available items from producers' ring queues
69 for (auto& producer : _producers)
70 {
71 T item;
72 while (producer->queue.Dequeue(item))
73 {
74 handler(item);
75 result = true;
76 }
77 }
78
79 return result;
80}
81
82} // namespace CppCommon
Locker synchronization primitive.
Definition locker.h:23
bool Dequeue(T &item)
Dequeue an item from the ring queue (single consumer threads method)
MPSCRingQueue(size_t capacity, size_t concurrency=std::thread::hardware_concurrency())
Default class constructor.
size_t size() const noexcept
Get ring queue size.
bool Enqueue(const T &item)
Enqueue an item into the ring queue (multiple producers threads method)
size_t capacity() const noexcept
Get ring queue capacity.
size_t concurrency() const noexcept
Get ring queue concurrency.
static uint64_t rdts()
Get the current value of RDTS (Read Time Stamp Counter)
C++ Common project definitions.