Skip to content

Commit a3f3195

Browse files
committed
Add stdexec::__stop_when
This diff defines `stdexec::__stop_when` as the implementation of _`stop-when`_ and adds tests to validate the algorithm.
1 parent 5a37d11 commit a3f3195

File tree

3 files changed

+297
-0
lines changed

3 files changed

+297
-0
lines changed
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright (c) 2025 Ian Petersen
3+
* Copyright (c) 2025 NVIDIA Corporation
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include "__execution_fwd.hpp"
20+
21+
#include "__basic_sender.hpp"
22+
23+
namespace stdexec {
24+
/////////////////////////////////////////////////////////////////////////////
25+
// [exec.stop.when]
26+
namespace __stop_when_ {
27+
28+
////////////////////////////////////////////////////////////////////////////////////////////////
29+
struct __stop_when_t {
30+
template <sender _Sender, unstoppable_token _Token>
31+
constexpr _Sender&& operator()(_Sender&& __sndr, _Token&&) const noexcept {
32+
return static_cast<_Sender&&>(__sndr);
33+
}
34+
35+
template <sender _Sender, stoppable_token _Token>
36+
constexpr auto operator()(_Sender&& __sndr, _Token&& __token) const noexcept(
37+
__nothrow_constructible_from<std::remove_cvref_t<_Sender>, _Sender>
38+
&& __nothrow_constructible_from<std::remove_cvref_t<_Token>, _Token>) {
39+
return __make_sexpr<__stop_when_t>(
40+
static_cast<_Token&&>(__token), static_cast<_Sender&&>(__sndr));
41+
}
42+
};
43+
44+
struct __stop_when_impl : __sexpr_defaults {
45+
static constexpr auto get_completion_signatures =
46+
[]<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept
47+
-> __completion_signatures_of_t<__child_of<_Sender>, _Env...> {
48+
static_assert(sender_expr_for<_Sender, __stop_when_t>);
49+
return {};
50+
};
51+
52+
static constexpr auto get_env =
53+
[](__ignore, const auto& __state, const auto& __rcvr) noexcept {
54+
return __env::__join(prop(get_stop_token, __state), stdexec::get_env(__rcvr));
55+
};
56+
57+
template <stoppable_token _Token1, stoppable_token _Token2>
58+
struct __fused_token {
59+
_Token1 __tkn1_;
60+
_Token2 __tkn2_;
61+
62+
friend constexpr bool operator==(const __fused_token&, const __fused_token&) = default;
63+
64+
bool stop_requested() const noexcept {
65+
return __tkn1_.stop_requested() || __tkn2_.stop_requested();
66+
}
67+
68+
bool stop_possible() const noexcept {
69+
return __tkn1_.stop_possible() || __tkn2_.stop_possible();
70+
}
71+
72+
template <class _Fn>
73+
struct callback_type {
74+
struct __cb {
75+
callback_type* self;
76+
77+
void operator()() noexcept {
78+
(*self)();
79+
}
80+
};
81+
82+
using __cb1_t = _Token1::template callback_type<__cb>;
83+
using __cb2_t = _Token2::template callback_type<__cb>;
84+
85+
_Fn __fn_;
86+
[[no_unique_address]]
87+
std::atomic<bool> __called_{false};
88+
__cb1_t __cb1_;
89+
__cb2_t __cb2_;
90+
91+
template <class _C>
92+
requires constructible_from<_Fn, _C>
93+
explicit callback_type(__fused_token&& __ftkn, _C&& __fn)
94+
noexcept(__nothrow_constructible_from<_Fn, _C>)
95+
: __fn_(static_cast<_C&&>(__fn))
96+
, __cb1_(std::move(__ftkn.__tkn1_), __cb(this))
97+
, __cb2_(std::move(__ftkn.__tkn2_), __cb(this)) {
98+
}
99+
100+
template <class _C>
101+
requires constructible_from<_Fn, _C>
102+
explicit callback_type(const __fused_token& __ftkn, _C&& __fn)
103+
noexcept(__nothrow_constructible_from<_Fn, _C>)
104+
: __fn_(static_cast<_C&&>(__fn))
105+
, __cb1_(__ftkn.__tkn1_, __cb(this))
106+
, __cb2_(__ftkn.__tkn2_, __cb(this)) {
107+
}
108+
109+
callback_type(callback_type&&) = delete;
110+
111+
private:
112+
void operator()() noexcept {
113+
if (!__called_.exchange(true, std::memory_order_relaxed)) {
114+
__fn_();
115+
}
116+
}
117+
};
118+
};
119+
120+
struct __make_token_fn {
121+
template <stoppable_token _SenderToken, unstoppable_token _ReceiverToken>
122+
std::remove_cvref_t<_SenderToken>
123+
operator()(_SenderToken&& __sndrToken, _ReceiverToken&&) noexcept {
124+
// the stoppable_token concept requires that stop tokens be no-throw copyable...
125+
static_assert(noexcept(
126+
__nothrow_constructible_from<std::remove_cvref_t<_SenderToken>, _SenderToken>));
127+
128+
// when the receiver's stop token is unstoppable, the net token is just
129+
// the sender's captured token
130+
return static_cast<_SenderToken&&>(__sndrToken);
131+
}
132+
133+
template <stoppable_token _SenderToken, stoppable_token _ReceiverToken>
134+
__fused_token<std::remove_cvref_t<_SenderToken>, std::remove_cvref_t<_ReceiverToken>>
135+
operator()(_SenderToken&& __sndrToken, _ReceiverToken&& __rcvrToken) noexcept {
136+
// the stoppable_token concept requires that stop tokens be no-throw copyable...
137+
static_assert(noexcept(
138+
__nothrow_constructible_from<std::remove_cvref_t<_SenderToken>, _SenderToken>
139+
&& __nothrow_constructible_from<std::remove_cvref_t<_ReceiverToken>, _ReceiverToken>));
140+
141+
// when the receiver's stop token is stoppable, the net token must be
142+
// a fused token that responds to signals from both the sender's captured
143+
// token and the receiver's token
144+
return {
145+
static_cast<_SenderToken&&>(__sndrToken), static_cast<_ReceiverToken&&>(__rcvrToken)};
146+
}
147+
};
148+
149+
static constexpr auto get_state =
150+
[]<class _Self, class _Receiver>(_Self&& __self, _Receiver& __rcvr) noexcept {
151+
return __sexpr_apply(
152+
static_cast<_Self&&>(__self), [&__rcvr](__ignore, auto&& token, __ignore) noexcept {
153+
return __make_token_fn{}(
154+
static_cast<decltype(token)>(token), get_stop_token(stdexec::get_env(__rcvr)));
155+
});
156+
};
157+
};
158+
} // namespace __stop_when_
159+
160+
using __stop_when_::__stop_when_t;
161+
162+
/// @brief The stop-when sender adaptor, which fuses an additional stop token
163+
/// into its child sender such that the sender responds to stop
164+
/// requests from both the given stop token and the receiver's token
165+
/// @hideinitializer
166+
inline constexpr __stop_when_t __stop_when{};
167+
168+
template <>
169+
struct __sexpr_impl<__stop_when_t> : __stop_when_::__stop_when_impl { };
170+
} // namespace stdexec

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ set(stdexec_test_sources
5959
stdexec/algos/adaptors/test_stopped_as_error.cpp
6060
stdexec/algos/adaptors/test_ensure_started.cpp
6161
stdexec/algos/adaptors/test_write_env.cpp
62+
stdexec/algos/adaptors/test_stop_when.cpp
6263
stdexec/algos/consumers/test_start_detached.cpp
6364
stdexec/algos/consumers/test_sync_wait.cpp
6465
stdexec/algos/other/test_execute.cpp
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (c) 2025 Ian Petersen
3+
* Copyright (c) 2025 NVIDIA Corporation
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <catch2/catch.hpp>
19+
#include <stdexec/execution.hpp>
20+
#include <stdexec/__detail/__stop_when.hpp>
21+
#include <test_common/receivers.hpp>
22+
23+
namespace ex = stdexec;
24+
25+
namespace {
26+
TEST_CASE("stop-when of an unstoppable token is the identity", "[adaptors][stop-when]") {
27+
auto snd = ex::just(42);
28+
auto checkIdentity = [](auto&& snd) {
29+
auto&& result = ex::__stop_when(std::forward<decltype(snd)>(snd), ex::never_stop_token{});
30+
31+
REQUIRE(&snd == &result);
32+
};
33+
34+
checkIdentity(snd);
35+
checkIdentity(std::as_const(snd));
36+
checkIdentity(std::move(snd));
37+
}
38+
39+
TEST_CASE("stop-when(just(), token) returns a sender", "[adaptors][stop-when]") {
40+
ex::inplace_stop_source source;
41+
auto snd = ex::__stop_when(ex::just(), source.get_token());
42+
STATIC_REQUIRE(ex::sender<decltype(snd)>);
43+
wait_for_value(snd);
44+
}
45+
46+
auto isTokenStoppable() {
47+
return ex::read_env(ex::get_stop_token)
48+
| ex::then([](auto token) noexcept { return token.stop_possible(); });
49+
}
50+
51+
TEST_CASE(
52+
"stop-when substitutes its token when the receiver's token is unstoppable",
53+
"[adaptors][stop-when]") {
54+
55+
// check that, by default, wait_for_value provides an unstoppable stop token
56+
wait_for_value(isTokenStoppable(), false);
57+
58+
// now, check that stop-when mixes in a stoppable token
59+
60+
ex::inplace_stop_source source;
61+
62+
REQUIRE(source.get_token().stop_possible());
63+
64+
wait_for_value(ex::__stop_when(isTokenStoppable(), source.get_token()), true);
65+
}
66+
67+
TEST_CASE(
68+
"stop-when fuses its token with the receiver's when both are stoppable",
69+
"[adaptors][stop-when]") {
70+
ex::inplace_stop_source source;
71+
wait_for_value(
72+
ex::__stop_when(ex::__stop_when(isTokenStoppable(), source.get_token()), source.get_token()),
73+
true);
74+
}
75+
76+
template <std::invocable Fn>
77+
ex::sender auto make_stop_callback(Fn&& fn) noexcept {
78+
return ex::read_env(ex::get_stop_token)
79+
| ex::then([fn = std::forward<Fn>(fn)](auto token) mutable noexcept {
80+
using cb_t = decltype(token)::template callback_type<std::remove_cvref_t<Fn>>;
81+
return std::optional<cb_t>(std::in_place, std::move(token), std::move(fn));
82+
});
83+
}
84+
85+
TEST_CASE("callbacks registered with stop-when's token can be invoked", "[adaptors][stop-when]") {
86+
int invokeCount = 0;
87+
auto snd = make_stop_callback([&invokeCount]() noexcept { invokeCount++; });
88+
89+
{
90+
ex::inplace_stop_source source;
91+
wait_for_value(
92+
snd | ex::then([&](auto&& optCallback) noexcept {
93+
source.request_stop();
94+
optCallback.reset();
95+
return invokeCount;
96+
}),
97+
0);
98+
}
99+
100+
{
101+
ex::inplace_stop_source source;
102+
103+
wait_for_value(
104+
ex::__stop_when(snd, source.get_token()) | ex::then([&](auto&& optCallback) noexcept {
105+
source.request_stop();
106+
optCallback.reset();
107+
return invokeCount;
108+
}),
109+
1);
110+
}
111+
112+
{
113+
ex::inplace_stop_source source1;
114+
ex::inplace_stop_source source2;
115+
116+
wait_for_value(
117+
ex::__stop_when(snd, source2.get_token()) | ex::then([&](auto&& optCallback) noexcept {
118+
source1.request_stop();
119+
source2.request_stop();
120+
optCallback.reset();
121+
return invokeCount;
122+
}) | ex::write_env(ex::prop(ex::get_stop_token, source1.get_token())),
123+
2);
124+
}
125+
}
126+
} // namespace

0 commit comments

Comments
 (0)