Skip to content

Commit 15e99d6

Browse files
committed
fix the spawn_future completion behavior
1 parent a7a2bae commit 15e99d6

File tree

5 files changed

+153
-40
lines changed

5 files changed

+153
-40
lines changed

include/beman/execution/detail/spawn_future.hpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,24 +138,28 @@ struct spawn_future_state
138138
auto complete() noexcept -> void override {
139139
{
140140
::std::lock_guard cerberos(this->gate);
141-
if (this->receiver == nullptr) {
141+
if (this->fun == nullptr) {
142142
this->receiver = this;
143143
return;
144144
}
145145
}
146-
this->fun(this->receiver, this->result);
146+
this->fun(this->receiver, *this);
147147
}
148148
auto abandon() noexcept -> void {
149-
{
149+
bool ready{[&] {
150150
::std::lock_guard cerberos(this->gate);
151151
if (this->receiver == nullptr) {
152152
this->receiver = this;
153-
this->fun = [](void*, typename spawn_future_state::result_t&) noexcept {};
154-
this->source.request_stop();
155-
return;
153+
this->fun = [](void*, spawn_future_state& state) noexcept { state.destroy(); };
154+
return false;
156155
}
156+
return true;
157+
}()};
158+
if (ready) {
159+
this->destroy();
160+
} else {
161+
this->source.request_stop();
157162
}
158-
this->destroy();
159163
}
160164
template <::beman::execution::receiver Rcvr>
161165
static auto complete_receiver(Rcvr& rcvr, typename spawn_future_state::result_t& res) noexcept {
@@ -175,10 +179,10 @@ struct spawn_future_state
175179
auto consume(Rcvr& rcvr) noexcept -> void {
176180
{
177181
::std::lock_guard cerberos(this->gate);
178-
if (this->receiver != nullptr) {
182+
if (this->receiver == nullptr) {
179183
this->receiver = &rcvr;
180-
this->fun = [](void* ptr, typename spawn_future_state::result_t& res) noexcept {
181-
spawn_future_state::complete_receiver(*static_cast<Rcvr*>(ptr), res);
184+
this->fun = [](void* ptr, spawn_future_state& state) noexcept {
185+
spawn_future_state::complete_receiver(*static_cast<Rcvr*>(ptr), state.result);
182186
};
183187
return;
184188
}
@@ -205,7 +209,7 @@ struct spawn_future_state
205209
Token token;
206210
bool associated{false};
207211
void* receiver{};
208-
auto (*fun)(void*, typename spawn_future_state::result_t&) noexcept -> void;
212+
auto (*fun)(void*, spawn_future_state&) noexcept -> void = nullptr;
209213
};
210214

211215
template <::beman::execution::sender Sndr, typename Ev>

tests/beman/execution/CMakeLists.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,9 @@ foreach(test ${execution_tests})
138138
endforeach()
139139

140140
if(FALSE)
141-
if(NOT PROJECT_IS_TOP_LEVEL AND BEMAN_EXECUTION_ENABLE_TESTING)
142-
# test if the targets are findable from the build directory
143-
# cmake-format: off
141+
if(NOT PROJECT_IS_TOP_LEVEL AND BEMAN_EXECUTION_ENABLE_TESTING)
142+
# test if the targets are findable from the build directory
143+
# cmake-format: off
144144
add_test(NAME find-package-test
145145
COMMAND ${CMAKE_CTEST_COMMAND}
146146
# --verbose
@@ -159,5 +159,5 @@ if(NOT PROJECT_IS_TOP_LEVEL AND BEMAN_EXECUTION_ENABLE_TESTING)
159159
# TODO(CK): Needed too? "--config $<CONFIG>"
160160
)
161161
# cmake-format: on
162-
endif()
162+
endif()
163163
endif()

tests/beman/execution/exec-scope-simple-counting.test.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ struct join_receiver {
5454
};
5555

5656
auto ctor() -> void {
57-
{
58-
test_std::simple_counting_scope scope;
59-
}
57+
{ test_std::simple_counting_scope scope; }
6058
test::death([] {
6159
test_std::simple_counting_scope scope;
6260
scope.get_token().try_associate();

tests/beman/execution/exec-spawn-future.test.cpp

Lines changed: 129 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <beman/execution/detail/join_env.hpp>
1313
#include <beman/execution/detail/inplace_stop_source.hpp>
1414
#include <beman/execution/detail/just.hpp>
15+
#include <beman/execution/detail/then.hpp>
1516
#include <test/execution.hpp>
1617
#include <concepts>
1718

@@ -32,20 +33,40 @@ static_assert(test_detail::queryable<env>);
3233
struct non_sender {};
3334
static_assert(not test_std::sender<non_sender>);
3435

36+
template <typename... T>
3537
struct sender {
36-
using sender_concept = test_std::sender_t;
37-
using completion_signatures = test_std::completion_signatures<test_std::set_value_t(), test_std::set_stopped_t()>;
38+
using sender_concept = test_std::sender_t;
39+
using completion_signatures =
40+
test_std::completion_signatures<test_std::set_value_t(T...), test_std::set_stopped_t()>;
41+
42+
struct state_base {
43+
virtual ~state_base() = default;
44+
virtual auto complete(T...) -> void = 0;
45+
};
3846
template <test_std::receiver Rcvr>
39-
auto connect(Rcvr&& rcvr) {
40-
return test_std::connect(test_std::just(), ::std::forward<Rcvr>(rcvr));
47+
struct state : state_base {
48+
using operation_state_concept = test_std::operation_state_t;
49+
std::remove_cvref_t<Rcvr> rcvr;
50+
state_base** handle{};
51+
auto complete(T... a) -> void override { test_std::set_value(std::move(this->rcvr), a...); }
52+
state(auto&& r, state_base** h) : rcvr(std::forward<decltype(r)>(r)), handle(h) {}
53+
auto start() & noexcept { *this->handle = this; }
54+
};
55+
state_base** handle{nullptr};
56+
template <test_std::receiver Rcvr>
57+
auto connect(Rcvr&& rcvr) -> state<Rcvr> {
58+
return state<Rcvr>(std::forward<Rcvr>(rcvr), this->handle);
4159
}
4260
};
43-
static_assert(test_std::sender<sender>);
61+
static_assert(test_std::sender<sender<>>);
62+
static_assert(test_std::sender<sender<int>>);
63+
static_assert(test_std::sender<sender<int, bool>>);
4464

4565
template <bool Noexcept>
4666
struct token {
47-
auto try_associate() -> bool { return {}; }
48-
auto disassociate() noexcept(Noexcept) -> void {}
67+
std::size_t* count{nullptr};
68+
auto try_associate() -> bool { return this->count && bool(++*this->count); }
69+
auto disassociate() noexcept(Noexcept) -> void { --*this->count; }
4970
template <test_std::sender Sender>
5071
auto wrap(Sender&& sender) -> Sender {
5172
return std::forward<Sender>(sender);
@@ -226,7 +247,7 @@ static_assert(test_std::sender<alloc_sender>);
226247
auto test_get_allocator() {
227248
{
228249
alloc_env ae{87};
229-
auto [alloc, ev] = test_detail::spawn_future_get_allocator(sender{}, ae);
250+
auto [alloc, ev] = test_detail::spawn_future_get_allocator(sender<>{}, ae);
230251
static_assert(std::same_as<decltype(alloc), allocator>);
231252
ASSERT(alloc == allocator{87});
232253
static_assert(std::same_as<decltype(ev), alloc_env>);
@@ -259,7 +280,7 @@ auto test_get_allocator() {
259280
ASSERT(test_std::get_stop_token(ev) == source.get_token());
260281
}
261282
{
262-
auto [alloc, ev] = test_detail::spawn_future_get_allocator(sender{}, env{42});
283+
auto [alloc, ev] = test_detail::spawn_future_get_allocator(sender<>{}, env{42});
263284
static_assert(std::same_as<decltype(alloc), std::allocator<void>>);
264285
static_assert(std::same_as<decltype(ev), env>);
265286
ASSERT(ev == env{42});
@@ -275,13 +296,104 @@ struct rcvr {
275296
};
276297
static_assert(test_std::receiver<rcvr>);
277298

278-
template <test_std::sender Sndr, test_std::async_scope_token Tok, typename Ev>
279-
auto test_spawn_future(Sndr&& sndr, Tok&& tok, Ev&& ev) {
280-
auto sender{test_std::spawn_future(std::forward<Sndr>(sndr), std::forward<Tok>(tok), std::forward<Ev>(ev))};
281-
static_assert(test_std::sender<decltype(sender)>);
282-
auto state(test_std::connect(std::move(sender), rcvr{}));
283-
static_assert(test_std::operation_state<decltype(state)>);
284-
test_std::start(state);
299+
auto test_spawn_future() {
300+
{
301+
std::size_t count{};
302+
sender<int>::state_base* handle{};
303+
int result{};
304+
ASSERT(count == 0u);
305+
ASSERT(handle == nullptr);
306+
ASSERT(result == 0);
307+
308+
auto sndr{test_std::spawn_future(
309+
sender<int>{&handle} | test_std::then([](int v) { return v; }), token<true>{&count}, env{})};
310+
ASSERT(count == 1u);
311+
ASSERT(handle != nullptr);
312+
ASSERT(result == 0);
313+
314+
{
315+
auto state(test_std::connect(std::move(sndr) | test_std::then([&result](int v) { result = v; }), rcvr{}));
316+
ASSERT(result == 0);
317+
318+
test_std::start(state);
319+
ASSERT(count == 1u);
320+
ASSERT(result == 0);
321+
322+
handle->complete(42);
323+
ASSERT(result == 42);
324+
ASSERT(count == 1u);
325+
}
326+
ASSERT(count == 0u);
327+
}
328+
{
329+
std::size_t count{};
330+
sender<int>::state_base* handle{};
331+
int result{};
332+
ASSERT(count == 0u);
333+
ASSERT(handle == nullptr);
334+
ASSERT(result == 0);
335+
336+
auto sndr{test_std::spawn_future(
337+
sender<int>{&handle} | test_std::then([](int v) { return v; }), token<true>{&count}, env{})};
338+
ASSERT(count == 1u);
339+
ASSERT(handle != nullptr);
340+
ASSERT(result == 0);
341+
342+
{
343+
auto state(test_std::connect(std::move(sndr) | test_std::then([&result](int v) { result = v; }), rcvr{}));
344+
ASSERT(result == 0);
345+
346+
handle->complete(42);
347+
ASSERT(result == 0);
348+
349+
test_std::start(state);
350+
351+
ASSERT(result == 42);
352+
ASSERT(count == 1u);
353+
}
354+
ASSERT(count == 0u);
355+
}
356+
{
357+
std::size_t count{};
358+
sender<int>::state_base* handle{};
359+
int result{};
360+
ASSERT(count == 0u);
361+
ASSERT(handle == nullptr);
362+
ASSERT(result == 0);
363+
364+
{
365+
auto sndr{test_std::spawn_future(
366+
sender<int>{&handle} | test_std::then([](int v) { return v; }), token<true>{&count}, env{})};
367+
ASSERT(count == 1u);
368+
ASSERT(handle != nullptr);
369+
ASSERT(result == 0);
370+
}
371+
ASSERT(count == 1u);
372+
handle->complete(17);
373+
ASSERT(count == 0u);
374+
ASSERT(result == 0);
375+
}
376+
{
377+
std::size_t count{};
378+
sender<int>::state_base* handle{};
379+
int result{};
380+
ASSERT(count == 0u);
381+
ASSERT(handle == nullptr);
382+
ASSERT(result == 0);
383+
384+
{
385+
auto sndr{test_std::spawn_future(
386+
sender<int>{&handle} | test_std::then([](int v) { return v; }), token<true>{&count}, env{})};
387+
ASSERT(count == 1u);
388+
ASSERT(handle != nullptr);
389+
ASSERT(result == 0);
390+
391+
handle->complete(17);
392+
ASSERT(result == 0);
393+
}
394+
ASSERT(count == 0u);
395+
ASSERT(result == 0);
396+
}
285397
}
286398

287399
} // namespace
@@ -299,5 +411,5 @@ TEST(exec_spawn_future) {
299411

300412
test_get_allocator();
301413

302-
test_spawn_future(sender{}, token<true>{}, env{});
414+
test_spawn_future();
303415
}

tests/beman/execution/include/test/execution.hpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,16 @@ struct throws {
5858
auto operator=(const throws&) noexcept(false) -> throws& = default;
5959
};
6060

61-
inline auto death([[maybe_unused]] auto fun,
62-
[[maybe_unused]] ::std::source_location location = test::source_location::current()) noexcept
63-
-> void {
61+
inline auto
62+
death([[maybe_unused]] auto fun,
63+
[[maybe_unused]] ::std::source_location location = test::source_location::current()) noexcept -> void {
6464
#ifndef _MSC_VER
6565
switch (::pid_t rc = ::fork()) {
6666
default: {
6767
int stat{};
6868
ASSERT(rc == ::wait(&stat));
6969
if (stat == EXIT_SUCCESS) {
70-
::std::cerr << "failed death test at "
71-
<< "file=" << location.file_name() << ":" << location.line() << "\n"
70+
::std::cerr << "failed death test at " << "file=" << location.file_name() << ":" << location.line() << "\n"
7271
<< std::flush;
7372
ASSERT(stat != EXIT_SUCCESS);
7473
}

0 commit comments

Comments
 (0)