Skip to content

Commit 350b226

Browse files
committed
Clickhouse - introduce SyncQueue class
1 parent 82956c6 commit 350b226

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief SyncQueue class implementation
5+
* @date 2024
6+
*
7+
* Copyright(c) 2024 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
#pragma once
11+
12+
#include <atomic>
13+
#include <condition_variable>
14+
#include <mutex>
15+
#include <queue>
16+
17+
/**
18+
* @brief A thread-safe queue
19+
*/
20+
template <typename Item>
21+
class SyncQueue {
22+
public:
23+
/**
24+
* @brief Put an item into the queue
25+
*
26+
* @param item The item
27+
*/
28+
void put(Item item)
29+
{
30+
std::lock_guard<std::mutex> lock(m_mutex);
31+
m_items.push(item);
32+
m_size = m_items.size();
33+
m_avail_cv.notify_all();
34+
}
35+
36+
/**
37+
* @brief Get an item from the queue, block and wait if there aren't any
38+
*
39+
* @return The item
40+
*/
41+
Item get()
42+
{
43+
std::unique_lock<std::mutex> lock(m_mutex);
44+
while (true) {
45+
if (!m_items.empty()) {
46+
auto item = m_items.front();
47+
m_items.pop();
48+
m_size = m_items.size();
49+
return item;
50+
}
51+
m_avail_cv.wait(lock);
52+
}
53+
}
54+
55+
/**
56+
* @brief Get the current size of the queue
57+
*
58+
* @return The number of items in the queue
59+
*/
60+
std::size_t size()
61+
{
62+
return m_size;
63+
}
64+
65+
private:
66+
std::atomic_size_t m_size = 0;
67+
std::queue<Item> m_items;
68+
std::mutex m_mutex;
69+
std::condition_variable m_avail_cv;
70+
};

0 commit comments

Comments
 (0)