Skip to content

Commit 263c854

Browse files
committed
Improve handling of type-erased receiver environment.
Instead of having a dedicated parameter of type `env`, the receiver environment is now exposed thorough `receiver`. We don't need to store any extra data in the operation state, and the receiver can be queried at any time for receiver environment properties.
1 parent d174c7e commit 263c854

File tree

5 files changed

+120
-152
lines changed

5 files changed

+120
-152
lines changed

include/exec/__detail/__system_context_default_impl.hpp

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ namespace exec::__system_context_default_impl {
3030
using system_context_replaceability::receiver;
3131
using system_context_replaceability::bulk_item_receiver;
3232
using system_context_replaceability::storage;
33-
using system_context_replaceability::env;
3433
using system_context_replaceability::system_scheduler;
3534
using system_context_replaceability::__system_context_backend_factory;
3635

@@ -45,20 +44,18 @@ namespace exec::__system_context_default_impl {
4544
- __recv::__r_ (receiver*) -- 8
4645
- __recv::__op_ (__operation*) -- 8
4746
- __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 56 (when connected with an empty receiver)
48-
- __operation::__stop_token_ (inplace_stop_token) -- 8
4947
- __operation::__on_heap_ (bool) -- optimized away
5048
---------------------
51-
Total: 80; extra 24 bytes compared to internal operation state.
49+
Total: 72; extra 16 bytes compared to internal operation state.
5250
5351
extra for bulk:
5452
- __recv::__r_ (receiver*) -- 8
5553
- __recv::__op_ (__operation*) -- 8
5654
- __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 128 (when connected with an empty receiver & fun)
57-
- __operation::__stop_token_ (inplace_stop_token) -- 8
5855
- __operation::__on_heap_ (bool) -- optimized away
5956
- __bulk_functor::__r_ (bulk_item_receiver*) - 8
6057
---------------------
61-
Total: 160; extra 32 bytes compared to internal operation state.
58+
Total: 152; extra 24 bytes compared to internal operation state.
6259
6360
Using libdispatch backend, the operation sizes are 48 (down from 80) and 128 (down from 160).
6461
@@ -100,11 +97,9 @@ namespace exec::__system_context_default_impl {
10097
}
10198

10299
decltype(auto) get_env() const noexcept {
103-
return stdexec::prop{stdexec::get_stop_token, __get_stop_token()};
104-
}
105-
106-
stdexec::inplace_stop_token __get_stop_token() const noexcept {
107-
return __op_->__stop_token_;
100+
auto __o = __r_->try_query<stdexec::inplace_stop_token>();
101+
stdexec::inplace_stop_token __st = __o ? *__o : stdexec::inplace_stop_token{};
102+
return stdexec::prop{stdexec::get_stop_token, __st};
108103
}
109104
};
110105

@@ -127,22 +122,17 @@ namespace exec::__system_context_default_impl {
127122
struct __operation {
128123
/// The inner operation state, that results out of connecting the underlying sender with the receiver.
129124
stdexec::connect_result_t<_Sender, __recv<_Sender>> __inner_op_;
130-
/// The stop token sent to the operation throught dynamic environment.
131-
stdexec::inplace_stop_token __stop_token_;
132125
/// True if the operation is on the heap, false if it is in the preallocated space.
133126
bool __on_heap_;
134127

135128
/// Try to construct the operation in the preallocated memory if it fits, otherwise allocate a new operation.
136-
static __operation* __construct_maybe_alloc(
137-
storage __storage,
138-
receiver* __completion,
139-
_Sender __sndr,
140-
stdexec::inplace_stop_token __st) {
129+
static __operation*
130+
__construct_maybe_alloc(storage __storage, receiver* __completion, _Sender __sndr) {
141131
__storage = __ensure_alignment(__storage, alignof(__operation));
142132
if (__storage.__data == nullptr || __storage.__size < sizeof(__operation)) {
143-
return new __operation(std::move(__sndr), __completion, __st, true);
133+
return new __operation(std::move(__sndr), __completion, true);
144134
} else {
145-
return new (__storage.__data) __operation(std::move(__sndr), __completion, __st, false);
135+
return new (__storage.__data) __operation(std::move(__sndr), __completion, false);
146136
}
147137
}
148138

@@ -161,13 +151,8 @@ namespace exec::__system_context_default_impl {
161151
}
162152

163153
private:
164-
__operation(
165-
_Sender __sndr,
166-
receiver* __completion,
167-
stdexec::inplace_stop_token __st,
168-
bool __on_heap)
154+
__operation(_Sender __sndr, receiver* __completion, bool __on_heap)
169155
: __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__completion, this}))
170-
, __stop_token_(std::move(__st))
171156
, __on_heap_(__on_heap) {
172157
}
173158
};
@@ -202,29 +187,25 @@ namespace exec::__system_context_default_impl {
202187
std::declval<__bulk_functor>()))>;
203188

