Skip to content

Commit ee8ac34

Browse files
committed
feat: channel broadcast
1 parent eec917b commit ee8ac34

File tree

4 files changed

+470
-0
lines changed

4 files changed

+470
-0
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,12 @@ jobs:
141141
run: |
142142
./coro_channel${{ matrix.env.BIN_SUFFIX }}
143143
144+
- name: Test broadcast
145+
if: always()
146+
working-directory: build
147+
run: |
148+
./coro_broadcast${{ matrix.env.BIN_SUFFIX }}
149+
144150
- name: Test when
145151
if: always()
146152
working-directory: build

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ if (CORO_BUILD_TEST)
8181
set(TARGET_NAME ${PROJECT_NAME}_channel)
8282
add_executable(${TARGET_NAME} test/coro_channel.cpp)
8383

84+
# test broadcast
85+
set(TARGET_NAME ${PROJECT_NAME}_broadcast)
86+
add_executable(${TARGET_NAME} test/coro_broadcast.cpp)
87+
8488
# test when
8589
set(TARGET_NAME ${PROJECT_NAME}_when)
8690
add_executable(${TARGET_NAME} test/coro_when.cpp)

include/coro/channel.hpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,75 @@ struct channel {
8787
return send_awaitable{this, std::move(value)};
8888
}
8989

90+
// Broadcast awaitable: sends the value to ALL waiting receivers
91+
struct broadcast_awaitable {
92+
channel* ch_;
93+
T value_;
94+
std::coroutine_handle<> handle_{};
95+
executor* exec_ = nullptr;
96+
size_t receivers_notified_ = 0;
97+
98+
bool await_ready() const noexcept {
99+
return ch_->closed_.load(std::memory_order_acquire);
100+
}
101+
102+
template <typename Promise>
103+
bool await_suspend([[maybe_unused]] std::coroutine_handle<Promise> h) {
104+
std::unique_lock<MUTEX> lock(ch_->mutex_);
105+
106+
if (ch_->closed_.load(std::memory_order_relaxed)) {
107+
return false;
108+
}
109+
110+
// Broadcast to ALL waiting receivers (not just one)
111+
if (ch_->recv_queue_head_ != nullptr) {
112+
auto* recv_awaiter = ch_->recv_queue_head_;
113+
114+
// Iterate through all waiting receivers and notify them
115+
while (recv_awaiter != nullptr) {
116+
// Copy the value to each receiver's pending value
117+
recv_awaiter->pending_value_ = value_;
118+
receivers_notified_++;
119+
120+
auto recv_handle = recv_awaiter->handle_;
121+
auto recv_exec = recv_awaiter->exec_;
122+
auto* next = recv_awaiter->next_;
123+
124+
// Resume the receiver, must use post
125+
assert(recv_exec);
126+
recv_exec->post([recv_handle]() {
127+
recv_handle.resume();
128+
});
129+
130+
recv_awaiter = next;
131+
}
132+
133+
// Clear the receiver queue as all have been notified
134+
ch_->recv_queue_head_ = nullptr;
135+
136+
return false; // Don't suspend, broadcast is complete
137+
}
138+
139+
// No receivers waiting - for broadcast, we typically don't buffer
140+
// and just complete immediately. If you want buffering behavior,
141+
// you can modify this logic.
142+
return false;
143+
}
144+
145+
size_t await_resume() {
146+
if (ch_->closed_.load(std::memory_order_acquire)) {
147+
return 0;
148+
}
149+
return receivers_notified_;
150+
}
151+
};
152+
153+
// Broadcast: sends value to ALL waiting receivers
154+
// Returns the number of receivers that received the broadcast
155+
auto broadcast(T value) {
156+
return broadcast_awaitable{this, std::move(value)};
157+
}
158+
90159
struct recv_awaitable {
91160
channel* ch_;
92161
std::coroutine_handle<> handle_;

0 commit comments

Comments
 (0)