Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"cSpell.words": [
"ARGN",
"clangd",
"cout",
"cppcoreguidelines",
"cpupower",
"ctest",
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ project(cpp_channel VERSION 1.3.0)

set(CMAKE_CXX_STANDARD 11 CACHE STRING "C++ standard")
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CXX_EXTENSIONS OFF)
set(CMAKE_CXX_EXTENSIONS OFF)

list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/cmake")
include(warnings)
Expand Down
143 changes: 64 additions & 79 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,44 @@
[![build](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/cmake.yml/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel)
[![documentation](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/doc.yml/badge.svg)](https://andreiavrammsd.github.io/cpp-channel/)

### Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11 and newer.
> Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11 and newer.

## About

`msd::channel`

* A synchronized queue that can be easily and safely shared between multiple threads.
* Tested with GCC, Clang, and MSVC.
* Uses [std::mutex](https://en.cppreference.com/w/cpp/thread/mutex.html) for synchronization.
* Uses a customizable `storage` to store elements.

It's a class that can be constructed in several ways:

* Buffered:
* The channel accepts a specified number of elements, after which it blocks the writer threads and waits for a reader thread to read an element.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{2};`
* The channel accepts a specified number of elements, after which it blocks the writer threads and waits for a reader thread to read an element.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{2};`
* Unbuffered:
* Never blocks writes.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{};`
* Never blocks writes.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{};`
* Heap- or stack-allocated: pass a custom storage or choose a [built-in storage](https://github.com/andreiavrammsd/cpp-channel/blob/master/include/msd/storage.hpp):
* `msd::queue_storage` (default): uses [std::queue](https://en.cppreference.com/w/cpp/container/queue.html)
* `msd::vector_storage`: uses [std::vector](https://en.cppreference.com/w/cpp/container/vector.html) (if cache locality is important)
* `msd::channel<int, msd::vector_storage<int>> chan{2};`
* `msd::array_storage` (always buffered): uses [std::array](https://en.cppreference.com/w/cpp/container/array.html) (if you want stack allocation)
* `msd::channel<int, msd::array_storage<int, 10>> chan{};`
* `msd::channel<int, msd::array_storage<int, 10>> chan{10}; // does not compile because capacity is already passed as template argument`
* aka `msd::static_channel<int, 10>`
* `msd::queue_storage` (default): uses [std::queue](https://en.cppreference.com/w/cpp/container/queue.html)
* `msd::vector_storage`: uses [std::vector](https://en.cppreference.com/w/cpp/container/vector.html) (if cache locality is important)
* `msd::channel<int, msd::vector_storage<int>> chan{2};`
* `msd::array_storage` (always buffered): uses [std::array](https://en.cppreference.com/w/cpp/container/array.html) (if you want stack allocation)
* `msd::channel<int, msd::array_storage<int, 10>> chan{};`
* `msd::channel<int, msd::array_storage<int, 10>> chan{10}; // does not compile because capacity is already passed as template argument`
* aka `msd::static_channel<int, 10>`

A `storage` is:

* A class with a specific interface for storing elements.
* Must implement [FIFO](https://en.wikipedia.org/wiki/FIFO) logic.
* See [built-in storages](https://github.com/andreiavrammsd/cpp-channel/blob/master/include/msd/storage.hpp).

Exceptions:

* msd::operator<< throws `msd::closed_channel` if channel is closed.
* `msd::channel::write` returns `bool` status instead of throwing.
* Heap-allocated storages could throw.
Expand Down Expand Up @@ -78,104 +82,85 @@ VERSION=X.Y.Z \

## Usage

```c++
#include <cassert>
```cpp
// Unbuffered channel

#include <msd/channel.hpp>

int main() {
msd::channel<int> chan; // Unbuffered

// Send to channel
chan << 1 << 2;

// Read from channel
int first{};
int second{};
int main()
{
msd::channel<int> chan;

chan >> first >> second;
chan << 1 << 2; // Send

assert(first == 1);
assert(second == 2);
int first_value{};
int second_value{};
chan >> first_value >> second_value; // Receive
chan.read(first_value); // Returns channel close status (true/false), blocks thread when channel is empty
}
```

```c++
#include <cassert>

#include <msd/channel.hpp>

int main() {
msd::channel<int, msd::vector_storage<int>> chan{2}; // Buffered with vector storage
```cpp
// Buffered channel with custom storage

// Send to channel
chan << 1; // Throws if the channel is closed (after chan.close())
assert(chan.write(2)); // Returns false if the channel is closed (after chan.close())
chan << 3; // Blocks because the capacity is 2 (and no one reads from channel)
}
```

```c++
#include <msd/channel.hpp>

int main() {
msd::channel<int> chan{2}; // Buffered

int in = 1;
int out = 0;

// Send to channel
chan << in;
chan << in;
int main()
{
msd::channel<int, msd::vector_storage<int>> chan{2};

// Read from channel
chan.read(out);
chan >> out;
chan >> out; // Blocks because the channel is empty (and no one writes on it)
chan << 1; // Throws if channel is closed
chan.write(2); // Non-throwing write, returns channel close status (true/false)
chan << 3; // Blocks thread (no space, no reader)
}
```

```c++
#include <iostream>
```cpp
// Range-based iteration

#include <msd/channel.hpp>

int main() {
msd::channel<int, msd::vector_storage<int>> chan;
#include <iostream>

int in1 = 1;
int in2 = 2;
int main()
{
msd::channel<int> chan{2};

chan << in1 << in2;
chan << 1 << 2;
for (int value : chan) {
if (chan.closed()) {
// You can break before it's empty
break;
}

for (const auto out : chan) { // Blocks: waits forever for channel items
std::cout << out << '\n';
std::cout << value << '\n'; // Blocks thread until there is data to read or channel is closed and empty
}
}
```

```c++
```cpp
// Channel with statically-allocated storage (always buffered)

#include <msd/static_channel.hpp>

int main() {
msd::static_channel<int, 2> chan{}; // Always buffered
// Same as msd::channel<int, msd::array_storage<int, 2>>
#include <algorithm>

int main()
{
msd::static_channel<int, 2> src{};
msd::static_channel<int, 2> dst{};

int in = 1;
int out = 0;
src.write(1);
src.write(2);
src.close();

// Send to channel
chan.write(in);
chan.write(in);
std::copy_if(src.begin(), src.end(), msd::back_inserter(dst), [](int value) { return value % 2 == 0; });

// Read from channel
chan.read(out);
chan.read(out);
chan.read(out); // Blocks because the channel is empty (and no one writes on it)
dst.size(); // 1;
}
```

See [examples](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples) and [documentation](https://andreiavrammsd.github.io/cpp-channel/).
See [examples](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples) and [tests](https://github.com/andreiavrammsd/cpp-channel/tree/master/tests). Read the [documentation](https://andreiavrammsd.github.io/cpp-channel/) for full API reference.

## Known limitations

Expand Down
7 changes: 1 addition & 6 deletions examples/.clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,4 @@ InheritParentConfig: true

Checks: >
-cppcoreguidelines-avoid-magic-numbers,
-readability-magic-numbers,
-fuchsia-default-arguments-calls

CheckOptions:
- { key: readability-identifier-naming.ClassCase, value: CamelCase }
- { key: readability-identifier-length.MinimumVariableNameLength, value: '1' }
-readability-magic-numbers
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ run_example(example_multithreading_static_channel)

add_example(example_merge_channels merge_channels.cpp)
run_example(example_merge_channels)

add_example(example_semaphore semaphore.cpp)
run_example(example_semaphore)
24 changes: 12 additions & 12 deletions examples/move.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

#include <iostream>

class Data final {
class data final {
int value_{};

public:
Data() = default;
explicit Data(int value) : value_{value} {}
data() = default;
explicit data(int value) : value_{value} {}

int get_value() const { return value_; }

Data(const Data& other) noexcept : value_{other.value_} { std::cout << "copy " << value_ << '\n'; }
Data& operator=(const Data& other)
data(const data& other) noexcept : value_{other.value_} { std::cout << "copy " << value_ << '\n'; }
data& operator=(const data& other)
{
if (this != &other) {
value_ = other.value_;
Expand All @@ -22,8 +22,8 @@ class Data final {
return *this;
}

Data(Data&& other) noexcept : value_{other.value_} { std::cout << "move " << value_ << '\n'; }
Data& operator=(Data&& other) noexcept
data(data&& other) noexcept : value_{other.value_} { std::cout << "move " << value_ << '\n'; }
data& operator=(data&& other) noexcept
{
if (this != &other) {
value_ = other.value_;
Expand All @@ -33,19 +33,19 @@ class Data final {
return *this;
}

~Data() = default;
~data() = default;
};

int main()
{
msd::channel<Data> chan{10};
msd::channel<data> chan{10};

auto in1 = Data{1};
auto in1 = data{1};
chan << in1;

chan << Data{2};
chan << data{2};

auto in3 = Data{3};
auto in3 = data{3};
chan << std::move(in3);

for (const auto& out : chan) {
Expand Down
57 changes: 57 additions & 0 deletions examples/semaphore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include <msd/channel.hpp>

#include <cassert>
#include <chrono>
#include <future>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

class semaphore {
public:
explicit semaphore(std::size_t limit) : chan_{limit} {}

void acquire() { chan_ << empty_; }

void release() { chan_ >> empty_; }

private:
struct empty {};
[[no_unique_address]] empty empty_{};
msd::channel<empty> chan_;
};

int simulate_heavy_computation(const int value)
{
const int result = value * 2;
std::this_thread::sleep_for(std::chrono::milliseconds(500));

return result;
};

int main()
{
semaphore sem{2};

std::vector<std::future<int>> futures;

for (int i = 1; i <= 10; ++i) {
const auto worker = [&sem](const int value) {
sem.acquire();
const int result = simulate_heavy_computation(value);
sem.release();

return result;
};

futures.push_back(std::async(worker, i));
}

std::cout << "Waiting for result...\n";
const int result = std::accumulate(futures.begin(), futures.end(), 0,
[](int acc, std::future<int>& future) { return acc + future.get(); });
std::cout << "Result: " << result << "\n";

assert(result == 110);
}
Loading