Skip to content

Commit 657e4d2

Browse files
committed
exec::repeat_effect_until: Throwing Decay-Copy & Value Category Preservation
The asynchronous loop of exec::repeat_effect_until proceeds until the child operation sends a value which converts to true. Previously this process proceeded as follows: 1. Accept the child operation's result by value to avoid dangling references into the operation state (see next step) 2. Destroy the child operation state 3. Convert the result accepted in step 1 to bool and check if it's true, if so end the operation, otherwise 4. Connect the child sender 5. Start the new child operation Unfortunately step 1 meant that the result of the child operation would be decay-copied to pass it by value. This occurred within a noexcept function and therefore if that decay-copy threw std::terminate would be called. Moreover the previous implementation did not forward the result in step 3. This meant that if the child's result type was only rvalue convertible to bool compilation would fail. Additionally the same pass-by-value strategy was used for errors. However when handling an error there's no need to destroy the child operation state due to the fact the operation is ending and therefore doesn't need to reconnect the child sender for the next iteration (note this logic also applies to successful completion). Fixed all of the above by handling completion of the child operation as follows: 1. If the child completed with error or stopped simply forward that completion through (note the child operation state is not destroyed and will be cleaned up by the destructor of the operation state for exec::repeat_effect_until) (note that this saves one decay-copy over the previous implementation but requires a branch in the destructor, which was already present), otherwise 2. Forward the result, convert that forwarded expression to bool, and check if it's true, if so end the operation (note that once again the child operation state is not destroyed and once again a decay-copy is eliminated), otherwise 3. Destroy the child operation state 4. Connect the child sender 5. Start the new child operation
1 parent 5ffee46 commit 657e4d2

File tree

2 files changed

+134
-24
lines changed

2 files changed

+134
-24
lines changed

include/exec/repeat_effect_until.hpp

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include "trampoline_scheduler.hpp"
2525
#include "sequence.hpp"
2626

27-
#include "../stdexec/__detail/__atomic.hpp"
2827
#include <exception>
2928
#include <type_traits>
3029

@@ -82,7 +81,7 @@ namespace exec {
8281
using __child_op_t = stdexec::connect_result_t<__child_on_sched_sender_t, __receiver_t>;
8382

8483
__child_t __child_;
85-
__std::atomic_flag __started_{};
84+
bool __has_child_op_ = false;
8685
stdexec::__manual_lifetime<__child_op_t> __child_op_;
8786
trampoline_scheduler __sched_;
8887

@@ -93,11 +92,7 @@ namespace exec {
9392
}
9493

9594
~__repeat_effect_state() {
96-
if (!__started_.test(__std::memory_order_acquire)) {
97-
__std::atomic_thread_fence(__std::memory_order_release);
98-
// TSan does not support __std::atomic_thread_fence, so we
99-
// need to use the TSan-specific __tsan_release instead:
100-
STDEXEC_WHEN(STDEXEC_TSAN(), __tsan_release(&__started_));
95+
if (__has_child_op_) {
10196
__child_op_.__destroy();
10297
}
10398
}
@@ -107,30 +102,42 @@ namespace exec {
107102
return stdexec::connect(
108103
exec::sequence(stdexec::schedule(__sched_), __child_), __receiver_t{this});
109104
});
105+
__has_child_op_ = true;
106+
}
107+
108+
void __destroy() noexcept {
109+
__child_op_.__destroy();
110+
__has_child_op_ = false;
110111
}
111112

112113
void __start() noexcept {
113-
const bool __already_started [[maybe_unused]]
114-
= __started_.test_and_set(__std::memory_order_relaxed);
115-
STDEXEC_ASSERT(!__already_started);
114+
STDEXEC_ASSERT(__has_child_op_);
116115
stdexec::start(__child_op_.__get());
117116
}
118117

