Skip to content

Commit 941f720

Browse files
authored
Merge pull request #1715 from RobertLeahy/p3373
let_value, _error, & _stopped: Destroy Child Operation State After Completion
2 parents 5ffee46 + 4c465db commit 941f720

File tree

2 files changed

+178
-104
lines changed

2 files changed

+178
-104
lines changed

include/stdexec/__detail/__let.hpp

Lines changed: 130 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "__variant.hpp"
3434

3535
#include <exception>
36+
#include <type_traits>
3637

3738
namespace stdexec {
3839
//////////////////////////////////////////////////////////////////////////////
@@ -41,6 +42,11 @@ namespace stdexec {
4142
template <class _SetTag>
4243
struct __let_t;
4344

45+
template <class _SetTag>
46+
struct __let_tag {
47+
using __t = _SetTag;
48+
};
49+
4450
template <class _SetTag>
4551
inline constexpr __mstring __in_which_let_msg{"In stdexec::let_value(Sender, Function)..."};
4652

@@ -282,32 +288,86 @@ namespace stdexec {
282288

283289
//! The core of the operation state for `let_*`.
284290
//! This gets bundled up into a larger operation state (`__detail::__op_state<...>`).
285-
template <class _Receiver, class _Fun, class _SetTag, class _Env2, class... _Tuples>
291+
template <class _SetTag, class _Sender, class _Fun, class _Receiver, class... _Tuples>
286292
struct __let_state {
287-
using __fun_t = _Fun;
288-
using __env2_t = _Env2;
289-
using __env_t = __join_env_t<_Env2, env_of_t<_Receiver>>;
290-
using __rcvr_t = __receiver_with_env_t<_Receiver, _Env2>;
293+
using __env2_t =
294+
__let::__env2_t<_SetTag, env_of_t<const _Sender&>, env_of_t<const _Receiver&>>;
295+
using __second_rcvr_t = __receiver_with_env_t<_Receiver, __env2_t>;
296+
template <typename _Tag, typename... _Args>
297+
constexpr void __impl(_Receiver& __rcvr, _Tag __tag, _Args&&... __args) noexcept {
298+
if constexpr (std::is_same_v<_SetTag, _Tag>) {
299+
using __sender_t = __call_result_t<_Fun, __decay_t<_Args>&...>;
300+
using __submit_t = __submit_result<__sender_t, __env2_t, _Receiver>;
301+
constexpr bool __nothrow_store = (__nothrow_decay_copyable<_Args> && ...);
302+
constexpr bool __nothrow_invoke = __nothrow_callable<_Fun, __decay_t<_Args>&...>;
303+
constexpr bool __nothrow_submit = noexcept(
304+
__storage_
305+
.template emplace<__submit_t>(__declval<__sender_t>(), __declval<__second_rcvr_t>()));
306+
STDEXEC_TRY {
307+
auto& __tuple = __args_.emplace_from(__mktuple, static_cast<_Args&&>(__args)...);
308+
auto&& __sender = ::stdexec::__apply(static_cast<_Fun&&>(__fun_), __tuple);
309+
__storage_.template emplace<__monostate>();
310+
__second_rcvr_t __r{__rcvr, static_cast<__env2_t&&>(__env2_)};
311+
auto& __op = __storage_.template emplace<__submit_t>(
312+
static_cast<__sender_t&&>(__sender), static_cast<__second_rcvr_t&&>(__r));
313+
__op.submit(static_cast<__sender_t&&>(__sender), static_cast<__second_rcvr_t&&>(__r));
314+
}
315+
STDEXEC_CATCH_ALL {
316+
if constexpr (!(__nothrow_store && __nothrow_invoke && __nothrow_submit)) {
317+
::stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
318+
}
319+
}
320+
} else {
321+
__tag(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...);
322+
}
323+
}
324+
struct __first_rcvr_t {
325+
using receiver_concept = ::stdexec::receiver_t;
326+
__let_state& __state;
327+
_Receiver& __rcvr;
328+
template <typename... _Args>
329+
constexpr void set_value(_Args&&... __args) noexcept {
330+
__state.__impl(__rcvr, ::stdexec::set_value, static_cast<_Args&&>(__args)...);
331+
}
332+
template <typename... _Args>
333+
constexpr void set_error(_Args&&... __args) noexcept {
334+
__state.__impl(__rcvr, ::stdexec::set_error, static_cast<_Args&&>(__args)...);
335+
}
336+
template <typename... _Args>
337+
constexpr void set_stopped(_Args&&... __args) noexcept {
338+
__state.__impl(__rcvr, ::stdexec::set_stopped, static_cast<_Args&&>(__args)...);
339+
}
340+
constexpr decltype(auto) get_env() const noexcept {
341+
return ::stdexec::get_env(__rcvr);
342+
}
343+
};
344+
291345
using __result_variant = __variant_for<__monostate, _Tuples...>;
292-
using __submit_variant = __variant_for<
346+
using __op_state_variant = __variant_for<
293347
__monostate,
294-
__mapply<__submit_datum_for<_Receiver, _Fun, _SetTag, _Env2>, _Tuples>...
348+
::stdexec::connect_result_t<_Sender, __first_rcvr_t>,
349+
__mapply<__submit_datum_for<_Receiver, _Fun, _SetTag, __env2_t>, _Tuples>...
295350
>;
296351

297-
template <class _ResultSender, class _OpState>
298-
auto __get_result_receiver(const _ResultSender&, _OpState& __op_state) -> decltype(auto) {
299-
return __rcvr_t{__op_state.__rcvr_, __env2_};
352+
constexpr explicit __let_state(_Sender&& __sender, _Fun __fun, _Receiver& __r) noexcept(
353+
__nothrow_connectable<_Sender, __first_rcvr_t>
354+
&& std::is_nothrow_move_constructible_v<_Fun>)
355+
: __fun_(static_cast<_Fun&&>(__fun))
356+
, __env2_(
357+
// TODO(ericniebler): this needs a fallback
358+
__let::__mk_env2<_SetTag>(::stdexec::get_env(__sender), ::stdexec::get_env(__r))) {
359+
__storage_.emplace_from(
360+
::stdexec::connect, static_cast<_Sender&&>(__sender), __first_rcvr_t{*this, __r});
300361
}
301362

302363
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
303364
_Fun __fun_;
304365
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
305-
_Env2 __env2_;
366+
__env2_t __env2_;
306367
//! Variant to hold the results passed from upstream before passing them to the function:
307368
__result_variant __args_{};
308-
//! Variant type for holding the operation state from connecting
309-
//! the function result to the downstream receiver:
310-
__submit_variant __storage_{};
369+
//! Variant type for holding the operation state of the currently in flight operation
370+
__op_state_variant __storage_{};
311371
};
312372

313373
// The set_value completions of:
@@ -504,11 +564,23 @@ namespace stdexec {
504564
}
505565
};
506566

567+
template <class _Sender, class _Fun>
568+
struct __data_t {
569+
_Sender __sndr;
570+
_Fun __fun;
571+
};
572+
573+
template <class _Sender, class _Fun>
574+
STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __data_t(_Sender, _Fun) -> __data_t<_Sender, _Fun>;
575+
576+
template <typename _Sender>
577+
using __sender_of = decltype((__declval<__data_of<_Sender>>().__sndr));
578+
template <typename _Sender>
579+
using __fun_of = decltype((__declval<__data_of<_Sender>>().__fun));
580+
507581
//! Implementation of the `let_*_t` types, where `_SetTag` is, e.g., `set_value_t` for `let_value`.
508582
template <class _SetTag>
509583
struct __let_t {
510-
using __t = _SetTag;
511-
512584
template <sender _Sender, __movable_value _Fun>
513585
auto operator()(_Sender&& __sndr, _Fun __fun) const -> __well_formed_sender auto {
514586
return __make_sexpr<__let_t<_SetTag>>(
@@ -520,117 +592,63 @@ namespace stdexec {
520592
auto operator()(_Fun __fun) const {
521593
return __closure(*this, static_cast<_Fun&&>(__fun));
522594
}
595+
596+
template <class _Sender>
597+
auto transform_sender(set_value_t, _Sender&& __sndr, __ignore) {
598+
return __sexpr_apply(
599+
static_cast<_Sender&&>(__sndr),
600+
[]<class _Fun, class _Child>(__ignore, _Fun&& __fun, _Child&& __child) {
601+
return __make_sexpr<__let_tag<_SetTag>>(
602+
__data_t{static_cast<_Child&&>(__child), static_cast<_Fun&&>(__fun)});
603+
});
604+
}
523605
};
524606

525607
template <class _SetTag>
526608
struct __let_impl : __sexpr_defaults {
527-
static constexpr auto get_attrs = []<class _Fun, class _Child>(
528-
const _Fun&,
529-
[[maybe_unused]]
530-
const _Child& __child) noexcept {
531-
// BUGBUG:
532-
return stdexec::get_env(__child);
533-
//return __attrs<__let_t<_SetTag>, _Child, _Fun>{};
534-
};
609+
static constexpr auto get_attrs =
610+
[]<class _Child, class _Fun>(const __data_t<_Child, _Fun>& __data) noexcept {
611+
// BUGBUG:
612+
return stdexec::get_env(__data.__sndr);
613+
};
535614

536615
static constexpr auto get_completion_signatures =
537616
[]<class _Self, class _Env>(_Self&&, _Env&&...) noexcept {
538-
static_assert(sender_expr_for<_Self, __let_t<_SetTag>>);
617+
static_assert(sender_expr_for<_Self, __let_tag<_SetTag>>);
539618
if constexpr (__decay_copyable<_Self>) {
540-
using __fn_t = __decay_t<__data_of<_Self>>;
541-
return __completions_t<__let_t<_SetTag>, __fn_t, __child_of<_Self>, _Env>{};
619+
using __fn_t = __decay_t<__fun_of<_Self>>;
620+
using __result_t =
621+
__completions_t<__let_tag<_SetTag>, __fn_t, __sender_of<_Self>, _Env>;
622+
return __result_t{};
542623
} else {
543624
return __mexception<_SENDER_TYPE_IS_NOT_COPYABLE_, _WITH_SENDER_<_Self>>{};
544625
}
545626
};
546627

547628
static constexpr auto get_state =
548-
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, const _Receiver& __rcvr)
549-
requires sender_in<__child_of<_Sender>, env_of_t<_Receiver>>
629+
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, _Receiver& __rcvr)
630+
requires sender_in<__sender_of<_Sender>, env_of_t<_Receiver>>
550631
{
551-
static_assert(sender_expr_for<_Sender, __let_t<_SetTag>>);
552-
using _Fun = __decay_t<__data_of<_Sender>>;
553-
using _Child = __child_of<_Sender>;
554-
using _Env2 = __env2_t<_SetTag, env_of_t<_Child>, env_of_t<_Receiver>>;
555-
using __mk_let_state = __mbind_front_q<__let_state, _Receiver, _Fun, _SetTag, _Env2>;
556-
632+
static_assert(sender_expr_for<_Sender, __let_tag<_SetTag>>);
633+
using _Child = __sender_of<_Sender>;
634+
using _Fun = __decay_t<__fun_of<_Sender>>;
635+
using __mk_let_state = __mbind_front_q<__let_state, _SetTag, _Child, _Fun, _Receiver>;
557636
using __let_state_t = __gather_completions_of<
558637
_SetTag,
559638
_Child,
560639
env_of_t<_Receiver>,
561640
__q<__decayed_tuple>,
562641
__mk_let_state
563642
>;
564-
565-
return __sndr.apply(
566-
static_cast<_Sender&&>(__sndr),
567-
[&]<class _Fn, class _Child>(__ignore, _Fn&& __fn, _Child&& __child) {
568-
// TODO(ericniebler): this needs a fallback
569-
_Env2 __env2 =
570-
__let::__mk_env2<_SetTag>(stdexec::get_env(__child), stdexec::get_env(__rcvr));
571-
return __let_state_t{static_cast<_Fn&&>(__fn), static_cast<_Env2&&>(__env2)};
572-
});
643+
auto&& [__tag, __data] = static_cast<_Sender&&>(__sndr);
644+
return __let_state_t(
645+
__forward_like<_Sender>(__data).__sndr, __forward_like<_Sender>(__data).__fun, __rcvr);
573646
};
574647

575-
//! Helper function to actually invoke the function to produce `let_*`'s sender,
576-
//! connect it to the downstream receiver, and start it. This is the heart of
577-
//! `let_*`.
578-
template <class _State, class _OpState, class... _As>
579-
static void __bind_(_State& __state, _OpState& __op_state, _As&&... __as) {
580-
// Store the passed-in (received) args:
581-
auto& __args = __state.__args_.emplace_from(__mktuple, static_cast<_As&&>(__as)...);
582-
// Apply the function to the args to get the sender:
583-
auto __sndr2 = stdexec::__apply(std::move(__state.__fun_), __args);
584-
// Create a receiver based on the state, the computed sender, and the operation state:
585-
auto __rcvr2 = __state.__get_result_receiver(__sndr2, __op_state);
586-
// Connect the sender to the receiver and start it:
587-
using __result_t = decltype(submit_result{std::move(__sndr2), std::move(__rcvr2)});
588-
auto& __op = __state.__storage_
589-
.template emplace<__result_t>(std::move(__sndr2), std::move(__rcvr2));
590-
__op.submit(std::move(__sndr2), std::move(__rcvr2));
591-
}
592-
593-
template <class _OpState, class... _As>
594-
static void __bind(_OpState& __op_state, _As&&... __as) noexcept {
595-
using _State = decltype(__op_state.__state_);
596-
using _Receiver = decltype(__op_state.__rcvr_);
597-
using _Fun = _State::__fun_t;
598-
using _Env2 = _State::__env2_t;
599-
using _JoinEnv2 = __join_env_t<_Env2, env_of_t<_Receiver>>;
600-
using _ResultSender = __mcall<__result_sender_fn<_SetTag, _Fun, _JoinEnv2>, _As...>;
601-
602-
_State& __state = __op_state.__state_;
603-
_Receiver& __rcvr = __op_state.__rcvr_;
604-
605-
if constexpr (
606-
(__nothrow_decay_copyable<_As> && ...) && __nothrow_callable<_Fun, __decay_t<_As>&...>
607-
&& __nothrow_connectable<_ResultSender, __result_receiver_t<_Receiver, _Env2>>) {
608-
__bind_(__state, __op_state, static_cast<_As&&>(__as)...);
609-
} else {
610-
STDEXEC_TRY {
611-
__bind_(__state, __op_state, static_cast<_As&&>(__as)...);
612-
}
613-
STDEXEC_CATCH_ALL {
614-
using _Receiver = decltype(__op_state.__rcvr_);
615-
stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
616-
}
617-
}
618-
}
619-
620-
static constexpr auto complete = []<class _OpState, class _Tag, class... _As>(
621-
__ignore,
622-
_OpState& __op_state,
623-
_Tag,
624-
_As&&... __as) noexcept -> void {
625-
if constexpr (__same_as<_Tag, _SetTag>) {
626-
// Intercept the channel of interest to compute the sender and connect it:
627-
__bind(__op_state, static_cast<_As&&>(__as)...);
628-
} else {
629-
// Forward the other channels downstream:
630-
using _Receiver = decltype(__op_state.__rcvr_);
631-
_Tag()(static_cast<_Receiver&&>(__op_state.__rcvr_), static_cast<_As&&>(__as)...);
632-
}
633-
};
648+
static constexpr auto start =
649+
[]<typename _State, typename _Receiver>(_State& __state, _Receiver&) noexcept {
650+
::stdexec::start(__state.__storage_.template get<1>());
651+
};
634652
};
635653
} // namespace __let
636654

@@ -639,5 +657,13 @@ namespace stdexec {
639657
inline constexpr let_stopped_t let_stopped{};
640658

641659
template <class _SetTag>
642-
struct __sexpr_impl<__let::__let_t<_SetTag>> : __let::__let_impl<_SetTag> { };
660+
struct __sexpr_impl<__let::__let_tag<_SetTag>> : __let::__let_impl<_SetTag> { };
661+
662+
template <class _SetTag>
663+
struct __sexpr_impl<__let::__let_t<_SetTag>> : __sexpr_defaults {
664+
static constexpr auto get_completion_signatures =
665+
[]<class _Sender, class... _Env>(_Sender&&, const _Env&...) noexcept
666+
-> __completion_signatures_of_t<transform_sender_result_t<_Sender, _Env...>, _Env...> {
667+
};
668+
};
643669
} // namespace stdexec

test/stdexec/algos/adaptors/test_let_value.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,4 +396,52 @@ namespace {
396396
ex::start(op);
397397
CHECK(*ptr == 5);
398398
}
399+
400+
TEST_CASE(
401+
"let_value destroys the first operation state before invoking the sender factory",
402+
"[adaptors][let_value]") {
403+
const auto ptr = std::make_shared<int>(5);
404+
CHECK(ptr.use_count() == 1);
405+
auto first = ex::just() | ex::then([ptr = ptr]() { });
406+
CHECK(ptr.use_count() == 2);
407+
auto sender = ex::let_value(std::move(first), [&]() {
408+
CHECK(ptr.use_count() == 2);
409+
return ex::just();
410+
});
411+
CHECK(ptr.use_count() == 2);
412+
auto op = ex::connect(std::move(sender), expect_void_receiver{});
413+
CHECK(ptr.use_count() == 2);
414+
ex::start(op);
415+
CHECK(ptr.use_count() == 1);
416+
}
417+
418+
struct immovable_sender {
419+
using sender_concept = ::stdexec::sender_t;
420+
template <typename... Args>
421+
consteval auto get_completion_signatures(const Args&...) const & noexcept {
422+
return ::stdexec::completion_signatures_of_t<decltype(::stdexec::just()), Args...>{};
423+
}
424+
template <typename Receiver>
425+
auto connect(Receiver r) const & noexcept {
426+
return ::stdexec::connect(::stdexec::just(), std::move(r));
427+
}
428+
immovable_sender() = default;
429+
immovable_sender(const immovable_sender&) {
430+
throw std::logic_error("Unexpected copy");
431+
}
432+
};
433+
static_assert(::stdexec::sender<immovable_sender>);
434+
static_assert(::stdexec::sender<const immovable_sender&>);
435+
static_assert(::stdexec::sender_in<immovable_sender, ::stdexec::env<>>);
436+
static_assert(::stdexec::sender_in<const immovable_sender&, ::stdexec::env<>>);
437+
438+
TEST_CASE(
439+
"If the sender factory returns a reference to a sender that reference is passed to connect",
440+
"[adaptors][let_value]") {
441+
const immovable_sender s;
442+
auto just = ex::just();
443+
auto sender = ex::let_value(just, [&]() -> decltype(auto) { return (s); });
444+
auto op = ex::connect(sender, expect_void_receiver{});
445+
ex::start(op);
446+
}
399447
} // namespace

0 commit comments

Comments
 (0)