Skip to content

Commit d3e61b5

Browse files
committed
started to implement on
1 parent 752882e commit d3e61b5

File tree

7 files changed

+263
-2
lines changed

7 files changed

+263
-2
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// include/beman/execution26/detail/on.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON
5+
#define INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON
6+
7+
#include <beman/execution26/detail/scheduler.hpp>
8+
#include <beman/execution26/detail/sender.hpp>
9+
#include <beman/execution26/detail/sender_adaptor_closure.hpp>
10+
#include <beman/execution26/detail/transform_sender.hpp>
11+
#include <beman/execution26/detail/query_with_default.hpp>
12+
#include <beman/execution26/detail/get_domain.hpp>
13+
#include <beman/execution26/detail/get_domain_early.hpp>
14+
#include <beman/execution26/detail/default_domain.hpp>
15+
#include <beman/execution26/detail/make_sender.hpp>
16+
#include <beman/execution26/detail/product_type.hpp>
17+
#include <beman/execution26/detail/sender_for.hpp>
18+
#include <beman/execution26/detail/join_env.hpp>
19+
#include <beman/execution26/detail/forward_like.hpp>
20+
#include <beman/execution26/detail/fwd_env.hpp>
21+
#include <beman/execution26/detail/sched_env.hpp>
22+
#include <utility>
23+
24+
// ----------------------------------------------------------------------------
25+
26+
namespace beman::execution26::detail {
27+
struct on_t {
28+
template <::beman::execution26::detail::sender_for<on_t> OutSndr, typename Env>
29+
auto transform_env(OutSndr&& out_sndr, Env&& env) const -> decltype(auto) {
30+
// auto&&[_, data, _] = out_sndr;
31+
auto&& data{out_sndr.template get<1>()};
32+
33+
if constexpr (::beman::execution26::scheduler<decltype(data)>)
34+
return ::beman::execution26::detail::join_env(
35+
::beman::execution26::detail::sched_env(::beman::execution26::detail::forward_like<OutSndr>(data)
36+
37+
),
38+
::beman::execution26::detail::fwd_env(::std::forward<Env>(env)));
39+
else
40+
return std::forward<Env>(env);
41+
}
42+
43+
template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr>
44+
requires ::beman::execution26::detail::is_sender_adaptor_closure<Sndr>
45+
auto operator()(Sch&&, Sndr&&) const -> void =
46+
BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure");
47+
48+
template <::beman::execution26::scheduler Sch,
49+
::beman::execution26::sender Sndr,
50+
::beman::execution26::detail::is_sender_adaptor_closure Closure>
51+
requires ::beman::execution26::detail::is_sender_adaptor_closure<Sndr>
52+
auto operator()(Sndr&&, Sch&&, Closure&&) const -> void =
53+
BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure");
54+
55+
template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr>
56+
auto operator()(Sch&& sch, Sndr&& sndr) const {
57+
auto domain{::beman::execution26::detail::query_with_default(
58+
::beman::execution26::get_domain, sch, ::beman::execution26::default_domain{})};
59+
return ::beman::execution26::transform_sender(
60+
domain,
61+
::beman::execution26::detail::make_sender(*this, ::std::forward<Sch>(sch), ::std::forward<Sndr>(sndr)));
62+
}
63+
template <::beman::execution26::scheduler Sch,
64+
::beman::execution26::sender Sndr,
65+
::beman::execution26::detail::is_sender_adaptor_closure Closure>
66+
auto operator()(Sndr&& sndr, Sch&& sch, Closure&& closure) const {
67+
auto domain{::beman::execution26::detail::get_domain_early(sndr)};
68+
return ::beman::execution26::transform_sender(
69+
domain,
70+
::beman::execution26::detail::make_sender(
71+
*this,
72+
::beman::execution26::detail::product_type{::std::forward<Sch>(sch), ::std::forward<Closure>(closure)},
73+
::std::forward<Sndr>(sndr)));
74+
}
75+
};
76+
} // namespace beman::execution26::detail
77+
78+
namespace beman::execution26 {
79+
using on_t = ::beman::execution26::detail::on_t;
80+
inline constexpr ::beman::execution26::on_t on{};
81+
} // namespace beman::execution26
82+
83+
// ----------------------------------------------------------------------------
84+
85+
#endif

