Skip to content

Commit 7d2a2b0

Browse files
authored
Affine on (#199)
* enhanced run_loop to have an infallible scheduler * clang format * don't use set_stopped for unstoppable tokens * removed remaining uses of set_error in run_loop * try to avoid set_error in let* based on noexcept * fix run_loop's use of empty_env * implemented a basic affine_on * added the various constraints and customization to affine_on * added some documentation to affine_on * minor fixes to the affine_on implementation * added some tests and an example affine_on customization * added a few affine_on customizations * restore empty CMakeLists.txt for code examples * fix two minor issues discovered by CI * fix a formatting issue * remove duplicate headers
1 parent 562ec1d commit 7d2a2b0

File tree

16 files changed

+459
-24
lines changed

16 files changed

+459
-24
lines changed

docs/overview.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,13 @@ The expression <code>schedule(<i>scheduler</i>)</code> creates a sender which up
639639
### Sender Adaptors
640640
The sender adaptors take one or more senders and adapt their respective behavior to complete with a corresponding result. The description uses the informal function <code><i>completions-of</i>(<i>sender</i>)</code> to represent the completion signatures which <code><i>sender</i></code> produces. Also, completion signatures are combined using <code>+</code>: the result is the deduplicated set of the combined completion signatures.
641641

642+
<details>
643+
<summary><code>affine_on(<i>sender</i>) -> <i>sender-of</i><<i>completions-of</i>(<i>sender</i>)></code></summary>
644+
The expression <code>affine_on(<i>sender</i>)</code> creates
645+
a sender which completes on the same scheduler it was started on, even if <code><i>sender</i></code> changes the scheduler. The scheduler to resume on is determined using <code>get_scheduler(get_env(<i>rcvr</i>))</code> where <code><i>rcvr</i></code> is the receiver the sender is <code>connect</code>ed to.
646+
647+
The primary use of <code>affine_on</code> is implementing scheduler affinity for <code>task</code>.
648+
</details>
642649
<details>
643650
<summary>`bulk`</summary>
644651
</details>
@@ -698,6 +705,10 @@ The expression <code>into_variant(<i>sender</i>)</code> creates a sender which t
698705
<details>
699706
<summary><code>when_all_with_variant(<i>sender</i>...) -> <i>sender</i></code></summary>
700707
</details>
708+
<details>
709+
<summary><code>write_env(<i>sender</i>, <i>env</i>) -> <i>sender</i></code></summary>
710+
</details>
711+
701712

702713
### Sender Consumers
703714

docs/tutorial.mds

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ pipe notation.
469469

470470
int main() {
471471
int f{3};
472-
try { *tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); }
472+
try { tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); }
473473
catch (int e) {
474474
std::cout << "e=" << e << "\n";
475475
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// include/beman/execution/detail/affine_on.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON
5+
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON
6+
7+
#include <beman/execution/detail/env.hpp>
8+
#include <beman/execution/detail/forward_like.hpp>
9+
#include <beman/execution/detail/fwd_env.hpp>
10+
#include <beman/execution/detail/get_domain_early.hpp>
11+
#include <beman/execution/detail/get_scheduler.hpp>
12+
#include <beman/execution/detail/get_stop_token.hpp>
13+
#include <beman/execution/detail/join_env.hpp>
14+
#include <beman/execution/detail/make_sender.hpp>
15+
#include <beman/execution/detail/never_stop_token.hpp>
16+
#include <beman/execution/detail/prop.hpp>
17+
#include <beman/execution/detail/schedule_from.hpp>
18+
#include <beman/execution/detail/scheduler.hpp>
19+
#include <beman/execution/detail/sender.hpp>
20+
#include <beman/execution/detail/sender_adaptor.hpp>
21+
#include <beman/execution/detail/sender_adaptor_closure.hpp>
22+
#include <beman/execution/detail/sender_for.hpp>
23+
#include <beman/execution/detail/sender_has_affine_on.hpp>
24+
#include <beman/execution/detail/tag_of_t.hpp>
25+
#include <beman/execution/detail/transform_sender.hpp>
26+
#include <beman/execution/detail/write_env.hpp>
27+
28+
#include <concepts>
29+
#include <type_traits>
30+
31+
// ----------------------------------------------------------------------------
32+
33+
namespace beman::execution::detail {
34+
35+
/**
36+
* @brief The affine_on_t struct is a sender adaptor closure that transforms a sender
37+
* to complete on the scheduler obtained from the receiver's environment.
38+
*
39+
* This adaptor implements scheduler affinity to adapt a sender to complete on the
40+
* scheduler obtained the receiver's environment. The get_scheduler query is used
41+
* to obtain the scheduler on which the sender gets started.
42+
*/
43+
struct affine_on_t : ::beman::execution::sender_adaptor_closure<affine_on_t> {
44+
/**
45+
* @brief Adapt a sender with affine_on.
46+
*
47+
* @tparam Sender The deduced type of the sender to be transformed.
48+
* @param sender The sender to be transformed.
49+
* @return An adapted sender to complete on the scheduler it was started on.
50+
*/
51+
template <::beman::execution::sender Sender>
52+
auto operator()(Sender&& sender) const {
53+
return ::beman::execution::detail::transform_sender(
54+
::beman::execution::detail::get_domain_early(sender),
55+
::beman::execution::detail::make_sender(
56+
*this, ::beman::execution::env<>{}, ::std::forward<Sender>(sender)));
57+
}
58+
59+
/**
60+
* @brief Overload for creating a sender adaptor from affine_on.
61+
*
62+
* @return A sender adaptor for the affine_on_t.
63+
*/
64+
auto operator()() const { return ::beman::execution::detail::sender_adaptor{*this}; }
65+
66+
/**
67+
* @brief affine_on is implemented by transforming it into a use of schedule_from.
68+
*
69+
* The constraints ensure that the environment provides a scheduler which is
70+
* infallible and, thus, can be used to guarantee completion on the correct
71+
* scheduler.
72+
*
73+
* The implementation first tries to see if the child sender's tag has a custom
74+
* affine_on implementation. If it does, that is used. Otherwise, the default
75+
* implementation gets a scheduler from the environment and uses schedule_from
76+
* to adapt the sender to complete on that scheduler.
77+
*
78+
* @tparam Sender The type of the sender to be transformed.
79+
* @tparam Env The type of the environment providing the scheduler.
80+
* @param sender The sender to be transformed.
81+
* @param env The environment providing the scheduler.
82+
* @return A transformed sender that is affined to the scheduler.
83+
*/
84+
template <::beman::execution::sender Sender, typename Env>
85+
requires ::beman::execution::detail::sender_for<Sender, affine_on_t> && requires(const Env& env) {
86+
{ ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler;
87+
{ ::beman::execution::schedule(::beman::execution::get_scheduler(env)) } -> ::beman::execution::sender;
88+
{
89+
::beman::execution::get_completion_signatures(
90+
::beman::execution::schedule(::beman::execution::get_scheduler(env)),
91+
::beman::execution::detail::join_env(
92+
::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token,
93+
::beman::execution::never_stop_token{}}},
94+
env))
95+
} -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>;
96+
}
97+
static auto transform_sender(Sender&& sender, const Env& env) {
98+
[[maybe_unused]] auto& [tag, data, child] = sender;
99+
using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t<decltype(child)>>;
100+
101+
#if 0
102+
if constexpr (requires(const child_tag_t& t) {
103+
{
104+
t.affine_on(::beman::execution::detail::forward_like<Sender>(child), env)
105+
} -> ::beman::execution::sender;
106+
})
107+
#else
108+
if constexpr (::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>)
109+
#endif
110+
{
111+
return child_tag_t{}.affine_on(::beman::execution::detail::forward_like<Sender>(child), env);
112+
} else {
113+
return ::beman::execution::write_env(
114+
::beman::execution::schedule_from(
115+
::beman::execution::get_scheduler(env),
116+
::beman::execution::write_env(::beman::execution::detail::forward_like<Sender>(child), env)),
117+
::beman::execution::detail::join_env(
118+
::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token,
119+
::beman::execution::never_stop_token{}}},
120+
env));
121+
}
122+
}
123+
};
124+
125+
} // namespace beman::execution::detail
126+
127+
namespace beman::execution {
128+
/**
129+
* @brief affine_on is a CPO, used to adapt a sender to complete on the scheduler
130+
* it got started on which is derived from get_scheduler on the receiver's environment.
131+
*/
132+
using beman::execution::detail::affine_on_t;
133+
inline constexpr affine_on_t affine_on{};
134+
} // namespace beman::execution
135+
136+
// ----------------------------------------------------------------------------
137+
138+
#endif