204189
public:
205-
void schedule(storage __storage, receiver* __r, env __e) noexcept override {
190+
void schedule(storage __storage, receiver* __r) noexcept override {
206191
try {
207-
auto __o = __e.template try_query<stdexec::inplace_stop_token>();
208-
auto __st = __o ? *__o : stdexec::inplace_stop_token{};
209192
auto __sndr = stdexec::schedule(__pool_scheduler_);
210-
auto __os = __schedule_operation_t::__construct_maybe_alloc(
211-
__storage, __r, std::move(__sndr), std::move(__st));
193+
auto __os =
194+
__schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
212195
__os->start();
213196
} catch (std::exception& __e) {
214197
__r->set_error(std::current_exception());
215198
}
216199
}
217200

218201
void
219-
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r, env __e) noexcept
202+
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept
220203
override {
221204
try {
222-
auto __o = __e.template try_query<stdexec::inplace_stop_token>();
223-
auto __st = __o ? *__o : stdexec::inplace_stop_token{};
224205
auto __sndr =
225206
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r});
226-
auto __os = __bulk_schedule_operation_t::__construct_maybe_alloc(
227-
__storage, __r, std::move(__sndr), std::move(__st));
207+
auto __os =
208+
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
228209
__os->start();
229210
} catch (std::exception& __e) {
230211
__r->set_error(std::current_exception());

include/exec/__detail/__system_context_replaceability_api.hpp

Lines changed: 21 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,29 @@ namespace exec::system_context_replaceability {
7878
struct receiver {
7979
virtual ~receiver() = default;
8080

81+
protected:
82+
virtual bool __query_env(__uuid, void*) noexcept = 0;
83+
84+
public:
8185
/// Called when the system scheduler completes successfully.
8286
virtual void set_value() noexcept = 0;
8387
/// Called when the system scheduler completes with an error.
8488
virtual void set_error(std::exception_ptr) noexcept = 0;
8589
/// Called when the system scheduler was stopped.
8690
virtual void set_stopped() noexcept = 0;
91+
92+
/// Query the receiver for a property of type `_P`.
93+
template <typename _P>
94+
std::optional<std::decay_t<_P>> try_query() noexcept {
95+
if constexpr (__runtime_property<_P>) {
96+
std::decay_t<_P> __p;
97+
bool __success =
98+
__query_env(__runtime_property_helper<std::decay_t<_P>>::__property_identifier, &__p);
99+
return __success ? std::make_optional(std::move(__p)) : std::nullopt;
100+
} else {
101+
return std::nullopt;
102+
}
103+
}
87104
};
88105

89106
/// Receiver for bulk sheduling operations.
@@ -99,82 +116,16 @@ namespace exec::system_context_replaceability {
99116
uint32_t __size;
100117
};
101118

102-
struct env {
103-
/// Query the system context for a property of type `_P`.
104-
template <typename _P>
105-
std::optional<_P> try_query() noexcept {
106-
if constexpr (__runtime_property<_P>) {
107-
const void* __r = __query_(__data_, __runtime_property_helper<_P>::__property_identifier);
108-
return __r ? std::make_optional(*static_cast<const _P*>(__r)) : std::nullopt;
109-
} else {
110-
return std::nullopt;
111-
}
112-
}
113-
114-
// IMPLEMENTATION DETAIL: used by the frontend with no proerties.
115-
explicit env() noexcept
116-
: __data_{nullptr}
117-
, __query_{&__query_none} {
118-
}
119-
120-
// IMPLEMENTATION DETAIL: used by the frontend with a single property.
121-
template <__runtime_property _P>
122-
explicit env(const _P& __p) noexcept
123-
: __data_{std::addressof(__p)}
124-
, __query_{&__query_single<_P>} {
125-
}
126-
127-
// IMPLEMENTATION DETAIL: used by the frontend with multiple properties.
128-
template <__runtime_property... _Ps>
129-
explicit env(const std::tuple<_Ps...>& __properties) noexcept
130-
: __data_{std::addressof(__properties)}
131-
, __query_{&__query_multi<_Ps...>} {
132-
}
133-
134-
135-
private:
136-
/// The source data containing all the properties.
137-
const void* __data_;
138-
/// Function called by the backend to query for a specific property.
139-
const void* (*__query_)(const void*, __uuid) noexcept;
140-
141-
static const void* __query_none(const void* __data, __uuid __id) noexcept {
142-
return nullptr;
143-
}
144-
145-
template <__runtime_property _P>
146-
static const void* __query_single(const void* __data, __uuid __id) noexcept {
147-
if (__id == __runtime_property_helper<_P>::__property_identifier)
148-
return __data;
149-
return nullptr;
150-
}
151-
152-
template <__runtime_property _P, typename _Tuple>
153-
static uintptr_t __select_property_as_int(const _Tuple& __tuple, __uuid __id) noexcept {
154-
if (__id == __runtime_property_helper<_P>::__property_identifier)
155-
return reinterpret_cast<uintptr_t>(std::addressof(std::get<_P>(__tuple)));
156-
return 0;
157-
}
158-
159-
template <__runtime_property... _Ps>
160-
static const void* __query_multi(const void* __data, __uuid __id) noexcept {
161-
const std::tuple<_Ps...>& __properties = *static_cast<const std::tuple<_Ps...>*>(__data);
162-
uintptr_t __result = (... + __select_property_as_int<_Ps>(__properties, __id));
163-
return reinterpret_cast<const void*>(__result);
164-
}
165-
};
166-
167119
/// Interface for the system scheduler
168120
struct system_scheduler {
169121
static constexpr __uuid __interface_identifier{0x5ee9202498c4bd4f, 0xa1df2508ffcd9d7e};
170122

171123
virtual ~system_scheduler() = default;
172124

173-
/// Schedule work on system scheduler, calling `__r` when done and using `__s` for preallocated memory, using `__e` for environment.
174-
virtual void schedule(storage __s, receiver* __r, env __e) noexcept = 0;
175-
/// Schedule bulk work of size `__n` on system scheduler, calling `__r` for each item and then when done, and using `__s` for preallocated memory, using `__e` for environment.
176-
virtual void
177-
bulk_schedule(uint32_t __n, storage __s, bulk_item_receiver* __r, env __e) noexcept = 0;
125+
/// Schedule work on system scheduler, calling `__r` when done and using `__s` for preallocated memory.
126+
virtual void schedule(storage __s, receiver* __r) noexcept = 0;
127+
/// Schedule bulk work of size `__n` on system scheduler, calling `__r` for each item and then when done, and using `__s` for preallocated memory.
128+
virtual void bulk_schedule(uint32_t __n, storage __s, bulk_item_receiver* __r) noexcept = 0;
178129
};
179130

180131
} // namespace exec::system_context_replaceability

include/exec/system_context.hpp

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@ namespace exec {
4444
: __rcvr_{std::forward<_Rcvr>(__rcvr)} {
4545
}
4646

47+
bool __query_env(__uuid __id, void* __dest) noexcept override {
48+
using system_context_replaceability::__runtime_property_helper;
49+
using __StopToken = decltype(stdexec::get_stop_token(stdexec::get_env(__rcvr_)));
50+
if constexpr (std::is_same_v<stdexec::inplace_stop_token, __StopToken>) {
51+
if (
52+
__id == __runtime_property_helper<stdexec::inplace_stop_token>::__property_identifier) {
53+
*static_cast<stdexec::inplace_stop_token*>(__dest) =
54+
stdexec::get_stop_token(stdexec::get_env(__rcvr_));
55+
return true;
56+
}
57+
}
58+
return false;
59+
}
60+
4761
void set_value() noexcept override {
4862
stdexec::set_value(std::forward<_Rcvr>(__rcvr_));
4963
}
@@ -174,14 +188,10 @@ namespace exec {
174188
stdexec::set_stopped(__rcvr_);
175189
return;
176190
}
177-
system_context_replaceability::env __e{};
178-
if constexpr (std::is_same_v<stdexec::inplace_stop_token, std::decay_t<decltype(st)>>) {
179-
__e = system_context_replaceability::env{st};
180-
}
181191
auto& __scheduler_impl = __preallocated_.__as<__system_scheduler_ptr>();
182192
auto __impl = std::move(__scheduler_impl);
183193
std::destroy_at(&__scheduler_impl);
184-
__impl->schedule(__preallocated_.__as_storage(), &__rcvr_, __e);
194+
__impl->schedule(__preallocated_.__as_storage(), &__rcvr_);
185195
}
186196

187197
/// Object that receives completion from the work described by the sender.
@@ -304,6 +314,21 @@ namespace exec {
304314
std::tuple<stdexec::__decay_t<_As>...>{std::move(__as)...};
305315
}
306316

317+
bool __query_env(__uuid __id, void* __dest) noexcept override {
318+
auto __state = reinterpret_cast<_BulkState*>(this);
319+
using system_context_replaceability::__runtime_property_helper;
320+
using __StopToken = decltype(stdexec::get_stop_token(stdexec::get_env(__state->__rcvr_)));
321+
if constexpr (std::is_same_v<stdexec::inplace_stop_token, __StopToken>) {
322+
if (
323+
__id == __runtime_property_helper<stdexec::inplace_stop_token>::__property_identifier) {
324+
*static_cast<stdexec::inplace_stop_token*>(__dest) =
325+
stdexec::get_stop_token(stdexec::get_env(__state->__rcvr_));
326+
return true;
327+
}
328+
}
329+
return false;
330+
}
331+
307332
/// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender.
308333
void set_value() noexcept override {
309334
auto __state = reinterpret_cast<_BulkState*>(this);
@@ -385,10 +410,6 @@ namespace exec {
385410
stdexec::set_stopped(__state_.__rcvr_);
386411
return;
387412
}
388-
system_context_replaceability::env __e{};
389-
if constexpr (std::is_same_v<stdexec::inplace_stop_token, std::decay_t<decltype(st)>>) {
390-
__e = system_context_replaceability::env{st};
391-
}
392413

393414
// Store the input data in the shared state.
394415
using __typed_forward_args_receiver_t =
@@ -404,7 +425,7 @@ namespace exec {
404425

405426
// Schedule the bulk work on the system scheduler.
406427
// This will invoke `start` on our receiver multiple times, and then a completion signal (e.g., `set_value`).
407-
__scheduler->bulk_schedule(__size, __storage, __r, __e);
428+
__scheduler->bulk_schedule(__size, __storage, __r);
408429
}
409430

410431
/// Invoked when the previous sender completes with "stopped" to stop the entire work.

0 commit comments

Comments
 (0)