include/beman/execution26/detail/sender_adaptor_closure.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,16 @@ namespace beman::execution26 {
2020
template <typename>
2121
struct sender_adaptor_closure : ::beman::execution26::detail::pipeable::sender_adaptor_closure_base {};
2222
// NOLINTEND(bugprone-crtp-constructor-accessibility)
23+
2324
} // namespace beman::execution26
2425

26+
namespace beman::execution26::detail {
27+
template <typename Closure>
28+
concept is_sender_adaptor_closure =
29+
::std::derived_from<::std::decay_t<Closure>,
30+
::beman::execution26::sender_adaptor_closure<::std::decay_t<Closure>>>;
31+
}
32+
2533
namespace beman::execution26::detail::pipeable {
2634
template <::beman::execution26::sender Sender, typename Adaptor>
2735
requires(not::beman::execution26::sender<Adaptor>) &&

include/beman/execution26/execution.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@
4242
#include <beman/execution26/detail/into_variant.hpp>
4343
#include <beman/execution26/detail/just.hpp>
4444
#include <beman/execution26/detail/let.hpp>
45+
// #include <beman/execution26/detail/on.hpp>
4546
#include <beman/execution26/detail/read_env.hpp>
4647
#include <beman/execution26/detail/schedule_from.hpp>
4748
#include <beman/execution26/detail/starts_on.hpp>
4849
#include <beman/execution26/detail/sync_wait.hpp>
4950
#include <beman/execution26/detail/then.hpp>
50-
#include <beman/execution26/detail/write_env.hpp>
5151
#include <beman/execution26/detail/when_all.hpp>
5252
#include <beman/execution26/detail/when_all_with_variant.hpp>
5353
#include <beman/execution26/detail/with_awaitable_senders.hpp>
54+
#include <beman/execution26/detail/write_env.hpp>
5455

5556
// ----------------------------------------------------------------------------
5657

src/beman/execution26/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ target_sources(
123123
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nostopstate.hpp
124124
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nothrow_callable.hpp
125125
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/notify.hpp
126+
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on.hpp
126127
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on_stop_request.hpp
127128
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state.hpp
128129
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state_task.hpp

tests/beman/execution26/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ endif()
1111
list(
1212
APPEND
1313
execution_tests
14+
exec-on.test
1415
notify.test
1516
exec-scounting.test
1617
exec-awaitable.test
@@ -128,7 +129,7 @@ foreach(test ${execution_tests})
128129
add_test(NAME ${TEST_EXE} COMMAND $<TARGET_FILE:${TEST_EXE}>)
129130
endforeach()
130131

131-
if(NOT PROJECT_IS_TOP_LEVEL)
132+
if(PROJECT_IS_TOP_LEVEL)
132133
# test if the targets are findable from the build directory
133134
# cmake-format: off
134135
add_test(NAME find-package-test
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// tests/beman/execution26/exec-on.test.cpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#include <beman/execution26/detail/on.hpp>
5+
#include <beman/execution26/detail/just.hpp>
6+
#include <beman/execution26/detail/sender_adaptor_closure.hpp>
7+
#include <beman/execution26/detail/then.hpp>
8+
#include <test/execution.hpp>
9+
#include <test/thread_pool.hpp>
10+
#include <concepts>
11+
12+
// ----------------------------------------------------------------------------
13+
14+
namespace {
15+
struct both : test_std::sender_adaptor_closure<both> {
16+
using sender_concept = test_std::sender_t;
17+
};
18+
19+
static_assert(test_std::sender<both>);
20+
static_assert(test_detail::is_sender_adaptor_closure<both>);
21+
22+
template <test_std::scheduler Sch,
23+
test_std::sender Sndr,
24+
test_detail::is_sender_adaptor_closure Closure,
25+
typename Both>
26+
auto test_interface(Sch sch, Sndr sndr, Closure closure, Both both) -> void {
27+
static_assert(requires {
28+
{ test_std::on(sch, sndr) } -> test_std::sender;
29+
});
30+
static_assert(not requires { test_std::on(sch, both); });
31+
static_assert(requires {
32+
{ test_std::on(sndr, sch, closure) } -> test_std::sender;
33+
});
34+
static_assert(not requires { test_std::on(both, sch, closure); });
35+
36+
auto sndr1{test_std::on(sch, sndr)};
37+
auto sndr2{test_std::on(sndr, sch, closure)};
38+
test::use(sndr1, sndr2);
39+
}
40+
41+
template <test_detail::sender_for<test_std::on_t> OutSndr>
42+
auto test_transform_env(OutSndr out_sndr) -> void {
43+
auto e{test_std::on.transform_env(out_sndr, test_std::empty_env{})};
44+
}
45+
} // namespace
46+
47+
TEST(exec_on) {
48+
test::thread_pool pool{};
49+
50+
static_assert(std::same_as<const test_std::on_t, decltype(test_std::on)>);
51+
static_assert(test_detail::is_sender_adaptor_closure<decltype(test_std::then([] {}))>);
52+
static_assert(not test_detail::is_sender_adaptor_closure<decltype(test_std::just([] {}))>);
53+
test_interface(pool.get_scheduler(), test_std::just(), test_std::then([] {}), both{});
54+
55+
test_transform_env(test_std::on(pool.get_scheduler(), test_std::just()));
56+
test_transform_env(test_std::on(test_std::just(), pool.get_scheduler(), test_std::then([] {})));
57+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// tests/beman/execution26/include/test/thread_pool.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
// ----------------------------------------------------------------------------
4+
5+
#ifndef INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL
6+
#define INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL
7+
8+
#include <beman/execution26/execution.hpp>
9+
#include <test/execution.hpp>
10+
11+
#include <mutex>
12+
#include <condition_variable>
13+
#include <thread>
14+
#include <memory>
15+
// ----------------------------------------------------------------------------
16+
17+
namespace test {
18+
struct thread_pool;
19+
}
20+
21+
struct test::thread_pool {
22+
struct node {
23+
node* next;
24+
virtual void run() = 0;
25+
26+
protected:
27+
~node() = default;
28+
};
29+
30+
std::mutex mutex;
31+
std::condition_variable condition;
32+
node* stack{};
33+
bool stopped{false};
34+
std::thread driver{[this] {
35+
while (std::optional<node*> n = [this] {
36+
std::unique_lock cerberus(mutex);
37+
condition.wait(cerberus, [this] { return stopped || stack; });
38+
return this->stack ? std::optional<node*>(std::exchange(this->stack, this->stack->next))
39+
: std::optional<node*>();
40+
}()) {
41+
(*n)->run();
42+
}
43+
}};
44+
45+
thread_pool() = default;
46+
thread_pool(thread_pool&&) = delete;
47+
~thread_pool() {
48+
this->stop();
49+
this->driver.join();
50+
}
51+
void stop() {
52+
{
53+
std::lock_guard cerberus(this->mutex);
54+
stopped = true;
55+
}
56+
this->condition.notify_one();
57+
}
58+
59+
struct scheduler {
60+
using scheduler_concept = test_std::scheduler_t;
61+
struct env {
62+
test::thread_pool* pool;
63+
64+
template <typename T>
65+
scheduler query(const test_std::get_completion_scheduler_t<T>&) const noexcept {
66+
return {this->pool};
67+
}
68+
};
69+
template <typename Receiver>
70+
struct state final : test::thread_pool::node {
71+
using operation_state_concept = test_std::operation_state_t;
72+
std::remove_cvref_t<Receiver> receiver;
73+
test::thread_pool* pool;
74+
75+
template <typename R>
76+
state(R&& r, test::thread_pool* p) : node{}, receiver(std::forward<R>(r)), pool(p) {}
77+
void start() & noexcept {
78+
{
79+
std::lock_guard cerberus(this->pool->mutex);
80+
this->next = std::exchange(this->pool->stack, this);
81+
}
82+
this->pool->condition.notify_one();
83+
}
84+
void run() override { test_std::set_value(std::move(this->receiver)); }
85+
};
86+
struct sender {
87+
using sender_concept = test_std::sender_t;
88+
using completion_signatures = test_std::completion_signatures<test_std::set_value_t()>;
89+
test::thread_pool* pool;
90+
template <typename Receiver>
91+
state<Receiver> connect(Receiver&& receiver) {
92+
return state<Receiver>(std::forward<Receiver>(receiver), pool);
93+
}
94+
95+
env get_env() const noexcept { return {this->pool}; }
96+
};
97+
test::thread_pool* pool;
98+
sender schedule() { return {this->pool}; }
99+
bool operator==(const scheduler&) const = default;
100+
};
101+
scheduler get_scheduler() { return {this}; }
102+
};
103+
104+
static_assert(test_std::scheduler<test::thread_pool::scheduler>);
105+
106+
// ----------------------------------------------------------------------------
107+
108+
#endif

0 commit comments

Comments
 (0)