include/beman/execution/detail/just.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ struct just_t {
3434
return ::beman::execution::detail::make_sender(
3535
*this, ::beman::execution::detail::product_type{::std::forward<T>(arg)...});
3636
}
37+
template <::beman::execution::sender Sender>
38+
static auto affine_on(Sender&& sndr, const auto&) noexcept {
39+
return ::std::forward<Sender>(sndr);
40+
}
3741
};
3842

3943
template <typename Completion, typename... T, typename Env>

include/beman/execution/detail/let.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,12 @@ struct impls_for<::beman::execution::detail::let_t<Completion>> : ::beman::execu
162162
{}};
163163
}};
164164
template <typename Receiver, typename... Args>
165-
static auto let_bind(auto& state, Receiver& receiver, Args&&... args) {
165+
static auto
166+
let_bind(auto& state, Receiver& receiver, Args&&... args) noexcept(noexcept(::beman::execution::connect(
167+
::std::apply(::std::move(state.fun),
168+
::std::move(state.args.template emplace<::beman::execution::detail::decayed_tuple<Args...>>(
169+
::std::forward<Args>(args)...))),
170+
let_receiver<Receiver, decltype(state.env)>{receiver, state.env}))) {
166171
using args_t = ::beman::execution::detail::decayed_tuple<Args...>;
167172
auto mkop{[&] {
168173
return ::beman::execution::connect(
@@ -179,7 +184,8 @@ struct impls_for<::beman::execution::detail::let_t<Completion>> : ::beman::execu
179184
try {
180185
let_bind(state, receiver, ::std::forward<Args>(args)...);
181186
} catch (...) {
182-
::beman::execution::set_error(::std::move(receiver), ::std::current_exception());
187+
if constexpr (not noexcept(let_bind(state, receiver, ::std::forward<Args>(args)...)))
188+
::beman::execution::set_error(::std::move(receiver), ::std::current_exception());
183189
}
184190
} else {
185191
Tag()(::std::move(receiver), ::std::forward<Args>(args)...);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// include/beman/execution/detail/nested_sender_has_affine_on.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON
5+
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON
6+
7+
#include <beman/execution/detail/sender_has_affine_on.hpp>
8+
9+
// ----------------------------------------------------------------------------
10+
11+
namespace beman::execution::detail {
12+
template <typename Sender, typename Env>
13+
concept nested_sender_has_affine_on = requires(Sender&& sndr, const Env& env) {
14+
{ sndr.template get<2>() } -> ::beman::execution::detail::sender_has_affine_on<Env>;
15+
};
16+
} // namespace beman::execution::detail
17+
18+
// ----------------------------------------------------------------------------
19+
20+
#endif

include/beman/execution/detail/read_env.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
namespace beman::execution::detail {
2222
struct read_env_t {
2323
auto operator()(auto&& query) const { return ::beman::execution::detail::make_sender(*this, query); }
24+
template <::beman::execution::sender Sender>
25+
static auto affine_on(Sender&& sndr, const auto&) noexcept {
26+
return ::std::forward<Sender>(sndr);
27+
}
2428
};
2529

2630
template <>

include/beman/execution/detail/run_loop.hpp

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
#include <beman/execution/detail/operation_state.hpp>
1313
#include <beman/execution/detail/scheduler.hpp>
1414
#include <beman/execution/detail/sender.hpp>
15-
#include <beman/execution/detail/set_error.hpp>
1615
#include <beman/execution/detail/set_stopped.hpp>
1716
#include <beman/execution/detail/set_value.hpp>
17+
#include <beman/execution/detail/unstoppable_token.hpp>
1818

1919
#include <exception>
2020
#include <condition_variable>
@@ -53,27 +53,29 @@ class run_loop {
5353
// NOLINTBEGIN(misc-no-recursion)
5454
template <typename R>
5555
opstate(run_loop* l, R&& rcvr) : loop(l), receiver(::std::forward<Receiver>(rcvr)) {}
56-
auto start() & noexcept -> void {
57-
try {
58-
this->loop->push_back(this);
59-
} catch (...) {
60-
::beman::execution::set_error(::std::move(this->receiver), ::std::current_exception());
61-
}
62-
}
56+
auto start() & noexcept -> void { this->loop->push_back(this); }
6357
// NOLINTEND(misc-no-recursion)
6458
auto execute() noexcept -> void override {
65-
if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested())
66-
::beman::execution::set_stopped(::std::move(this->receiver));
67-
else
59+
using token = decltype(::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)));
60+
if constexpr (not ::beman::execution::unstoppable_token<token>) {
61+
if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested())
62+
::beman::execution::set_stopped(::std::move(this->receiver));
63+
else
64+
::beman::execution::set_value(::std::move(this->receiver));
65+
} else
6866
::beman::execution::set_value(::std::move(this->receiver));
6967
}
7068
};
7169
struct sender {
7270
using sender_concept = ::beman::execution::sender_t;
73-
using completion_signatures =
74-
::beman::execution::completion_signatures<::beman::execution::set_value_t(),
75-
::beman::execution::set_error_t(::std::exception_ptr),
76-
::beman::execution::set_stopped_t()>;
71+
template <typename Env = ::beman::execution::env<>>
72+
auto get_completion_signatures(Env&& env) const noexcept {
73+
if constexpr (::beman::execution::unstoppable_token<decltype(::beman::execution::get_stop_token(env))>)
74+
return ::beman::execution::completion_signatures<::beman::execution::set_value_t()>{};
75+
else
76+
return ::beman::execution::completion_signatures<::beman::execution::set_value_t(),
77+
::beman::execution::set_stopped_t()>{};
78+
}
7779

7880
run_loop* loop;
7981

@@ -100,7 +102,8 @@ class run_loop {
100102
opstate_base* front{};
101103
opstate_base* back{};
102104

103-
auto push_back(opstate_base* item) -> void {
105+
auto push_back(opstate_base* item) noexcept -> void {
106+
//-dk:TODO run_loop::push_back should really be lock-free
104107
::std::lock_guard guard(this->mutex);
105108
if (auto previous_back{::std::exchange(this->back, item)}) {
106109
previous_back->next = item;
@@ -109,7 +112,8 @@ class run_loop {
109112
this->condition.notify_one();
110113
}
111114
}
112-
auto pop_front() -> opstate_base* {
115+
auto pop_front() noexcept -> opstate_base* {
116+
//-dk:TODO run_loop::pop_front should really be lock-free
113117
::std::unique_lock guard(this->mutex);
114118
this->condition.wait(guard, [this] { return this->front || this->current_state == state::finishing; });
115119
if (this->front == this->back)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// include/beman/execution/detail/sender_has_affine_on.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON
5+
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON
6+
7+
#include <beman/execution/detail/sender.hpp>
8+
#include <utility>
9+
#include <type_traits>
10+
11+
// ----------------------------------------------------------------------------
12+
13+
namespace beman::execution::detail {
14+
template <typename Sender, typename Env>
15+
concept sender_has_affine_on =
16+
beman::execution::sender<::std::remove_cvref_t<Sender>> && requires(Sender&& sndr, const Env& env) {
17+
sndr.template get<0>();
18+
{ sndr.template get<0>().affine_on(std::forward<Sender>(sndr), env) } -> ::beman::execution::sender;
19+
};
20+
} // namespace beman::execution::detail
21+
22+
// ----------------------------------------------------------------------------
23+
24+
#endif

include/beman/execution/detail/then.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <beman/execution/detail/meta_combine.hpp>
1717
#include <beman/execution/detail/meta_unique.hpp>
1818
#include <beman/execution/detail/movable_value.hpp>
19+
#include <beman/execution/detail/nested_sender_has_affine_on.hpp>
1920
#include <beman/execution/detail/sender.hpp>
2021
#include <beman/execution/detail/sender_adaptor.hpp>
2122
#include <beman/execution/detail/sender_adaptor_closure.hpp>
@@ -46,6 +47,11 @@ struct then_t : ::beman::execution::sender_adaptor_closure<then_t<Completion>> {
4647
domain,
4748
::beman::execution::detail::make_sender(*this, ::std::forward<Fun>(fun), ::std::forward<Sender>(sender)));
4849
}
50+
template <::beman::execution::sender Sender, typename Env>
51+
requires ::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>
52+
static auto affine_on(Sender&& sndr, const Env&) noexcept {
53+
return ::std::forward<Sender>(sndr);
54+
}
4955
};
5056

5157
template <typename Completion>

0 commit comments

Comments
 (0)