Skip to content

Commit 65a88c5

Browse files
Add write, read, drained (#68)
- Add drained: returns true if the channel was closed and is empty. - Add read method: fetches a value from the channel and returns whether the channel is drained (offers better control to stop reading). - Add write method: pushes a value into the channel if not closed and returns whether the channel is closed or not. - Used mutex-based access for all information (replaced atomics): closed, empty, size.
1 parent 4f9e263 commit 65a88c5

File tree

4 files changed

+183
-49
lines changed

4 files changed

+183
-49
lines changed

codecov.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ coverage:
22
status:
33
patch:
44
default:
5-
target: 90%
5+
target: 80%
66
threshold: 0%
77
if_ci_failed: error
88
project:

include/msd/blocking_iterator.hpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
#include <cstddef>
77
#include <iterator>
8-
#include <mutex>
98

109
namespace msd {
1110

@@ -76,13 +75,7 @@ class blocking_iterator {
7675
* @return true if the channel is not closed or not empty (continue iterating).
7776
* @return false if the channel is closed and empty (stop iterating).
7877
*/
79-
bool operator!=(blocking_iterator<Channel>) const
80-
{
81-
std::unique_lock<std::mutex> lock{chan_.mtx_};
82-
chan_.waitBeforeRead(lock);
83-
84-
return !(chan_.closed() && chan_.empty());
85-
}
78+
bool operator!=(blocking_iterator<Channel>) const { return !chan_.drained(); }
8679

8780
private:
8881
Channel& chan_;

include/msd/channel.hpp

Lines changed: 98 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#ifndef MSD_CHANNEL_HPP_
44
#define MSD_CHANNEL_HPP_
55

6-
#include <atomic>
76
#include <condition_variable>
87
#include <cstdlib>
98
#include <mutex>
@@ -82,34 +81,102 @@ class channel {
8281
/**
8382
* @brief Pops an element from the channel.
8483
*
85-
* @tparam Type The type of the elements
84+
* @tparam Type The type of the elements.
8685
*/
8786
template <typename Type>
8887
friend channel<Type>& operator>>(channel<Type>&, Type&);
8988

89+
/**
90+
* @brief Pushes an element into the channel.
91+
*
92+
* @tparam Type The type of the elements.
93+
*
94+
* @param value The element to be pushed into the channel.
95+
*
96+
* @return true If an element was successfully pushed into the channel.
97+
* @return false If the channel is closed.
98+
*/
99+
template <typename Type>
100+
bool write(Type&& value)
101+
{
102+
{
103+
std::unique_lock<std::mutex> lock{mtx_};
104+
waitBeforeWrite(lock);
105+
106+
if (is_closed_) {
107+
return false;
108+
}
109+
110+
queue_.push(std::forward<Type>(value));
111+
++size_;
112+
}
113+
114+
cnd_.notify_one();
115+
116+
return true;
117+
}
118+
119+
/**
120+
* @brief Pops an element from the channel.
121+
*
122+
* @param out Reference to the variable where the popped element will be stored.
123+
*
124+
* @return true If an element was successfully read from the channel.
125+
* @return false If the channel is closed and empty.
126+
*/
127+
bool read(T& out)
128+
{
129+
{
130+
std::unique_lock<std::mutex> lock{mtx_};
131+
waitBeforeRead(lock);
132+
133+
if (is_closed_ && size_ == 0) {
134+
return false;
135+
}
136+
137+
if (!(size_ == 0)) {
138+
out = std::move(queue_.front());
139+
queue_.pop();
140+
--size_;
141+
}
142+
}
143+
144+
cnd_.notify_one();
145+
146+
return true;
147+
}
148+
90149
/**
91150
* @brief Returns the current size of the channel.
92151
*
93152
* @return The number of elements in the channel.
94153
*/
95-
NODISCARD size_type constexpr size() const noexcept { return size_; }
154+
NODISCARD size_type size() const noexcept
155+
{
156+
std::unique_lock<std::mutex> lock{mtx_};
157+
return size_;
158+
}
96159

97160
/**
98161
* @brief Checks if the channel is empty.
99162
*
100163
* @return true If the channel contains no elements.
101164
* @return false Otherwise.
102165
*/
103-
NODISCARD bool constexpr empty() const noexcept { return size_ == 0; }
166+
NODISCARD bool empty() const noexcept
167+
{
168+
std::unique_lock<std::mutex> lock{mtx_};
169+
return size_ == 0;
170+
}
104171

105172
/**
106-
* @brief Closes the channel.
173+
* @brief Closes the channel, no longer accepting new elements.
107174
*/
108175
void close() noexcept
109176
{
110177
{
111178
std::unique_lock<std::mutex> lock{mtx_};
112-
is_closed_.store(true, std::memory_order_seq_cst);
179+
is_closed_ = true;
113180
}
114181
cnd_.notify_all();
115182
}
@@ -120,7 +187,23 @@ class channel {
120187
* @return true If no more elements can be added to the channel.
121188
* @return false Otherwise.
122189
*/
123-
NODISCARD bool closed() const noexcept { return is_closed_.load(std::memory_order_seq_cst); }
190+
NODISCARD bool closed() const noexcept
191+
{
192+
std::unique_lock<std::mutex> lock{mtx_};
193+
return is_closed_;
194+
}
195+
196+
/**
197+
* @brief Checks if the channel has been closed and is empty.
198+
*
199+
* @return true If nothing can be read anymore from the channel.
200+
* @return false Otherwise.
201+
*/
202+
NODISCARD bool drained() noexcept
203+
{
204+
std::unique_lock<std::mutex> lock{mtx_};
205+
return is_closed_ && size_ == 0;
206+
}
124207

125208
/**
126209
* @brief Returns an iterator to the beginning of the channel.
@@ -146,16 +229,16 @@ class channel {
146229
virtual ~channel() = default;
147230

148231
private:
149-
const size_type cap_{0};
150232
std::queue<T> queue_;
151-
std::atomic<std::size_t> size_{0};
152-
std::mutex mtx_;
233+
std::size_t size_{0};
234+
const size_type cap_{0};
235+
mutable std::mutex mtx_;
153236
std::condition_variable cnd_;
154-
std::atomic<bool> is_closed_{false};
237+
bool is_closed_{false};
155238

156239
void waitBeforeRead(std::unique_lock<std::mutex>& lock)
157240
{
158-
cnd_.wait(lock, [this]() { return !empty() || closed(); });
241+
cnd_.wait(lock, [this]() { return !(size_ == 0) || is_closed_; });
159242
};
160243

161244
void waitBeforeWrite(std::unique_lock<std::mutex>& lock)
@@ -171,42 +254,17 @@ class channel {
171254
template <typename T>
172255
channel<typename std::decay<T>::type>& operator<<(channel<typename std::decay<T>::type>& chan, T&& value)
173256
{
174-
{
175-
std::unique_lock<std::mutex> lock{chan.mtx_};
176-
chan.waitBeforeWrite(lock);
177-
178-
if (chan.closed()) {
179-
throw closed_channel{"cannot write on closed channel"};
180-
}
181-
182-
chan.queue_.push(std::forward<T>(value));
183-
++chan.size_;
257+
if (!chan.write(std::forward<T>(value))) {
258+
throw closed_channel{"cannot write on closed channel"};
184259
}
185260

186-
chan.cnd_.notify_one();
187-
188261
return chan;
189262
}
190263

191264
template <typename T>
192265
channel<T>& operator>>(channel<T>& chan, T& out)
193266
{
194-
{
195-
std::unique_lock<std::mutex> lock{chan.mtx_};
196-
chan.waitBeforeRead(lock);
197-
198-
if (chan.closed() && chan.empty()) {
199-
return chan;
200-
}
201-
202-
if (!chan.empty()) {
203-
out = std::move(chan.queue_.front());
204-
chan.queue_.pop();
205-
--chan.size_;
206-
}
207-
}
208-
209-
chan.cnd_.notify_one();
267+
chan.read(out);
210268

211269
return chan;
212270
}

tests/channel_test.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,30 @@ TEST(ChannelTest, PushAndFetch)
4949
EXPECT_EQ(4, out);
5050
}
5151

52+
TEST(ChannelTest, WriteAndRead)
53+
{
54+
msd::channel<int> channel;
55+
56+
int in = 1;
57+
EXPECT_TRUE(channel.write(in));
58+
59+
const int cin = 3;
60+
EXPECT_TRUE(channel.write(cin));
61+
62+
channel.close();
63+
EXPECT_FALSE(channel.write(2));
64+
65+
int out = 0;
66+
67+
EXPECT_TRUE(channel.read(out));
68+
EXPECT_EQ(1, out);
69+
70+
EXPECT_TRUE(channel.read(out));
71+
EXPECT_EQ(3, out);
72+
73+
EXPECT_FALSE(channel.read(out));
74+
}
75+
5276
TEST(ChannelTest, PushAndFetchWithBufferedChannel)
5377
{
5478
msd::channel<int> channel{2};
@@ -167,6 +191,23 @@ TEST(ChannelTest, close)
167191
EXPECT_THROW(channel << std::move(in), msd::closed_channel);
168192
}
169193

194+
TEST(ChannelTest, drained)
195+
{
196+
msd::channel<int> channel;
197+
EXPECT_FALSE(channel.drained());
198+
199+
int in = 1;
200+
channel << in;
201+
202+
channel.close();
203+
EXPECT_FALSE(channel.drained());
204+
205+
int out = 0;
206+
channel >> out;
207+
EXPECT_EQ(1, out);
208+
EXPECT_TRUE(channel.drained());
209+
}
210+
170211
TEST(ChannelTest, Iterator)
171212
{
172213
msd::channel<int> channel;
@@ -238,3 +279,45 @@ TEST(ChannelTest, Multithreading)
238279

239280
EXPECT_EQ(expected, sum_numbers);
240281
}
282+
283+
TEST(ChannelTest, ReadWriteClose)
284+
{
285+
const int numbers = 10000;
286+
const std::int64_t expected_sum = 50005000;
287+
constexpr std::size_t kThreadsToReadFrom = 20;
288+
289+
msd::channel<int> channel{kThreadsToReadFrom};
290+
std::atomic<std::int64_t> sum{0};
291+
std::atomic<std::int64_t> nums{0};
292+
293+
std::thread writer([&channel]() {
294+
for (int i = 1; i <= numbers; ++i) {
295+
channel << i;
296+
}
297+
channel.close();
298+
});
299+
300+
std::vector<std::thread> readers;
301+
for (std::size_t i = 0; i < kThreadsToReadFrom; ++i) {
302+
readers.emplace_back([&channel, &sum, &nums]() {
303+
while (true) {
304+
int value = 0;
305+
306+
if (!channel.read(value)) {
307+
return;
308+
}
309+
310+
sum += value;
311+
++nums;
312+
}
313+
});
314+
}
315+
316+
writer.join();
317+
for (auto& reader : readers) {
318+
reader.join();
319+
}
320+
321+
EXPECT_EQ(sum, expected_sum);
322+
EXPECT_EQ(nums, numbers);
323+
}

0 commit comments

Comments
 (0)