119118
template <class _Tag, class... _Args>
120-
void __complete(_Tag, _Args... __args) noexcept { // Intentionally by value...
121-
__child_op_.__destroy(); // ... because this could potentially invalidate them.
119+
void __complete(_Tag, _Args &&...__args) noexcept {
122120
if constexpr (same_as<_Tag, set_value_t>) {
123121
// If the sender completed with true, we're done
124122
STDEXEC_TRY {
125-
const bool __done = (static_cast<bool>(__args) && ...);
123+
const bool __done = (static_cast<bool>(static_cast<_Args &&>(__args)) && ...);
126124
if (__done) {
127125
stdexec::set_value(static_cast<_Receiver &&>(this->__receiver()));
128-
} else {
126+
return;
127+
}
128+
__destroy();
129+
STDEXEC_TRY {
129130
__connect();
130-
stdexec::start(__child_op_.__get());
131131
}
132+
STDEXEC_CATCH_ALL {
133+
stdexec::set_error(
134+
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
135+
return;
136+
}
137+
stdexec::start(__child_op_.__get());
132138
}
133139
STDEXEC_CATCH_ALL {
140+
__destroy();
134141
stdexec::set_error(
135142
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
136143
}
@@ -160,20 +167,14 @@ namespace exec {
160167
__mexception<_INVALID_ARGUMENT_TO_REPEAT_EFFECT_UNTIL_<>, _WITH_SENDER_<_Sender>>
161168
>;
162169

163-
template <class _Error>
164-
using __error_t = completion_signatures<set_error_t(__decay_t<_Error>)>;
165-
166170
template <class _Sender, class... _Env>
167171
using __completions_t = stdexec::transform_completion_signatures<
168172
__completion_signatures_of_t<__decay_t<_Sender> &, _Env...>,
169173
stdexec::transform_completion_signatures<
170174
__completion_signatures_of_t<stdexec::schedule_result_t<exec::trampoline_scheduler>, _Env...>,
171-
__eptr_completion,
172-
__sigs::__default_set_value,
173-
__error_t
175+
__eptr_completion
174176
>,
175-
__mbind_front_q<__values_t, _Sender>::template __f,
176-
__error_t
177+
__mbind_front_q<__values_t, _Sender>::template __f
177178
>;
178179

179180
struct __repeat_effect_tag { };

test/exec/test_repeat_effect_until.cpp

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626

2727
#include <catch2/catch.hpp>
2828

29+
#include <limits>
2930
#include <memory>
31+
#include <stdexcept>
3032
#include <utility>
3133

3234
namespace ex = stdexec;
@@ -37,7 +39,8 @@ namespace {
3739
using sender_concept = ex::sender_t;
3840
using __t = boolean_sender;
3941
using __id = boolean_sender;
40-
using completion_signatures = ex::completion_signatures<ex::set_value_t(bool)>;
42+
using completion_signatures =
43+
ex::completion_signatures<ex::set_value_t(bool), ex::set_error_t(const int&)>;
4144

4245
template <class Receiver>
4346
struct operation {
@@ -88,6 +91,12 @@ namespace {
8891

8992
TEST_CASE("simple example for repeat_effect_until", "[adaptors][repeat_effect_until]") {
9093
ex::sender auto snd = exec::repeat_effect_until(boolean_sender{});
94+
static_assert(all_contained_in<
95+
ex::completion_signatures<ex::set_error_t(const int&)>,
96+
ex::completion_signatures_of_t<decltype(snd), ex::env<>>>);
97+
static_assert(!all_contained_in<
98+
ex::completion_signatures<ex::set_error_t(int)>,
99+
ex::completion_signatures_of_t<decltype(snd), ex::env<>>>);
91100
ex::sync_wait(std::move(snd));
92101
}
93102

@@ -197,4 +206,104 @@ namespace {
197206
ex::start(op);
198207
REQUIRE(counter == 10);
199208
}
209+
210+
TEST_CASE(
211+
"repeat_effect works correctly when the child operation sends an error type which throws when "
212+
"decay-copied",
213+
"[adaptors][repeat_effect]") {
214+
struct error_type {
215+
explicit error_type(unsigned& throw_after) noexcept
216+
: throw_after_(throw_after) {
217+
}
218+
error_type(const error_type& other)
219+
: throw_after_(other.throw_after_) {
220+
if (!throw_after_) {
221+
throw std::logic_error("TEST");
222+
}
223+
--throw_after_;
224+
}
225+
unsigned& throw_after_;
226+
};
227+
struct receiver {
228+
using receiver_concept = ::stdexec::receiver_t;
229+
void set_value() && noexcept {
230+
FAIL_CHECK("Unexpected value completion signal");
231+
}
232+
void set_stopped() && noexcept {
233+
FAIL_CHECK("Unexpected stopped completion signal");
234+
}
235+
void set_error(std::exception_ptr) && noexcept {
236+
CHECK(!done_);
237+
}
238+
void set_error(const error_type&) && noexcept {
239+
CHECK(!done_);
240+
done_ = true;
241+
}
242+
bool& done_;
243+
};
244+
unsigned throw_after = 0;
245+
bool done = false;
246+
do {
247+
const auto tmp = throw_after;
248+
throw_after = std::numeric_limits<unsigned>::max();
249+
auto op =
250+
ex::connect(exec::repeat_effect(ex::just_error(error_type(throw_after))), receiver(done));
251+
throw_after = tmp;
252+
ex::start(op);
253+
throw_after = tmp;
254+
++throw_after;
255+
} while (!done);
256+
}
257+
258+
TEST_CASE(
259+
"repeat_effect_until works correctly when the child operation sends type which throws when "
260+
"decay-copied, and when converted to bool, and which is only rvalue convertible to bool",
261+
"[adaptors][repeat_effect_until]") {
262+
class value_type {
263+
void maybe_throw_() const {
264+
if (!throw_after_) {
265+
throw std::logic_error("TEST");
266+
}
267+
--throw_after_;
268+
}
269+
public:
270+
explicit value_type(unsigned& throw_after) noexcept
271+
: throw_after_(throw_after) {
272+
}
273+
value_type(const value_type& other)
274+
: throw_after_(other.throw_after_) {
275+
maybe_throw_();
276+
}
277+
unsigned& throw_after_;
278+
operator bool() && {
279+
maybe_throw_();
280+
return true;
281+
}
282+
};
283+
struct receiver {
284+
using receiver_concept = ::stdexec::receiver_t;
285+
void set_value() && noexcept {
286+
done_ = true;
287+
}
288+
void set_stopped() && noexcept {
289+
FAIL_CHECK("Unexpected stopped completion signal");
290+
}
291+
void set_error(std::exception_ptr) && noexcept {
292+
CHECK(!done_);
293+
}
294+
bool& done_;
295+
};
296+
unsigned throw_after = 0;
297+
bool done = false;
298+
do {
299+
const auto tmp = throw_after;
300+
throw_after = std::numeric_limits<unsigned>::max();
301+
auto op =
302+
ex::connect(exec::repeat_effect_until(ex::just(value_type(throw_after))), receiver(done));
303+
throw_after = tmp;
304+
ex::start(op);
305+
throw_after = tmp;
306+
++throw_after;
307+
} while (!done);
308+
}
200309
} // namespace

0 commit comments

Comments
 (0)