Skip to content

Commit 63e4a35

Browse files
committed
fdsdump: add channel component
1 parent 946ea0b commit 63e4a35

File tree

1 file changed

+240
-0
lines changed

1 file changed

+240
-0
lines changed
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Channel implementation for communication between threads
5+
*
6+
* Copyright: (C) 2024 CESNET, z.s.p.o.
7+
* SPDX-License-Identifier: BSD-3-Clause
8+
*/
9+
10+
#pragma once
11+
12+
#include <common/common.hpp>
13+
14+
#include <mutex>
15+
#include <queue>
16+
#include <condition_variable>
17+
#include <chrono>
18+
19+
namespace fdsdump {
20+
21+
template <typename T>
22+
class Channel {
23+
DISABLE_COPY_AND_MOVE(Channel)
24+
25+
public:
26+
/**
27+
* @brief An object that is thrown when the channel closes and a put/get operation is issued
28+
*/
29+
struct ChannelClosed {};
30+
31+
/**
32+
* @brief Create an instance of this channel
33+
*/
34+
Channel() = default;
35+
36+
/**
37+
* @brief Destruct the channel
38+
*/
39+
~Channel()
40+
{
41+
close();
42+
}
43+
44+
/**
45+
* @brief Put/send a value over the channel
46+
*
47+
* @param value The value
48+
*
49+
* @throw ChannelClosed if the channel is closed
50+
*/
51+
void
52+
put(const T& value)
53+
{
54+
std::unique_lock<std::mutex> lock(m_mutex);
55+
if (m_closed) {
56+
throw ChannelClosed {};
57+
}
58+
m_queue.emplace(value);
59+
m_cv.notify_one();
60+
}
61+
62+
/**
63+
* @brief Same as `Channel::put`
64+
*/
65+
void
66+
operator<<(const T& value)
67+
{
68+
put(value);
69+
}
70+
71+
/**
72+
* @brief Get/receive a value from the channel
73+
*
74+
* @param[out] value Where the value will be stored
75+
*
76+
* @throw ChannelClosed if the channel is closed
77+
*/
78+
void
79+
get(T& value)
80+
{
81+
std::unique_lock<std::mutex> lock(m_mutex);
82+
while (m_queue.empty()) {
83+
if (m_closed) {
84+
throw ChannelClosed {};
85+
}
86+
m_cv.wait(lock);
87+
}
88+
value = m_queue.front();
89+
m_queue.pop();
90+
91+
}
92+
93+
/**
94+
* @brief Same as `Channel::get(T& value)`
95+
*/
96+
void
97+
operator>>(T& value)
98+
{
99+
get(value);
100+
}
101+
102+
/**
103+
* @brief Get/receive a value from the channel
104+
*
105+
* @param[out] value Where the value will be stored (if retrieved)
106+
* @param[in] timeout Maximum time to wait for a value to be available
107+
*
108+
* @return true if we got the value, false if timeout elapsed and no value was present
109+
*
110+
* @throw ChannelClosed if the channel is closed
111+
*/
112+
bool
113+
get(T& value, std::chrono::milliseconds timeout)
114+
{
115+
std::unique_lock<std::mutex> lock(m_mutex);
116+
if (m_queue.empty()) {
117+
if (m_closed) {
118+
throw ChannelClosed {};
119+
}
120+
m_cv.wait_for(lock, timeout);
121+
if (m_closed) {
122+
throw ChannelClosed {};
123+
}
124+
if (m_queue.empty()) {
125+
return false;
126+
}
127+
}
128+
value = m_queue.front();
129+
m_queue.pop();
130+
return true;
131+
}
132+
133+
/**
134+
* @brief Get/receive a value from the channel and throw it away
135+
*
136+
* @param[in] timeout Maximum time to wait for a value to be available
137+
*
138+
* @return true if we got the value, false if timeout elapsed and no value was present
139+
*
140+
* @throw ChannelClosed if the channel is closed
141+
*/
142+
bool
143+
get(std::chrono::milliseconds timeout)
144+
{
145+
T throwaway;
146+
return get(throwaway, timeout);
147+
}
148+
149+
/**
150+
* @brief Get/receive a value from the channel without waiting
151+
*
152+
* @param[out] value Where the value will be stored (if retrieved)
153+
*
154+
* @return true if we got the value, false if no value was immediately present
155+
*
156+
* @throw ChannelClosed if the channel is closed
157+
*/
158+
bool
159+
get_nowait(T& value)
160+
{
161+
std::unique_lock<std::mutex> lock(m_mutex);
162+
if (m_queue.empty()) {
163+
if (m_closed) {
164+
throw ChannelClosed {};
165+
}
166+
return false;
167+
}
168+
value = m_queue.front();
169+
m_queue.pop();
170+
return true;
171+
}
172+
173+
/**
174+
* @brief Block till a value becomes available
175+
*
176+
* @throw ChannelClosed if the channel is closed
177+
*/
178+
void
179+
wait()
180+
{
181+
std::unique_lock<std::mutex> lock(m_mutex);
182+
while (m_queue.empty()) {
183+
if (m_closed) {
184+
throw ChannelClosed {};
185+
}
186+
m_cv.wait(lock);
187+
}
188+
}
189+
190+
/**
191+
* @brief Block till a value becomes available
192+
*
193+
* @param timeout The maximum time to wait for
194+
*
195+
* @return true if there is a value present, false if the timeout elapsed and no value is present
196+
*
197+
* @throw ChannelClosed if the channel is closed
198+
*/
199+
bool
200+
wait(std::chrono::milliseconds timeout)
201+
{
202+
std::unique_lock<std::mutex> lock(m_mutex);
203+
if (!m_queue.empty()) {
204+
return true;
205+
}
206+
if (m_closed) {
207+
throw ChannelClosed {};
208+
}
209+
m_cv.wait_for(lock, timeout);
210+
if (!m_queue.empty()) {
211+
return true;
212+
}
213+
if (m_closed) {
214+
throw ChannelClosed {};
215+
}
216+
return false;
217+
}
218+
219+
/**
220+
* @brief Close the channel interrupting all currently blocking operations
221+
* and disallowing any further operations
222+
*
223+
* After this call, all subsequent and currently blocking will throw ChannelClosed
224+
*/
225+
void
226+
close()
227+
{
228+
std::lock_guard<std::mutex> lock(m_mutex);
229+
m_closed = true;
230+
m_cv.notify_all();
231+
}
232+
233+
private:
234+
std::queue<T> m_queue;
235+
std::mutex m_mutex;
236+
std::condition_variable m_cv;
237+
bool m_closed = false;
238+
};
239+
240+
} // fdsdump

0 commit comments

Comments
 (0)