Skip to content

Commit d00a757

Browse files
Add static channel
1 parent 65a88c5 commit d00a757

File tree

6 files changed

+451
-14
lines changed

6 files changed

+451
-14
lines changed

.github/workflows/cmake.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ jobs:
7676

7777
- name: Build
7878
working-directory: ${{github.workspace}}/build
79-
run: cmake --build . --config ${{ matrix.config.build_type }} --target tests -j
79+
run: cmake --build . --config ${{ matrix.config.build_type }} --target channel_tests -j
8080

8181
- name: Test
8282
working-directory: ${{github.workspace}}/build
83-
run: ctest -C ${{ matrix.config.build_type }} --verbose -R channel_test --output-on-failure -j
83+
run: ctest -C ${{ matrix.config.build_type }} --verbose -L channel_tests --output-on-failure -j
8484

8585
- name: Run examples
8686
working-directory: ${{github.workspace}}/build
@@ -106,7 +106,7 @@ jobs:
106106

107107
- name: Test
108108
working-directory: ${{github.workspace}}/build
109-
run: ctest -C Debug --verbose -R channel_test -j
109+
run: ctest -C Debug --verbose -L channel_test -j
110110

111111
- name: Upload coverage reports to Codecov
112112
uses: codecov/codecov-action@v5

include/msd/blocking_iterator.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class blocking_iterator {
6464
*/
6565
reference operator*()
6666
{
67-
chan_ >> value_;
67+
chan_.read(value_);
6868

6969
return value_;
7070
}

include/msd/channel.hpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,9 @@ class channel {
134134
return false;
135135
}
136136

137-
if (!(size_ == 0)) {
138-
out = std::move(queue_.front());
139-
queue_.pop();
140-
--size_;
141-
}
137+
out = std::move(queue_.front());
138+
queue_.pop();
139+
--size_;
142140
}
143141

144142
cnd_.notify_one();
@@ -247,8 +245,6 @@ class channel {
247245
cnd_.wait(lock, [this]() { return size_ < cap_; });
248246
}
249247
}
250-
251-
friend class blocking_iterator<channel>;
252248
};
253249

254250
template <typename T>

