diff --git a/codecov.yml b/codecov.yml index c2d9be0..ab23be4 100644 --- a/codecov.yml +++ b/codecov.yml @@ -2,7 +2,7 @@ coverage: status: patch: default: - target: 90% + target: 80% threshold: 0% if_ci_failed: error project: diff --git a/include/msd/blocking_iterator.hpp b/include/msd/blocking_iterator.hpp index dd66c1e..cb55195 100644 --- a/include/msd/blocking_iterator.hpp +++ b/include/msd/blocking_iterator.hpp @@ -5,7 +5,6 @@ #include #include -#include namespace msd { @@ -76,13 +75,7 @@ class blocking_iterator { * @return true if the channel is not closed or not empty (continue iterating). * @return false if the channel is closed and empty (stop iterating). */ - bool operator!=(blocking_iterator) const - { - std::unique_lock lock{chan_.mtx_}; - chan_.waitBeforeRead(lock); - - return !(chan_.closed() && chan_.empty()); - } + bool operator!=(blocking_iterator) const { return !chan_.drained(); } private: Channel& chan_; diff --git a/include/msd/channel.hpp b/include/msd/channel.hpp index 35b5e20..5f19799 100644 --- a/include/msd/channel.hpp +++ b/include/msd/channel.hpp @@ -3,7 +3,6 @@ #ifndef MSD_CHANNEL_HPP_ #define MSD_CHANNEL_HPP_ -#include #include #include #include @@ -82,17 +81,81 @@ class channel { /** * @brief Pops an element from the channel. * - * @tparam Type The type of the elements + * @tparam Type The type of the elements. */ template friend channel& operator>>(channel&, Type&); + /** + * @brief Pushes an element into the channel. + * + * @tparam Type The type of the elements. + * + * @param value The element to be pushed into the channel. + * + * @return true If an element was successfully pushed into the channel. + * @return false If the channel is closed. + */ + template + bool write(Type&& value) + { + { + std::unique_lock lock{mtx_}; + waitBeforeWrite(lock); + + if (is_closed_) { + return false; + } + + queue_.push(std::forward(value)); + ++size_; + } + + cnd_.notify_one(); + + return true; + } + + /** + * @brief Pops an element from the channel. + * + * @param out Reference to the variable where the popped element will be stored. + * + * @return true If an element was successfully read from the channel. + * @return false If the channel is closed and empty. + */ + bool read(T& out) + { + { + std::unique_lock lock{mtx_}; + waitBeforeRead(lock); + + if (is_closed_ && size_ == 0) { + return false; + } + + if (!(size_ == 0)) { + out = std::move(queue_.front()); + queue_.pop(); + --size_; + } + } + + cnd_.notify_one(); + + return true; + } + /** * @brief Returns the current size of the channel. * * @return The number of elements in the channel. */ - NODISCARD size_type constexpr size() const noexcept { return size_; } + NODISCARD size_type size() const noexcept + { + std::unique_lock lock{mtx_}; + return size_; + } /** * @brief Checks if the channel is empty. @@ -100,16 +163,20 @@ class channel { * @return true If the channel contains no elements. * @return false Otherwise. */ - NODISCARD bool constexpr empty() const noexcept { return size_ == 0; } + NODISCARD bool empty() const noexcept + { + std::unique_lock lock{mtx_}; + return size_ == 0; + } /** - * @brief Closes the channel. + * @brief Closes the channel, no longer accepting new elements. */ void close() noexcept { { std::unique_lock lock{mtx_}; - is_closed_.store(true, std::memory_order_seq_cst); + is_closed_ = true; } cnd_.notify_all(); } @@ -120,7 +187,23 @@ class channel { * @return true If no more elements can be added to the channel. * @return false Otherwise. */ - NODISCARD bool closed() const noexcept { return is_closed_.load(std::memory_order_seq_cst); } + NODISCARD bool closed() const noexcept + { + std::unique_lock lock{mtx_}; + return is_closed_; + } + + /** + * @brief Checks if the channel has been closed and is empty. + * + * @return true If nothing can be read anymore from the channel. + * @return false Otherwise. + */ + NODISCARD bool drained() noexcept + { + std::unique_lock lock{mtx_}; + return is_closed_ && size_ == 0; + } /** * @brief Returns an iterator to the beginning of the channel. @@ -146,16 +229,16 @@ class channel { virtual ~channel() = default; private: - const size_type cap_{0}; std::queue queue_; - std::atomic size_{0}; - std::mutex mtx_; + std::size_t size_{0}; + const size_type cap_{0}; + mutable std::mutex mtx_; std::condition_variable cnd_; - std::atomic is_closed_{false}; + bool is_closed_{false}; void waitBeforeRead(std::unique_lock& lock) { - cnd_.wait(lock, [this]() { return !empty() || closed(); }); + cnd_.wait(lock, [this]() { return !(size_ == 0) || is_closed_; }); }; void waitBeforeWrite(std::unique_lock& lock) @@ -171,42 +254,17 @@ class channel { template channel::type>& operator<<(channel::type>& chan, T&& value) { - { - std::unique_lock lock{chan.mtx_}; - chan.waitBeforeWrite(lock); - - if (chan.closed()) { - throw closed_channel{"cannot write on closed channel"}; - } - - chan.queue_.push(std::forward(value)); - ++chan.size_; + if (!chan.write(std::forward(value))) { + throw closed_channel{"cannot write on closed channel"}; } - chan.cnd_.notify_one(); - return chan; } template channel& operator>>(channel& chan, T& out) { - { - std::unique_lock lock{chan.mtx_}; - chan.waitBeforeRead(lock); - - if (chan.closed() && chan.empty()) { - return chan; - } - - if (!chan.empty()) { - out = std::move(chan.queue_.front()); - chan.queue_.pop(); - --chan.size_; - } - } - - chan.cnd_.notify_one(); + chan.read(out); return chan; } diff --git a/tests/channel_test.cpp b/tests/channel_test.cpp index 1ddbcfa..d6e7468 100644 --- a/tests/channel_test.cpp +++ b/tests/channel_test.cpp @@ -49,6 +49,30 @@ TEST(ChannelTest, PushAndFetch) EXPECT_EQ(4, out); } +TEST(ChannelTest, WriteAndRead) +{ + msd::channel channel; + + int in = 1; + EXPECT_TRUE(channel.write(in)); + + const int cin = 3; + EXPECT_TRUE(channel.write(cin)); + + channel.close(); + EXPECT_FALSE(channel.write(2)); + + int out = 0; + + EXPECT_TRUE(channel.read(out)); + EXPECT_EQ(1, out); + + EXPECT_TRUE(channel.read(out)); + EXPECT_EQ(3, out); + + EXPECT_FALSE(channel.read(out)); +} + TEST(ChannelTest, PushAndFetchWithBufferedChannel) { msd::channel channel{2}; @@ -167,6 +191,23 @@ TEST(ChannelTest, close) EXPECT_THROW(channel << std::move(in), msd::closed_channel); } +TEST(ChannelTest, drained) +{ + msd::channel channel; + EXPECT_FALSE(channel.drained()); + + int in = 1; + channel << in; + + channel.close(); + EXPECT_FALSE(channel.drained()); + + int out = 0; + channel >> out; + EXPECT_EQ(1, out); + EXPECT_TRUE(channel.drained()); +} + TEST(ChannelTest, Iterator) { msd::channel channel; @@ -238,3 +279,45 @@ TEST(ChannelTest, Multithreading) EXPECT_EQ(expected, sum_numbers); } + +TEST(ChannelTest, ReadWriteClose) +{ + const int numbers = 10000; + const std::int64_t expected_sum = 50005000; + constexpr std::size_t kThreadsToReadFrom = 20; + + msd::channel channel{kThreadsToReadFrom}; + std::atomic sum{0}; + std::atomic nums{0}; + + std::thread writer([&channel]() { + for (int i = 1; i <= numbers; ++i) { + channel << i; + } + channel.close(); + }); + + std::vector readers; + for (std::size_t i = 0; i < kThreadsToReadFrom; ++i) { + readers.emplace_back([&channel, &sum, &nums]() { + while (true) { + int value = 0; + + if (!channel.read(value)) { + return; + } + + sum += value; + ++nums; + } + }); + } + + writer.join(); + for (auto& reader : readers) { + reader.join(); + } + + EXPECT_EQ(sum, expected_sum); + EXPECT_EQ(nums, numbers); +}