Skip to content

Commit 39adf38

Browse files
committed
Clickhouse - introduce SyncQueue class
1 parent 68e33b7 commit 39adf38

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Thread-safe queue implementation
5+
* @date 2025
6+
*
7+
* Copyright(c) 2025 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
#pragma once
11+
12+
#include <atomic>
13+
#include <condition_variable>
14+
#include <mutex>
15+
#include <optional>
16+
#include <queue>
17+
18+
/**
19+
* @brief A thread-safe queue
20+
*/
21+
template <typename Item>
22+
class SyncQueue {
23+
public:
24+
/**
25+
* @brief Put an item into the queue
26+
*
27+
* @param item The item
28+
*/
29+
void put(Item item)
30+
{
31+
std::lock_guard<std::mutex> lock(m_mutex);
32+
m_items.push(item);
33+
m_size = m_items.size();
34+
m_avail_cv.notify_all();
35+
}
36+
37+
/**
38+
* @brief Get an item from the queue, block and wait if there aren't any
39+
*
40+
* @return The item
41+
*/
42+
Item get()
43+
{
44+
std::unique_lock<std::mutex> lock(m_mutex);
45+
while (true) {
46+
if (!m_items.empty()) {
47+
auto item = m_items.front();
48+
m_items.pop();
49+
m_size = m_items.size();
50+
return item;
51+
}
52+
m_avail_cv.wait(lock);
53+
}
54+
}
55+
56+
/**
57+
* @brief Try to get an item from the queue, return immediately if there aren't any
58+
*
59+
* @return The item or std::nullopt
60+
*/
61+
std::optional<Item> try_get()
62+
{
63+
std::unique_lock<std::mutex> lock(m_mutex);
64+
if (!m_items.empty()) {
65+
auto item = m_items.front();
66+
m_items.pop();
67+
m_size = m_items.size();
68+
return item;
69+
} else {
70+
return std::nullopt;
71+
}
72+
}
73+
74+
/**
75+
* @brief Get the current size of the queue
76+
*
77+
* @return The number of items in the queue
78+
*/
79+
std::size_t size()
80+
{
81+
return m_size;
82+
}
83+
84+
private:
85+
std::atomic_size_t m_size = 0;
86+
std::queue<Item> m_items;
87+
std::mutex m_mutex;
88+
std::condition_variable m_avail_cv;
89+
};

0 commit comments

Comments
 (0)