Skip to content

Commit 63ce7c6

Browse files
ispetersjanondrusek
authored andcommitted
Make unifex::async_scope::spawn return a unifex::future<> (#372)
* Make async_scope::spawn return a lazy<> This diff changes `unifex::async_scope::spawn(Sender auto)` to return a new type, `unifex::lazy<...>`. `lazy<>` is a _Sender_ that completes with the result of the _Sender_ given to `spawn`. * Fix memory leak * Rationalize the names of some internal bits * Fuse the promise and the operation state This diff merges the spawned operation's promise with its operation state so there's only one allocation. One consequence of this change is that outstanding operations don't record themselves as complete within their corresponding scope until the associated `lazy<>` is either connected and started, or discarded. * Fix build Looks like our CI turns on exhaustive switch warnings, which I broke. * Fix infinite hang in create_test.cpp * Cleanup * Last bit of cleanup before bed * Try to fix ASAN failure In the light of the morning, I think it's wrong to set the event before requesting stop on the stop source because setting the event could lead to destruction of the operation state, invalidating the stop source. * Fix request_stop() The callers of `request_stop()` are not owners so they may not call `decref()`, which means I can't invoke `set_done()` from `request_stop()`. * Clean up, bug fixes, comments Among other clean-up, this diff fixes a stack-use-after-return in `future<>`'s `connect()`. * Add constraints and run clang-format * Comments, tests, bug fixes, and clang-format * Restore nothrow assertion in detached_spawn_call_on * Round out the async_scope tests add async_scope::attach (#392) * returned `Sender` needs to be connected and started * avoids paying penalty of `future<>` Fix `record_done` ordering in `async_scope::attach` (#424) * `record_done`, which decrements outstanding operation count, must be called after `set_*` * add regression test fix cancellation race in async_scope::attach* (#425) * use refcount to pass ownership of `deliver_result` * replace `fused_stop_source` with `inplace_stop_source` make `async_scope::attached_sender` copyable (#428) * copy constructor calls `async_scope::try_record_start` internally * copy of attached Sender will increment outstanding number of operations on async_scope `async_scope::attach` cleanup (#433) * remove unused template argument * add missing test case update `async_scope` docs (#434) * spawn() -> detached_spawn() * spawn() returns a `future` * add missing public methods add `async_scope::attach` docs (#434) fix `async_scope_test::attach_record_done` (#437) fix `tag_invoke(CPO)` in `async_scope` (#462) add `unifex::v2::async_scope` (#463) * simpler than `unifex::v1::async_scope` (`nest()` and `join()`) * does not support cancellation Introduce unifex::nest() (#468) `unifex::nest()` is a CPO that delegates to either a `tag_invoke` customization taking a *Sender* and a "scope" reference, or to a member function on the given scope that takes a *Sender*. This diff wires `unifex::nest()` to the `nest()` member function on `v2::async_scope` and to the `attach()` member function on `v1::async_scope`. Introduce spawn_detached(sender, scope, allocator) (#470) This diff introduces a new algorithm, `unifex::spawn_detached()`. `spawn_detached` takes a sender, an "async scope", and an optional allocator. It nests the sender in the scope with `unifex::nest`, allocates and operation state using the allocator, and starts that operation. The given async scope may be anything that `nest()` supports, which currently includes both `v1::async_scope` and `v2::async_scope`. Add an internal receiver to v2::async_scope's nest op (#484) While writing `unifex::spawn_future()`, I discovered that waiting until the destructor of the `v2::async_scope`'s `nest()` operation to drop the scope reference is too late (it led to hangs). This diff adds an internal receiver to the nest operation so we can detect when the operation is complete (which is likely before the operation state is destroyed) and drop the reference as soon as we reach that state. Fix stop_when's handling of stop requests (#500) * Fix stop_when's handling of stop requests When trying to sync PR #495 into our internal repo, I discovered that there's a lifetime issue in `stop_when()`. If the `stop_when()` operation receives a stop request from its Receiver and the last-to-finish child operation completes synchronously in response to the stop request then `stop_when()`'s stop callback will access the internal stop source after it's been destroyed. This first diff just formats `test/stop_when_test.cpp` and `include/unifex/stop_when.hpp` with `clang-format` in preparation for fixing the above problem. * Add a broken unit test This diff adds a unit test to `test/stop_when_test.cpp` that crashes when ASAN is enabled because it dereferences a destroyed `inplace_stop_source`. * Increment stop_when's refcount in its stop callback This diff fixes the broken test from the previous diff by incrementing the `stop_when` operation's refcount while processing a stop request from the receiver. Add spawn_future() (#489) This diff adds `unifex::spawn_future(Sender, Scope)`. Implement async_scope::spawn with spawn_future (#501) This diff reimplements the `v1::async_scope::spawn()` method in terms of the newly-added `unifex::spawn_future()` algorithm. I had to delete a few tests that exercise behaviour that's no longer supported. Work around an MSVC bug in C++20 mode (#492) * merge unit test that depends on `v2/async_scope` Co-authored-by: Ian Petersen <ispeters@gmail.com>
1 parent 2b1f57b commit 63ce7c6

22 files changed

+5225
-476
lines changed

doc/api_reference.md

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* `with_query_value()`
3939
* `with_allocator()`
4040
* `done_as_optional()`
41+
* `nest()`
4142
* Sender Types
4243
* `async_trace_sender`
4344
* Sender Queries
@@ -711,6 +712,13 @@ task<void> g() {
711712
}
712713
```
713714

715+
### `nest(Sender sender, Scope& scope) -> Sender`
716+
717+
`nest` registers the given *Sender* with the given "scope", which should be of a
718+
type that works like `v1::async_scope` or `v2::async_scope`. A *Sender* that has
719+
been registered with a scope will prevent that scope from being joined until the
720+
*Sender* has either been discarded or executed.
721+
714722
## Sender Types
715723

716724
### `async_trace_sender`
@@ -1201,8 +1209,10 @@ namespace unifex
12011209
// Asserts if the sender returned from cleanup has not yet completed.
12021210
~async_scope();
12031211
1204-
// Returns a sender that, when started, marks this scope as cleaned up,
1205-
// requests stop on the internal stop source, and then waits for all
1212+
// Returns a sender that, when started, marks this scope so that no more
1213+
// work can be spawned within it,
1214+
// requests stop on the internal stop source,
1215+
// i.e. cancellation of all outstanding work, and then waits for all
12061216
// outstanding work to complete.
12071217
//
12081218
// The sender returned from cleanup must complete before this scope is
@@ -1212,28 +1222,87 @@ namespace unifex
12121222
// times in series or in parallel).
12131223
[[nodiscard]] sender auto cleanup() noexcept;
12141224
1215-
// Connects sender to an internal receiver and starts the operation. Once
1225+
// Returns a sender that, when started, marks this scope so that no more
1226+
// work can be spawned within it and then waits for all outstanding work
1227+
// to complete.
1228+
//
1229+
// The sender returned from complete must complete before this scope is
1230+
// destroyed.
1231+
//
1232+
// complete is thread-safe and idempotent (i.e. it can be invoked multiple
1233+
// times in series or in parallel).
1234+
[[nodiscard]] sender auto complete() noexcept;
1235+
1236+
// Returns a stop token from the scope's internal stop source.
1237+
inplace_stop_token get_stop_token() noexcept;
1238+
1239+
// Marks the scope so that no more work can be spawned within it,
1240+
// requests stop on the internal stop source,
1241+
// i.e. cancellation of all outstanding work
1242+
void request_stop() noexcept;
1243+
1244+
// Implemented as (void)spawn(sender).
1245+
void detached_spawn(sender);
1246+
1247+
// Implemented as detached_spawn(on(scheduler, sender)).
1248+
void detached_spawn_on(scheduler, sender);
1249+
1250+
// Implemented as detached_spawn_on(scheduler, just_from(invocable)).
1251+
void detached_spawn_call_on(scheduler, invocable);
1252+
1253+
// Connects sender to an internal receiver and starts the operation. Once
12161254
// started, the given sender must complete with void or done; completing
12171255
// with an error will result in a call to std::terminate.
12181256
//
12191257
// The receiver to which the sender is connected responds to get_stop_token
12201258
// with a stoppable token that becomes stopped when clean-up begins.
12211259
//
12221260
// Space for the operation state is allocated with std::make_unique and
1223-
// so this operation may throw if the allocation fails. This operation may
1261+
// so this operation may throw if the allocation fails. This operation may
12241262
// also throw if connect throws.
12251263
//
12261264
// Once connect has succeeded, start will only be called if this scope has
12271265
// not yet been cleaned up; if a call to spawn loses a race with a call to
12281266
// cleanup, the operation state created by connect will be destroyed and
12291267
// deallocated without being started.
1230-
void spawn(sender);
1268+
//
1269+
// future is a handle to an eagerly-started operation; it is also a Sender,
1270+
// the result is retreived by connecting it to an appropriate Receiver
1271+
// and starting the resulting Operation.
1272+
//
1273+
// Discarding a future without connecting or starting it requests cancellation
1274+
// of the associated operation and discards the operation's result when it
1275+
// ultimately completes.
1276+
//
1277+
// Requesting cancellation of a connected-and-started future will also request
1278+
// cancellation of the associated operation.
1279+
future spawn(sender);
12311280
1232-
// Implemented as spawn(on(scheduler, sender)).
1233-
void spawn_on(scheduler, sender);
1281+
// Implemented as detached_spawn(on(scheduler, sender)).
1282+
future spawn_on(scheduler, sender);
12341283
12351284
// Implemented as spawn_on(scheduler, just_from(invocable)).
1236-
void spawn_call_on(scheduler, invocable);
1285+
future spawn_call_on(scheduler, invocable);
1286+
1287+
// Returns a new sender that, when connected and started, connects and starts
1288+
// the given sender.
1289+
//
1290+
// Returned sender owns a reference to this async_scope. Discarding the sender
1291+
// prior to connecting it or discarding the operation prior to starting it
1292+
// discards the reference to this async_scope.
1293+
//
1294+
// The receiver to which the sender is connected responds to get_stop_token
1295+
// with a stoppable token that becomes stopped when clean-up begins.
1296+
[[nodiscard]] sender auto attach(sender);
1297+
1298+
// Implemented as attach(just_from(fun))
1299+
[[nodiscard]] sender auto attach_call(fun);
1300+
1301+
// Implemented as attach(on(scheduler, sender))
1302+
[[nodiscard]] sender auto attach_on(scheduler, sender);
1303+
1304+
// Implemented as attach_on(scheduler, just_from(fun))
1305+
[[nodiscard]] sender auto attach_call_on(scheduler, fun);
12371306
};
12381307
}
12391308
```

include/unifex/config.hpp.in

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@
176176
#define UNIFEX_DIAGNOSTIC_IGNORE_INIT_LIST_LIFETIME
177177
#define UNIFEX_DIAGNOSTIC_IGNORE_FLOAT_EQUAL
178178
#define UNIFEX_DIAGNOSTIC_IGNORE_CPP2A_COMPAT
179+
#define UNIFEX_DIAGNOSTIC_IGNORE_UNUSED_RESULT
179180
#else // ^^^ defined(_MSC_VER) ^^^ / vvv !defined(_MSC_VER) vvv
180181
#if defined(__GNUC__) || defined(__clang__)
181182
#define UNIFEX_PRAGMA(X) _Pragma(#X)
@@ -194,12 +195,15 @@
194195
UNIFEX_DIAGNOSTIC_IGNORE("-Wfloat-equal")
195196
#define UNIFEX_DIAGNOSTIC_IGNORE_CPP2A_COMPAT \
196197
UNIFEX_DIAGNOSTIC_IGNORE("-Wc++2a-compat")
198+
#define UNIFEX_DIAGNOSTIC_IGNORE_UNUSED_RESULT \
199+
UNIFEX_DIAGNOSTIC_IGNORE("-Wunused-result")
197200
#else
198201
#define UNIFEX_DIAGNOSTIC_PUSH
199202
#define UNIFEX_DIAGNOSTIC_POP
200203
#define UNIFEX_DIAGNOSTIC_IGNORE_INIT_LIST_LIFETIME
201204
#define UNIFEX_DIAGNOSTIC_IGNORE_FLOAT_EQUAL
202205
#define UNIFEX_DIAGNOSTIC_IGNORE_CPP2A_COMPAT
206+
#define UNIFEX_DIAGNOSTIC_IGNORE_UNUSED_RESULT
203207
#endif
204208
#endif // MSVC/Generic configuration switch
205209

include/unifex/fused_stop_source.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <optional>
1919

2020
#include <unifex/inplace_stop_token.hpp>
21+
2122
#include <unifex/detail/prologue.hpp>
2223

2324
namespace unifex {

include/unifex/nest.hpp

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
5+
* (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://llvm.org/LICENSE.txt
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <unifex/config.hpp>
19+
#include <unifex/bind_back.hpp>
20+
#include <unifex/sender_concepts.hpp>
21+
#include <unifex/tag_invoke.hpp>
22+
23+
#include <unifex/detail/prologue.hpp>
24+
25+
namespace unifex {
26+
27+
namespace _nest_cpo {
28+
29+
inline const struct _nest_fn final {
30+
private:
31+
template <typename Scope, typename Sender>
32+
using nest_member_result_t =
33+
decltype(UNIFEX_DECLVAL(Scope&).nest(UNIFEX_DECLVAL(Sender)));
34+
35+
template <typename Scope, typename Sender>
36+
static constexpr bool is_nest_member_nothrow_v =
37+
noexcept(UNIFEX_DECLVAL(Scope&).nest(UNIFEX_DECLVAL(Sender)));
38+
39+
struct deref;
40+
41+
public:
42+
template(typename Sender, typename Scope) //
43+
(requires typed_sender<Sender> AND tag_invocable<_nest_fn, Sender, Scope&>
44+
AND typed_sender<tag_invoke_result_t<_nest_fn, Sender, Scope&>>) //
45+
auto
46+
operator()(Sender&& sender, Scope& scope) const
47+
noexcept(is_nothrow_tag_invocable_v<_nest_fn, Sender, Scope&>)
48+
-> tag_invoke_result_t<_nest_fn, Sender, Scope&> {
49+
return tag_invoke(_nest_fn{}, static_cast<Sender&&>(sender), scope);
50+
}
51+
52+
template(typename Sender, typename Scope) //
53+
(requires typed_sender<Sender> AND(
54+
!tag_invocable<_nest_fn, Sender, Scope&>)
55+
AND typed_sender<nest_member_result_t<Scope, Sender>>) //
56+
auto
57+
operator()(Sender&& sender, Scope& scope) const
58+
noexcept(is_nest_member_nothrow_v<Scope, Sender>)
59+
-> nest_member_result_t<Scope, Sender> {
60+
return scope.nest(static_cast<Sender&&>(sender));
61+
}
62+
63+
template <typename Scope>
64+
constexpr auto operator()(Scope& scope) const
65+
noexcept(is_nothrow_callable_v<tag_t<bind_back>, deref, Scope*>)
66+
-> bind_back_result_t<deref, Scope*>;
67+
} nest{};
68+
69+
struct _nest_fn::deref final {
70+
template <typename Sender, typename Scope>
71+
constexpr auto operator()(Sender&& sender, Scope* scope) const
72+
noexcept(noexcept(_nest_fn{}(static_cast<Sender&&>(sender), *scope)))
73+
-> decltype(_nest_fn{}(static_cast<Sender&&>(sender), *scope)) {
74+
return _nest_fn{}(static_cast<Sender&&>(sender), *scope);
75+
}
76+
};
77+
78+
template <typename Scope>
79+
inline constexpr auto _nest_fn::operator()(Scope& scope) const
80+
noexcept(is_nothrow_callable_v<tag_t<bind_back>, deref, Scope*>)
81+
-> bind_back_result_t<deref, Scope*> {
82+
// bind_back will try to store a copy of any lvalue references it's passed,
83+
// which doesn't work for us here so we have to pass a scope pointer instead
84+
// of a scope reference. we don't, in general, want to expose a
85+
// `nest(Sender&&, Scope*)` function so we use `_nest_fn::deref` to do the
86+
// indirection for us in this scenario.
87+
return bind_back(deref{}, &scope);
88+
}
89+
90+
} // namespace _nest_cpo
91+
92+
using _nest_cpo::nest;
93+
94+
} // namespace unifex
95+
96+
#include <unifex/detail/epilogue.hpp>

0 commit comments

Comments
 (0)