Skip to content

Commit 2b1f57b

Browse files
committed
move unifex::async_scope into the inline v0 namespace
* full name is `unifex::v0::async_scope` * still can be referred to as `unifex::async_scope`
1 parent d6e3ef3 commit 2b1f57b

File tree

2 files changed

+269
-247
lines changed

2 files changed

+269
-247
lines changed

include/unifex/async_scope.hpp

Lines changed: 1 addition & 247 deletions
Original file line numberDiff line numberDiff line change
@@ -15,250 +15,4 @@
1515
*/
1616
#pragma once
1717

18-
#include <unifex/config.hpp>
19-
#include <unifex/async_manual_reset_event.hpp>
20-
#include <unifex/get_stop_token.hpp>
21-
#include <unifex/inplace_stop_token.hpp>
22-
#include <unifex/just_from.hpp>
23-
#include <unifex/manual_lifetime.hpp>
24-
#include <unifex/receiver_concepts.hpp>
25-
#include <unifex/scheduler_concepts.hpp>
26-
#include <unifex/sender_concepts.hpp>
27-
#include <unifex/sequence.hpp>
28-
#include <unifex/then.hpp>
29-
#include <unifex/type_traits.hpp>
30-
#include <unifex/on.hpp>
31-
32-
#include <atomic>
33-
#include <memory>
34-
35-
#include <unifex/detail/prologue.hpp>
36-
37-
namespace unifex {
38-
39-
namespace _async_scope {
40-
41-
struct async_scope;
42-
43-
struct _receiver_base {
44-
[[noreturn]] void set_error(std::exception_ptr) noexcept {
45-
std::terminate();
46-
}
47-
48-
friend inplace_stop_token
49-
tag_invoke(tag_t<get_stop_token>, const _receiver_base& r) noexcept {
50-
return r.stopToken_;
51-
}
52-
53-
inplace_stop_token stopToken_;
54-
void* op_;
55-
async_scope* scope_;
56-
};
57-
58-
template <typename Sender>
59-
struct _receiver {
60-
struct type;
61-
};
62-
63-
template <typename Sender>
64-
using receiver = typename _receiver<Sender>::type;
65-
66-
void record_done(async_scope*) noexcept;
67-
68-
template <typename Sender>
69-
using _operation_t = connect_result_t<Sender, receiver<Sender>>;
70-
71-
template <typename Sender>
72-
struct _receiver<Sender>::type final : _receiver_base {
73-
template <typename Op>
74-
explicit type(inplace_stop_token stoken, Op* op, async_scope* scope) noexcept
75-
: _receiver_base{stoken, op, scope} {
76-
static_assert(same_as<Op, manual_lifetime<_operation_t<Sender>>>);
77-
}
78-
79-
// receivers uniquely own themselves; we don't need any special move-
80-
// construction behaviour, but we do need to ensure no copies are made
81-
type(type&&) noexcept = default;
82-
83-
~type() = default;
84-
85-
// it's just simpler to skip this
86-
type& operator=(type&&) = delete;
87-
88-
void set_value() noexcept {
89-
set_done();
90-
}
91-
92-
void set_done() noexcept {
93-
// we're about to delete this, so save the scope for later
94-
auto scope = scope_;
95-
auto op = static_cast<manual_lifetime<_operation_t<Sender>>*>(op_);
96-
op->destruct();
97-
delete op;
98-
record_done(scope);
99-
}
100-
};
101-
102-
struct async_scope {
103-
private:
104-
template <typename Scheduler, typename Sender>
105-
using _on_result_t =
106-
decltype(on(UNIFEX_DECLVAL(Scheduler&&), UNIFEX_DECLVAL(Sender&&)));
107-
108-
inplace_stop_source stopSource_;
109-
// (opState_ & 1) is 1 until we've been stopped
110-
// (opState_ >> 1) is the number of outstanding operations
111-
std::atomic<std::size_t> opState_{1};
112-
async_manual_reset_event evt_;
113-
114-
[[nodiscard]] auto await_and_sync() noexcept {
115-
return then(evt_.async_wait(), [this]() noexcept {
116-
// make sure to synchronize with all the fetch_subs being done while
117-
// operations complete
118-
(void)opState_.load(std::memory_order_acquire);
119-
});
120-
}
121-
122-
public:
123-
async_scope() noexcept = default;
124-
125-
~async_scope() {
126-
[[maybe_unused]] auto state = opState_.load(std::memory_order_relaxed);
127-
128-
UNIFEX_ASSERT(is_stopping(state));
129-
UNIFEX_ASSERT(op_count(state) == 0);
130-
}
131-
132-
template (typename Sender)
133-
(requires sender_to<Sender, receiver<Sender>>)
134-
void spawn(Sender&& sender) {
135-
// this could throw; if it does, there's nothing to clean up
136-
auto opToStart = std::make_unique<manual_lifetime<_operation_t<Sender>>>();
137-
138-
// this could throw; if it does, the only clean-up we need is to
139-
// deallocate the manual_lifetime, which is handled by opToStart's
140-
// destructor so we're good
141-
opToStart->construct_with([&] {
142-
return connect(
143-
(Sender&&) sender,
144-
receiver<Sender>{stopSource_.get_token(), opToStart.get(), this});
145-
});
146-
147-
// At this point, the rest of the function is noexcept, but opToStart's
148-
// destructor is no longer enough to properly clean up because it won't
149-
// invoke destruct(). We need to ensure that we either call destruct()
150-
// ourselves or complete the operation so *it* can call destruct().
151-
152-
if (try_record_start()) {
153-
// start is noexcept so we can assume that the operation will complete
154-
// after this, which means we can rely on its self-ownership to ensure
155-
// that it is eventually deleted
156-
unifex::start(opToStart.release()->get());
157-
}
158-
else {
159-
// we've been stopped so clean up and bail out
160-
opToStart->destruct();
161-
}
162-
}
163-
164-
template (typename Sender, typename Scheduler)
165-
(requires scheduler<Scheduler> AND
166-
sender_to<
167-
_on_result_t<Scheduler, Sender>,
168-
receiver<_on_result_t<Scheduler, Sender>>>)
169-
void spawn_on(Scheduler&& scheduler, Sender&& sender) {
170-
spawn(on((Scheduler&&) scheduler, (Sender&&) sender));
171-
}
172-
173-
template (typename Scheduler, typename Fun)
174-
(requires scheduler<Scheduler> AND callable<Fun>)
175-
void spawn_call_on(Scheduler&& scheduler, Fun&& fun) {
176-
static_assert(
177-
is_nothrow_callable_v<Fun>,
178-
"Please annotate your callable with noexcept.");
179-
spawn_on(
180-
(Scheduler&&) scheduler,
181-
just_from((Fun&&) fun));
182-
}
183-
184-
[[nodiscard]] auto complete() noexcept {
185-
return sequence(
186-
just_from([this] () noexcept {
187-
end_of_scope();
188-
}),
189-
await_and_sync());
190-
}
191-
192-
[[nodiscard]] auto cleanup() noexcept {
193-
return sequence(
194-
just_from([this]() noexcept {
195-
request_stop();
196-
}),
197-
await_and_sync());
198-
}
199-
200-
inplace_stop_token get_stop_token() noexcept {
201-
return stopSource_.get_token();
202-
}
203-
204-
void request_stop() noexcept {
205-
end_of_scope();
206-
stopSource_.request_stop();
207-
}
208-
209-
private:
210-
211-
static constexpr std::size_t stoppedBit{1};
212-
213-
static bool is_stopping(std::size_t state) noexcept {
214-
return (state & stoppedBit) == 0;
215-
}
216-
217-
static std::size_t op_count(std::size_t state) noexcept {
218-
return state >> 1;
219-
}
220-
221-
[[nodiscard]] bool try_record_start() noexcept {
222-
auto opState = opState_.load(std::memory_order_relaxed);
223-
224-
do {
225-
if (is_stopping(opState)) {
226-
return false;
227-
}
228-
229-
UNIFEX_ASSERT(opState + 2 > opState);
230-
} while (!opState_.compare_exchange_weak(
231-
opState,
232-
opState + 2,
233-
std::memory_order_relaxed));
234-
235-
return true;
236-
}
237-
238-
friend void record_done(async_scope* scope) noexcept {
239-
auto oldState = scope->opState_.fetch_sub(2, std::memory_order_release);
240-
241-
if (is_stopping(oldState) && op_count(oldState) == 1) {
242-
// the scope is stopping and we're the last op to finish
243-
scope->evt_.set();
244-
}
245-
}
246-
247-
void end_of_scope() noexcept {
248-
// stop adding work
249-
auto oldState = opState_.fetch_and(~stoppedBit, std::memory_order_release);
250-
251-
if (op_count(oldState) == 0) {
252-
// there are no outstanding operations to wait for
253-
evt_.set();
254-
}
255-
}
256-
};
257-
258-
} // namespace _async_scope
259-
260-
using _async_scope::async_scope;
261-
262-
} // namespace unifex
263-
264-
#include <unifex/detail/epilogue.hpp>
18+
#include <unifex/v0/async_scope.hpp>

0 commit comments

Comments
 (0)