Skip to content

Commit 96d4661

Browse files
committed
Add stdexec::associate
This diff defines `stdexec::associate` and adds some initial tests to confirm it works properly. Still a work in progress.
1 parent 91c4158 commit 96d4661

File tree

5 files changed

+521
-1
lines changed

5 files changed

+521
-1
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Copyright (c) 2025 Ian Petersen
3+
* Copyright (c) 2025 NVIDIA Corporation
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include "__execution_fwd.hpp"
20+
21+
#include "__basic_sender.hpp"
22+
#include "__concepts.hpp"
23+
#include "__diagnostics.hpp"
24+
#include "__queries.hpp"
25+
#include "__scope_concepts.hpp"
26+
#include "__senders.hpp"
27+
#include "__sender_adaptor_closure.hpp"
28+
29+
namespace stdexec {
30+
/////////////////////////////////////////////////////////////////////////////
31+
// [exec.associate]
32+
namespace __associate {
33+
template <scope_token _Token, sender _Sender>
34+
struct __associate_data {
35+
using __wrap_result_t = decltype(__declval<_Token&>().wrap(__declval<_Sender>()));
36+
using __wrap_sender_t = std::remove_cvref_t<__wrap_result_t>;
37+
38+
using __assoc_t = decltype(__declval<_Token&>().try_associate());
39+
40+
using __sender_ref =
41+
std::unique_ptr<__wrap_sender_t, decltype([](auto* p) noexcept { std::destroy_at(p); })>;
42+
43+
// BUGBUG: should the spec require __token to be declared as a const _Token, or should this be
44+
// changed to declare __token as a mutable _Token?
45+
explicit __associate_data(const _Token __token, _Sender&& __sndr) noexcept(
46+
__nothrow_constructible_from<__wrap_sender_t, __wrap_result_t>
47+
&& noexcept(__token.wrap(static_cast<_Sender&&>(__sndr)))
48+
&& noexcept(__token.try_associate()))
49+
: __sndr_(__token.wrap(static_cast<_Sender&&>(__sndr)))
50+
, __assoc_([&] {
51+
__sender_ref guard{std::addressof(__sndr_)};
52+
53+
auto assoc = __token.try_associate();
54+
55+
if (assoc) {
56+
(void) guard.release();
57+
}
58+
59+
return assoc;
60+
}()) {
61+
}
62+
63+
__associate_data(const __associate_data& __other) noexcept(
64+
__nothrow_copy_constructible<__wrap_sender_t> && noexcept(__other.__assoc_.try_associate()))
65+
requires copy_constructible<__wrap_sender_t>
66+
: __assoc_(__other.__assoc_.try_associate()) {
67+
if (__assoc_) {
68+
std::construct_at(&__sndr_, __other.__sndr_);
69+
}
70+
}
71+
72+
__associate_data(__associate_data&& __other)
73+
noexcept(__nothrow_move_constructible<__wrap_sender_t>)
74+
: __associate_data(std::move(__other).release()) {
75+
}
76+
77+
~__associate_data() {
78+
if (__assoc_) {
79+
std::destroy_at(&__sndr_);
80+
}
81+
}
82+
83+
std::pair<__assoc_t, __sender_ref> release() && noexcept {
84+
__sender_ref u(__assoc_ ? std::addressof(__sndr_) : nullptr);
85+
return {std::move(__assoc_), std::move(u)};
86+
}
87+
88+
private:
89+
__associate_data(std::pair<__assoc_t, __sender_ref> __parts)
90+
: __assoc_(std::move(__parts.first)) {
91+
if (__assoc_) {
92+
std::construct_at(&__sndr_, std::move(*__parts.second));
93+
}
94+
}
95+
96+
union {
97+
__wrap_sender_t __sndr_;
98+
};
99+
__assoc_t __assoc_;
100+
};
101+
102+
template <scope_token _Token, sender _Sender>
103+
__associate_data(_Token, _Sender&&) -> __associate_data<_Token, _Sender>;
104+
105+
////////////////////////////////////////////////////////////////////////////////////////////////
106+
struct associate_t {
107+
template <sender _Sender, scope_token _Token>
108+
auto operator()(_Sender&& __sndr, _Token&& __token) const
109+
noexcept(__nothrow_constructible_from<
110+
__associate_data<std::remove_cvref_t<_Token>, _Sender>,
111+
_Token,
112+
_Sender
113+
>) -> __well_formed_sender auto {
114+
return __make_sexpr<associate_t>(
115+
__associate_data(static_cast<_Token&&>(__token), static_cast<_Sender&&>(__sndr)));
116+
}
117+
118+
template <scope_token _Token>
119+
STDEXEC_ATTRIBUTE(always_inline)
120+
auto operator()(_Token&& __token) const noexcept {
121+
return __closure(*this, static_cast<_Token&&>(__token));
122+
}
123+
};
124+
125+
template <class _Sender, class _Receiver>
126+
struct op_state {
127+
using associate_data_t = std::remove_cvref_t<__data_of<_Sender>>;
128+
using assoc_t = associate_data_t::__assoc_t;
129+
using sender_ref_t = associate_data_t::__sender_ref;
130+
131+
using op_t = connect_result_t<typename sender_ref_t::element_type, _Receiver>;
132+
133+
assoc_t __assoc_;
134+
union {
135+
_Receiver* __rcvr_;
136+
op_t __op_;
137+
};
138+
139+
explicit op_state(std::pair<assoc_t, sender_ref_t> parts, _Receiver& r)
140+
: __assoc_(std::move(parts.first)) {
141+
if (__assoc_) {
142+
::new ((void*) std::addressof(__op_))
143+
op_t(connect(std::move(*parts.second), std::move(r)));
144+
} else {
145+
__rcvr_ = std::addressof(r);
146+
}
147+
}
148+
149+
explicit op_state(associate_data_t&& ad, _Receiver& r)
150+
: op_state(std::move(ad).release(), r) {
151+
}
152+
153+
explicit op_state(const associate_data_t& ad, _Receiver& r)
154+
requires copy_constructible<associate_data_t>
155+
: op_state(associate_data_t(ad).release(), r) {
156+
}
157+
158+
~op_state() {
159+
if (__assoc_) {
160+
std::destroy_at(&__op_);
161+
}
162+
}
163+
164+
void __run() noexcept {
165+
if (__assoc_) {
166+
stdexec::start(__op_);
167+
} else {
168+
stdexec::set_stopped(std::move(*__rcvr_));
169+
}
170+
}
171+
};
172+
173+
struct __associate_impl : __sexpr_defaults {
174+
static constexpr auto get_attrs = []<class _Child>(__ignore, const _Child& __child) noexcept {
175+
return __sync_attrs{__child};
176+
};
177+
178+
template <class _Sender>
179+
using __wrap_sender_of_t =
180+
__copy_cvref_t<_Sender, typename __data_of<std::remove_cvref_t<_Sender>>::__wrap_sender_t>;
181+
182+
static constexpr auto get_completion_signatures =
183+
[]<class _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept
184+
-> transform_completion_signatures<
185+
__completion_signatures_of_t<__wrap_sender_of_t<_Sender>>,
186+
completion_signatures<set_stopped_t()>
187+
> {
188+
static_assert(sender_expr_for<_Sender, associate_t>);
189+
return {};
190+
};
191+
192+
static constexpr auto get_state =
193+
[]<class _Self, class _Receiver>(_Self&& __self, _Receiver& __rcvr) noexcept(
194+
(same_as<_Self, std::remove_cvref_t<_Self>>
195+
|| __nothrow_constructible_from<std::remove_cvref_t<_Self>, _Self>) &&
196+
__nothrow_callable<
197+
connect_t,
198+
typename std::remove_cvref_t<__data_of<_Self>>::__wrap_sender_t,
199+
_Receiver
200+
>) {
201+
auto&& [_, data] = std::forward<_Self>(__self);
202+
203+
using op_state_t = op_state<std::remove_cvref_t<_Self>, _Receiver>;
204+
return op_state_t{__forward_like<_Self>(data), __rcvr};
205+
};
206+
207+
static constexpr auto start = [](auto& __state, auto&) noexcept -> void {
208+
__state.__run();
209+
};
210+
};
211+
} // namespace __associate
212+
213+
using __associate::associate_t;
214+
215+
/// @brief The associate sender adaptor, which associates a sender with the
216+
/// async scope referred to by the given token
217+
/// @hideinitializer
218+
inline constexpr associate_t associate{};
219+
220+
template <>
221+
struct __sexpr_impl<associate_t> : __associate::__associate_impl { };
222+
} // namespace stdexec

