Skip to content

Commit d82ccca

Browse files
Make bulk pipeable (#173)
* Make bulk pipeable * Add separate tests for pipable bulk syntax
1 parent fa6d441 commit d82ccca

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

include/beman/execution/detail/bulk.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#ifndef INCLUDED_BEMAN_EXECUTION_DETAIL_BULK
55
#define INCLUDED_BEMAN_EXECUTION_DETAIL_BULK
66

7+
#include "beman/execution/detail/sender_adaptor.hpp"
8+
#include "beman/execution/detail/sender_adaptor_closure.hpp"
79
#include <beman/execution/detail/get_completion_signatures.hpp>
810
#include <beman/execution/detail/meta_combine.hpp>
911
#include <beman/execution/detail/meta_unique.hpp>
@@ -29,7 +31,13 @@
2931
#include <beman/execution/detail/suppress_push.hpp>
3032
namespace beman::execution::detail {
3133

32-
struct bulk_t {
34+
struct bulk_t : ::beman::execution::sender_adaptor_closure<bulk_t> {
35+
36+
template <class Shape, class f>
37+
requires(std::is_integral_v<Shape> && ::beman::execution::detail::movable_value<f>)
38+
auto operator()(Shape&& shape, f&& fun) const {
39+
return beman::execution::detail::sender_adaptor{*this, std::forward<Shape>(shape), std::forward<f>(fun)};
40+
}
3341

3442
template <class Sender, class Shape, class f>
3543
requires(::beman::execution::sender<Sender> && std::is_integral_v<Shape> &&

tests/beman/execution/exec-bulk.test.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,60 @@ auto test_bulk_noexept() {
8787
ASSERT(counter == 10);
8888
}
8989

90+
auto test_bulk_pipeable() {
91+
auto b0 = test_std::just() | test_std::bulk(1, [](int) {});
92+
93+
static_assert(test_std::sender<decltype(b0)>);
94+
auto b0_env = test_std::get_env(b0);
95+
auto b0_completions = test_std::get_completion_signatures(b0, b0_env);
96+
static_assert(
97+
std::is_same_v<decltype(b0_completions),
98+
beman::execution::completion_signatures<beman::execution::set_value_t(),
99+
beman::execution::set_error_t(std::exception_ptr)> >,
100+
"Completion signatures do not match!");
101+
102+
int counter = 0;
103+
104+
auto b1 = test_std::just() | test_std::bulk(5, [&](int i) { counter += i; });
105+
106+
static_assert(test_std::sender<decltype(b1)>);
107+
auto b1_env = test_std::get_env(b0);
108+
auto b1_completions = test_std::get_completion_signatures(b1, b1_env);
109+
static_assert(
110+
std::is_same_v<decltype(b1_completions),
111+
beman::execution::completion_signatures<beman::execution::set_value_t(),
112+
beman::execution::set_error_t(std::exception_ptr)> >,
113+
"Completion signatures do not match!");
114+
test_std::sync_wait(b1);
115+
ASSERT(counter == 10);
116+
117+
std::vector<int> a{1, 2, 3, 4, 5, 6, 7, 8};
118+
std::vector<int> b{9, 10, 11, 13, 14, 15, 16, 17};
119+
120+
std::vector<int> results(a.size(), 0);
121+
122+
auto b2 = test_std::just(a) | test_std::bulk(a.size(), [&](std::size_t index, const std::vector<int>& vec) {
123+
results[index] = vec[index] * b[index];
124+
});
125+
126+
static_assert(test_std::sender<decltype(b2)>);
127+
auto b2_env = test_std::get_env(b2);
128+
auto b2_completions = test_std::get_completion_signatures(b2, b2_env);
129+
static_assert(
130+
std::is_same_v<decltype(b2_completions),
131+
beman::execution::completion_signatures<beman::execution::set_value_t(std::vector<int>),
132+
beman::execution::set_error_t(std::exception_ptr)> >,
133+
"Completion signatures do not match!");
134+
test_std::sync_wait(b2);
135+
136+
// Expected results: element-wise multiplication of a and b
137+
std::vector<int> expected{9, 20, 33, 52, 70, 90, 112, 136};
138+
139+
for (size_t i = 0; i < results.size(); ++i) {
140+
ASSERT(results[i] == expected[i]);
141+
}
142+
}
143+
90144
} // namespace
91145

92146
TEST(exec_bulk) {

0 commit comments

Comments
 (0)