include/msd/static_channel.hpp

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright (C) 2020-2025 Andrei Avram
2+
3+
#ifndef MSD_STATIC_CHANNEL_HPP_
4+
#define MSD_STATIC_CHANNEL_HPP_
5+
6+
#include <array>
7+
#include <condition_variable>
8+
#include <cstdlib>
9+
#include <mutex>
10+
11+
#include "blocking_iterator.hpp"
12+
#include "channel.hpp"
13+
14+
namespace msd {
15+
16+
/**
17+
* @brief Thread-safe container for sharing data between threads.
18+
*
19+
* Allocates on the stack.
20+
* Does not throw exceptions.
21+
* Implements a blocking input iterator.
22+
*
23+
* @tparam T The type of the elements.
24+
* @tparam Capacity The maximum number of elements the channel can hold before blocking.
25+
*/
26+
template <typename T, std::size_t Capacity>
27+
class static_channel {
28+
public:
29+
static_assert(Capacity > 0, "Channel capacity must be greater than zero.");
30+
31+
/**
32+
* @brief The type of elements stored in the channel.
33+
*/
34+
using value_type = T;
35+
36+
/**
37+
* @brief The iterator type used to traverse the channel.
38+
*/
39+
using iterator = blocking_iterator<static_channel<T, Capacity>>;
40+
41+
/**
42+
* @brief The type used to represent sizes and counts.
43+
*/
44+
using size_type = std::size_t;
45+
46+
/**
47+
* @brief Creates a new channel.
48+
*/
49+
constexpr static_channel() = default;
50+
51+
/**
52+
* @brief Pushes an element into the channel.
53+
*
54+
* @tparam Type The type of the elements.
55+
*
56+
* @param value The element to be pushed into the channel.
57+
*
58+
* @return true If an element was successfully pushed into the channel.
59+
* @return false If the channel is closed.
60+
*/
61+
template <typename Type>
62+
bool write(Type&& value)
63+
{
64+
{
65+
std::unique_lock<std::mutex> lock{mtx_};
66+
waitBeforeWrite(lock);
67+
68+
if (is_closed_) {
69+
return false;
70+
}
71+
72+
array_[(front_ + size_) % Capacity] = std::forward<Type>(value);
73+
++size_;
74+
}
75+
76+
cnd_.notify_one();
77+
78+
return true;
79+
}
80+
81+
/**
82+
* @brief Pops an element from the channel.
83+
*
84+
* @param out Reference to the variable where the popped element will be stored.
85+
*
86+
* @return true If an element was successfully read from the channel.
87+
* @return false If the channel is closed and empty.
88+
*/
89+
bool read(T& out)
90+
{
91+
{
92+
std::unique_lock<std::mutex> lock{mtx_};
93+
waitBeforeRead(lock);
94+
95+
if (is_closed_ && size_ == 0) {
96+
return false;
97+
}
98+
99+
out = std::move(array_[front_]);
100+
front_ = (front_ + 1) % Capacity;
101+
--size_;
102+
}
103+
104+
cnd_.notify_one();
105+
106+
return true;
107+
}
108+
109+
/**
110+
* @brief Returns the current size of the channel.
111+
*
112+
* @return The number of elements in the channel.
113+
*/
114+
NODISCARD size_type size() const noexcept
115+
{
116+
std::unique_lock<std::mutex> lock{mtx_};
117+
return size_;
118+
}
119+
120+
/**
121+
* @brief Checks if the channel is empty.
122+
*
123+
* @return true If the channel contains no elements.
124+
* @return false Otherwise.
125+
*/
126+
NODISCARD bool empty() const noexcept
127+
{
128+
std::unique_lock<std::mutex> lock{mtx_};
129+
return size_ == 0;
130+
}
131+
132+
/**
133+
* @brief Closes the channel, no longer accepting new elements.
134+
*/
135+
void close() noexcept
136+
{
137+
{
138+
std::unique_lock<std::mutex> lock{mtx_};
139+
is_closed_ = true;
140+
}
141+
cnd_.notify_all();
142+
}
143+
144+
/**
145+
* @brief Checks if the channel has been closed.
146+
*
147+
* @return true If no more elements can be added to the channel.
148+
* @return false Otherwise.
149+
*/
150+
NODISCARD bool closed() const noexcept
151+
{
152+
std::unique_lock<std::mutex> lock{mtx_};
153+
return is_closed_;
154+
}
155+
156+
/**
157+
* @brief Checks if the channel has been closed and is empty.
158+
*
159+
* @return true If nothing can be read anymore from the channel.
160+
* @return false Otherwise.
161+
*/
162+
NODISCARD bool drained() noexcept
163+
{
164+
std::unique_lock<std::mutex> lock{mtx_};
165+
return is_closed_ && size_ == 0;
166+
}
167+
168+
/**
169+
* @brief Returns an iterator to the beginning of the channel.
170+
*
171+
* @return A blocking iterator pointing to the start of the channel.
172+
*/
173+
iterator begin() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this}; }
174+
175+
/**
176+
* @brief Returns an iterator representing the end of the channel.
177+
*
178+
* @return A blocking iterator representing the end condition.
179+
*/
180+
iterator end() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this}; }
181+
182+
/**
183+
* Channel cannot be copied or moved.
184+
*/
185+
static_channel(const static_channel&) = delete;
186+
static_channel& operator=(const static_channel&) = delete;
187+
static_channel(static_channel&&) = delete;
188+
static_channel& operator=(static_channel&&) = delete;
189+
virtual ~static_channel() = default;
190+
191+
private:
192+
std::array<T, Capacity> array_{};
193+
size_type front_{0};
194+
std::size_t size_{0};
195+
const size_type cap_{Capacity};
196+
mutable std::mutex mtx_;
197+
std::condition_variable cnd_;
198+
bool is_closed_{false};
199+
200+
void waitBeforeRead(std::unique_lock<std::mutex>& lock)
201+
{
202+
cnd_.wait(lock, [this]() { return !(size_ == 0) || is_closed_; });
203+
};
204+
205+
void waitBeforeWrite(std::unique_lock<std::mutex>& lock)
206+
{
207+
if (cap_ > 0 && size_ == cap_) {
208+
cnd_.wait(lock, [this]() { return size_ < cap_; });
209+
}
210+
}
211+
};
212+
213+
} // namespace msd
214+
215+
#endif // MSD_STATIC_CHANNEL_HPP_

tests/CMakeLists.txt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@ function(package_add_test TESTNAME)
3434
endif ()
3535

3636
add_test(NAME ${TESTNAME} COMMAND ${TESTNAME})
37+
set_tests_properties(${TESTNAME} PROPERTIES LABELS "channel_tests")
3738

38-
add_dependencies(tests ${TESTNAME})
39+
add_dependencies(channel_tests ${TESTNAME})
3940
endfunction()
4041

41-
add_custom_target(tests)
42+
add_custom_target(channel_tests)
4243

4344
# Tests
44-
package_add_test(channel_test channel_test.cpp blocking_iterator_test.cpp)
45+
package_add_test(channel_test channel_test.cpp)
46+
package_add_test(static_channel_test static_channel_test.cpp)
47+
package_add_test(blocking_iterator_test blocking_iterator_test.cpp)
4548

4649
if (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
4750
# Disable warnings about C++17 extensions

0 commit comments

Comments
 (0)