include/stdexec/__detail/__sender_adaptor_closure.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ namespace stdexec {
6969

7070
template <sender _Sender, __sender_adaptor_closure_for<_Sender> _Closure>
7171
STDEXEC_ATTRIBUTE(always_inline)
72-
auto operator|(_Sender&& __sndr, _Closure&& __clsur) -> __call_result_t<_Closure, _Sender> {
72+
auto operator|(_Sender&& __sndr, _Closure&& __clsur)
73+
noexcept(__nothrow_callable<_Closure, _Sender>) -> __call_result_t<_Closure, _Sender> {
7374
return static_cast<_Closure&&>(__clsur)(static_cast<_Sender&&>(__sndr));
7475
}
7576

include/stdexec/execution.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
// include these after __execution_fwd.hpp
2121
#include "__detail/__as_awaitable.hpp" // IWYU pragma: export
22+
#include "__detail/__associate.hpp" // IWYU pragma: export
2223
#include "__detail/__basic_sender.hpp" // IWYU pragma: export
2324
#include "__detail/__bulk.hpp" // IWYU pragma: export
2425
#include "__detail/__completion_signatures.hpp" // IWYU pragma: export

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ set(stdexec_test_sources
3838
stdexec/algos/factories/test_just_stopped.cpp
3939
stdexec/algos/factories/test_read.cpp
4040
stdexec/algos/factories/test_schedule.cpp
41+
stdexec/algos/adaptors/test_associate.cpp
4142
stdexec/algos/adaptors/test_starts_on.cpp
4243
stdexec/algos/adaptors/test_on.cpp
4344
stdexec/algos/adaptors/test_on2.cpp

0 commit comments

Comments
 (0)