Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:

- name: Run examples
working-directory: ${{github.workspace}}/build
run: cmake --build . --config ${{ matrix.config.build_type }} --target examples -j
run: cmake --build . --config ${{ matrix.config.build_type }} --target run_examples -j

coverage:
name: Coverage
Expand Down
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"cSpell.words": [
"ARGN",
"clangd",
"cout",
"cppcoreguidelines",
"cpupower",
"ctest",
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ project(cpp_channel VERSION 1.3.0)

set(CMAKE_CXX_STANDARD 11 CACHE STRING "C++ standard")
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CXX_EXTENSIONS OFF)
set(CMAKE_CXX_EXTENSIONS OFF)

list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/cmake")
include(warnings)
Expand Down
143 changes: 64 additions & 79 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,44 @@
[![build](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/cmake.yml/badge.svg)](https://github.com/andreiavrammsd/cpp-channel/actions) [![codecov](https://codecov.io/github/andreiavrammsd/cpp-channel/graph/badge.svg?token=CKQ0TVW62Z)](https://codecov.io/github/andreiavrammsd/cpp-channel)
[![documentation](https://github.com/andreiavrammsd/cpp-channel/actions/workflows/doc.yml/badge.svg)](https://andreiavrammsd.github.io/cpp-channel/)

### Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11 and newer.
> Thread-safe container for sharing data between threads (synchronized queue). Header-only. Compatible with C++11 and newer.

## About

`msd::channel`

* A synchronized queue that can be easily and safely shared between multiple threads.
* Tested with GCC, Clang, and MSVC.
* Uses [std::mutex](https://en.cppreference.com/w/cpp/thread/mutex.html) for synchronization.
* Uses a customizable `storage` to store elements.

It's a class that can be constructed in several ways:

* Buffered:
* The channel accepts a specified number of elements, after which it blocks the writer threads and waits for a reader thread to read an element.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{2};`
* The channel accepts a specified number of elements, after which it blocks the writer threads and waits for a reader thread to read an element.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{2};`
* Unbuffered:
* Never blocks writes.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{};`
* Never blocks writes.
* It blocks the reader threads when channel is empty until a writer thread writes elements.
* `msd::channel<int> chan{};`
* Heap- or stack-allocated: pass a custom storage or choose a [built-in storage](https://github.com/andreiavrammsd/cpp-channel/blob/master/include/msd/storage.hpp):
* `msd::queue_storage` (default): uses [std::queue](https://en.cppreference.com/w/cpp/container/queue.html)
* `msd::vector_storage`: uses [std::vector](https://en.cppreference.com/w/cpp/container/vector.html) (if cache locality is important)
* `msd::channel<int, msd::vector_storage<int>> chan{2};`
* `msd::array_storage` (always buffered): uses [std::array](https://en.cppreference.com/w/cpp/container/array.html) (if you want stack allocation)
* `msd::channel<int, msd::array_storage<int, 10>> chan{};`
* `msd::channel<int, msd::array_storage<int, 10>> chan{10}; // does not compile because capacity is already passed as template argument`
* aka `msd::static_channel<int, 10>`
* `msd::queue_storage` (default): uses [std::queue](https://en.cppreference.com/w/cpp/container/queue.html)
* `msd::vector_storage`: uses [std::vector](https://en.cppreference.com/w/cpp/container/vector.html) (if cache locality is important)
* `msd::channel<int, msd::vector_storage<int>> chan{2};`
* `msd::array_storage` (always buffered): uses [std::array](https://en.cppreference.com/w/cpp/container/array.html) (if you want stack allocation)
* `msd::channel<int, msd::array_storage<int, 10>> chan{};`
* `msd::channel<int, msd::array_storage<int, 10>> chan{10}; // does not compile because capacity is already passed as template argument`
* aka `msd::static_channel<int, 10>`

A `storage` is:

* A class with a specific interface for storing elements.
* Must implement [FIFO](https://en.wikipedia.org/wiki/FIFO) logic.
* See [built-in storages](https://github.com/andreiavrammsd/cpp-channel/blob/master/include/msd/storage.hpp).

Exceptions:

* msd::operator<< throws `msd::closed_channel` if channel is closed.
* `msd::channel::write` returns `bool` status instead of throwing.
* Heap-allocated storages could throw.
Expand Down Expand Up @@ -78,104 +82,85 @@ VERSION=X.Y.Z \

## Usage

```c++
#include <cassert>
```cpp
// Unbuffered channel

#include <msd/channel.hpp>

int main() {
msd::channel<int> chan; // Unbuffered

// Send to channel
chan << 1 << 2;

// Read from channel
int first{};
int second{};
int main()
{
msd::channel<int> chan;

chan >> first >> second;
chan << 1 << 2; // Send

assert(first == 1);
assert(second == 2);
int first_value{};
int second_value{};
chan >> first_value >> second_value; // Receive
chan.read(first_value); // Returns channel close status (true/false), blocks thread when channel is empty
}
```

```c++
#include <cassert>

#include <msd/channel.hpp>

int main() {
msd::channel<int, msd::vector_storage<int>> chan{2}; // Buffered with vector storage
```cpp
// Buffered channel with custom storage

// Send to channel
chan << 1; // Throws if the channel is closed (after chan.close())
assert(chan.write(2)); // Returns false if the channel is closed (after chan.close())
chan << 3; // Blocks because the capacity is 2 (and no one reads from channel)
}
```

```c++
#include <msd/channel.hpp>

int main() {
msd::channel<int> chan{2}; // Buffered

int in = 1;
int out = 0;

// Send to channel
chan << in;
chan << in;
int main()
{
msd::channel<int, msd::vector_storage<int>> chan{2};

// Read from channel
chan.read(out);
chan >> out;
chan >> out; // Blocks because the channel is empty (and no one writes on it)
chan << 1; // Throws if channel is closed
chan.write(2); // Non-throwing write, returns channel close status (true/false)
chan << 3; // Blocks thread (no space, no reader)
}
```

```c++
#include <iostream>
```cpp
// Range-based iteration

#include <msd/channel.hpp>

int main() {
msd::channel<int, msd::vector_storage<int>> chan;
#include <iostream>

int in1 = 1;
int in2 = 2;
int main()
{
msd::channel<int> chan{2};

chan << in1 << in2;
chan << 1 << 2;
for (int value : chan) {
if (chan.closed()) {
// You can break before it's empty
break;
}

for (const auto out : chan) { // Blocks: waits forever for channel items
std::cout << out << '\n';
std::cout << value << '\n'; // Blocks thread until there is data to read or channel is closed and empty
}
}
```

```c++
```cpp
// Channel with statically-allocated storage (always buffered)

#include <msd/static_channel.hpp>

int main() {
msd::static_channel<int, 2> chan{}; // Always buffered
// Same as msd::channel<int, msd::array_storage<int, 2>>
#include <algorithm>

int main()
{
msd::static_channel<int, 2> src{};
msd::static_channel<int, 2> dst{};

int in = 1;
int out = 0;
src.write(1);
src.write(2);
src.close();

// Send to channel
chan.write(in);
chan.write(in);
std::copy_if(src.begin(), src.end(), msd::back_inserter(dst), [](int value) { return value % 2 == 0; });

// Read from channel
chan.read(out);
chan.read(out);
chan.read(out); // Blocks because the channel is empty (and no one writes on it)
dst.size(); // 1
}
```

See [examples](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples) and [documentation](https://andreiavrammsd.github.io/cpp-channel/).
See [examples](https://github.com/andreiavrammsd/cpp-channel/tree/master/examples) and [tests](https://github.com/andreiavrammsd/cpp-channel/tree/master/tests). Read the [documentation](https://andreiavrammsd.github.io/cpp-channel/) for full API reference.

## Known limitations

Expand Down
7 changes: 1 addition & 6 deletions examples/.clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,4 @@ InheritParentConfig: true

Checks: >
-cppcoreguidelines-avoid-magic-numbers,
-readability-magic-numbers,
-fuchsia-default-arguments-calls

CheckOptions:
- { key: readability-identifier-naming.ClassCase, value: CamelCase }
- { key: readability-identifier-length.MinimumVariableNameLength, value: '1' }
-readability-magic-numbers
16 changes: 11 additions & 5 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ function(add_example NAME)
endfunction()

function(run_example NAME)
add_custom_command(
TARGET ${NAME}
POST_BUILD
add_custom_target(
run_${NAME}
COMMAND ${NAME}
DEPENDS ${NAME}
COMMENT "Running example: ${NAME}")

add_dependencies(run_examples run_${NAME})
endfunction()

add_custom_target(examples)
add_custom_target(run_examples)

# Examples
add_example(example_basic basic.cpp)
Expand All @@ -40,5 +43,8 @@ run_example(example_streaming)
add_example(example_multithreading_static_channel multithreading_static_channel.cpp)
run_example(example_multithreading_static_channel)

add_example(example_merge_channels merge_channels.cpp)
run_example(example_merge_channels)
add_example(example_concurrent_map_filter concurrent_map_filter.cpp)
run_example(example_concurrent_map_filter)

add_example(example_semaphore semaphore.cpp)
run_example(example_semaphore)
36 changes: 19 additions & 17 deletions examples/close.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
#include <future>
#include <iostream>
#include <mutex>
#include <string>
#include <sstream>
#include <thread>

int main()
{
msd::channel<int> channel{};

// Write data on the channel until it's closed
const auto input = [](msd::channel<int>& chan, int time_ms) {
const auto produce = [](msd::channel<int>& chan, int time_ms) {
static int inc = 0;

while (!chan.closed()) {
Expand All @@ -20,37 +20,39 @@ int main()
std::this_thread::sleep_for(std::chrono::milliseconds{time_ms});
}
};
const auto input_future = std::async(input, std::ref(channel), 10);
const auto producer = std::async(produce, std::ref(channel), 10);

// Close the channel after some time
const auto timeout = [](msd::channel<int>& chan, int time_ms) {
const auto close = [](msd::channel<int>& chan, int time_ms) {
std::this_thread::sleep_for(std::chrono::milliseconds{time_ms});

chan.close();
};
auto timeout_future = std::async(timeout, std::ref(channel), 100);
const auto closer = std::async(close, std::ref(channel), 100);

// Display all the data from the channel
// When the channel is closed and empty, the iteration will end
std::mutex cout_mutex;
std::mutex mutex;

const auto write = [&cout_mutex](msd::channel<int>& chan, int time_ms) {
for (auto out : chan) {
std::string msg{"out: " + std::to_string(out) + "\n"};
const auto consume = [&mutex](msd::channel<int>& chan, int time_ms) {
for (auto value : chan) {
std::stringstream stream;
stream << "value " << value << " from consumer " << std::this_thread::get_id() << '\n';

{
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << msg;
std::lock_guard<std::mutex> lock(mutex);

std::cout << stream.str();
}

std::this_thread::sleep_for(std::chrono::milliseconds{time_ms});
}
};
const auto write_future1 = std::async(write, std::ref(channel), 1);
const auto write_future2 = std::async(write, std::ref(channel), 100);
const auto consumer_1 = std::async(consume, std::ref(channel), 50);
const auto consumer_2 = std::async(consume, std::ref(channel), 10);

input_future.wait();
timeout_future.wait();
write_future1.wait();
write_future2.wait();
producer.wait();
closer.wait();
consumer_1.wait();
consumer_2.wait();
}
Loading
Loading