2323#include < boost/asio/associated_cancellation_slot.hpp>
2424#include < boost/asio/async_result.hpp>
2525#include < boost/asio/execution/context.hpp>
26- #include < boost/asio/io_context.hpp>
2726#include < boost/asio/query.hpp>
2827#include < boost/asio/spawn.hpp>
2928#include < boost/intrusive_ptr.hpp>
3029#include < boost/smart_ptr/intrusive_ref_counter.hpp>
3130#include " common/async/cancel_on_error.h"
3231#include " common/async/service.h"
33- #include " common/async/yield_context.h"
3432
3533namespace ceph ::async::detail {
3634
3735struct spawn_throttle_handler ;
3836
39- // Reference-counted spawn throttle interface .
37+ // Reference-counted spawn throttle implementation .
4038class spawn_throttle_impl :
4139 public boost::intrusive_ref_counter<spawn_throttle_impl,
42- boost::thread_unsafe_counter>
40+ boost::thread_unsafe_counter>,
41+ public service_list_base_hook
4342{
4443 public:
45- spawn_throttle_impl (size_t limit, cancel_on_error on_error)
46- : limit(limit), on_error(on_error),
44+ spawn_throttle_impl (boost::asio::yield_context yield,
45+ size_t limit, cancel_on_error on_error)
46+ : svc(boost::asio::use_service<service<spawn_throttle_impl>>(
47+ boost::asio::query (yield.get_executor(),
48+ boost::asio::execution::context))),
49+ yield(yield), limit(limit), on_error(on_error),
4750 children(std::make_unique<child[]>(limit))
4851 {
52+ // register for service_shutdown() notifications
53+ svc.add (*this );
54+
4955 // initialize the free list
5056 for (size_t i = 0 ; i < limit; i++) {
5157 free.push_back (children[i]);
5258 }
5359 }
54- virtual ~spawn_throttle_impl () {}
5560
56- // factory function
57- static auto create (optional_yield y, size_t limit, cancel_on_error on_error)
58- -> boost::intrusive_ptr<spawn_throttle_impl>;
61+ ~spawn_throttle_impl ()
62+ {
63+ svc.remove (*this );
64+ }
5965
6066 // return the completion handler for a new child. may block due to throttling
6167 // or rethrow an exception from a previously-spawned child
@@ -68,19 +74,42 @@ class spawn_throttle_impl :
6874 };
6975
7076 using executor_type = boost::asio::any_io_executor;
71- virtual executor_type get_executor () = 0;
77+ executor_type get_executor ()
78+ {
79+ return yield.get_executor ();
80+ }
7281
7382 // wait until count <= target_count
74- virtual void wait_for (size_t target_count) = 0;
83+ void wait_for (size_t target_count)
84+ {
85+ if (count > target_count) {
86+ wait_for_count = target_count;
87+
88+ boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
89+ [this ] (auto handler) {
90+ auto slot = get_associated_cancellation_slot (handler);
91+ if (slot.is_connected ()) {
92+ slot.template emplace <op_cancellation>(this );
93+ }
94+ waiter.emplace (std::move (handler));
95+ }, yield);
96+ // this is a coroutine, so the wait has completed by this point
97+ }
98+
99+ report_exception (); // throw unreported exception
100+ }
75101
76102 // cancel outstanding coroutines
77- virtual void cancel (bool shutdown )
103+ void cancel ()
78104 {
79105 cancel_outstanding_from (outstanding.begin ());
106+ if (waiter) {
107+ wait_complete (make_error_code (boost::asio::error::operation_aborted));
108+ }
80109 }
81110
82111 // complete the given child coroutine
83- virtual void on_complete (child& c, std::exception_ptr eptr)
112+ void on_complete (child& c, std::exception_ptr eptr)
84113 {
85114 --count;
86115
@@ -103,9 +132,21 @@ class spawn_throttle_impl :
103132 }
104133 cancel_outstanding_from (cancel_from);
105134 }
135+
136+ if (waiter && count <= wait_for_count) {
137+ wait_complete ({});
138+ }
139+ }
140+
141+
142+ void service_shutdown ()
143+ {
144+ waiter.reset ();
106145 }
107146
108- protected:
147+ private:
148+ service<spawn_throttle_impl>& svc;
149+ boost::asio::yield_context yield;
109150 const size_t limit;
110151 const cancel_on_error on_error;
111152 size_t count = 0 ;
@@ -117,7 +158,6 @@ class spawn_throttle_impl :
117158 }
118159 }
119160
120- private:
121161 std::exception_ptr unreported_exception;
122162 std::unique_ptr<child[]> children;
123163
@@ -135,6 +175,42 @@ class spawn_throttle_impl :
135175 c.signal ->emit (boost::asio::cancellation_type::terminal);
136176 }
137177 }
178+
179+ using WaitSignature = void (boost::system::error_code);
180+ struct wait_state {
181+ using Work = boost::asio::executor_work_guard<
182+ boost::asio::any_io_executor>;
183+ using Handler = typename boost::asio::async_result<
184+ boost::asio::yield_context, WaitSignature>::handler_type;
185+
186+ Work work;
187+ Handler handler;
188+
189+ explicit wait_state (Handler&& h)
190+ : work(make_work_guard(h)),
191+ handler(std::move(h))
192+ {}
193+ };
194+ std::optional<wait_state> waiter;
195+ size_t wait_for_count = 0 ;
196+
197+ struct op_cancellation {
198+ spawn_throttle_impl* self;
199+ explicit op_cancellation (spawn_throttle_impl* self) noexcept
200+ : self(self) {}
201+ void operator ()(boost::asio::cancellation_type type) {
202+ if (type != boost::asio::cancellation_type::none) {
203+ self->cancel ();
204+ }
205+ }
206+ };
207+
208+ void wait_complete (boost::system::error_code ec)
209+ {
210+ auto w = std::move (*waiter);
211+ waiter.reset ();
212+ boost::asio::dispatch (boost::asio::append (std::move (w.handler ), ec));
213+ }
138214};
139215
140216// A cancellable spawn() completion handler that notifies the spawn_throttle
@@ -189,172 +265,4 @@ inline spawn_throttle_handler spawn_throttle_impl::get()
189265 return {this , c};
190266}
191267
192-
193- // Spawn throttle implementation for use in synchronous contexts where wait()
194- // blocks the calling thread until completion.
195- class sync_spawn_throttle_impl final : public spawn_throttle_impl {
196- static constexpr int concurrency = 1 ; // only run from a single thread
197- public:
198- sync_spawn_throttle_impl (size_t limit, cancel_on_error on_error)
199- : spawn_throttle_impl(limit, on_error),
200- ctx (std::in_place, concurrency)
201- {}
202-
203- executor_type get_executor () override
204- {
205- return ctx->get_executor ();
206- }
207-
208- void wait_for (size_t target_count) override
209- {
210- while (count > target_count) {
211- if (ctx->stopped ()) {
212- ctx->restart ();
213- }
214- ctx->run_one ();
215- }
216-
217- report_exception (); // throw unreported exception
218- }
219-
220- void cancel (bool shutdown) override
221- {
222- spawn_throttle_impl::cancel (shutdown);
223-
224- if (shutdown) {
225- // destroy the io_context to trigger two-phase shutdown which
226- // destroys any completion handlers with a reference to 'this'
227- ctx.reset ();
228- count = 0 ;
229- }
230- }
231-
232- private:
233- std::optional<boost::asio::io_context> ctx;
234- };
235-
236- // Spawn throttle implementation for use in asynchronous contexts where wait()
237- // suspends the calling stackful coroutine.
238- class async_spawn_throttle_impl final :
239- public spawn_throttle_impl,
240- public service_list_base_hook
241- {
242- public:
243- async_spawn_throttle_impl (boost::asio::yield_context yield,
244- size_t limit, cancel_on_error on_error)
245- : spawn_throttle_impl(limit, on_error),
246- svc (boost::asio::use_service<service<async_spawn_throttle_impl>>(
247- boost::asio::query (yield.get_executor(),
248- boost::asio::execution::context))),
249- yield(yield)
250- {
251- // register for service_shutdown() notifications
252- svc.add (*this );
253- }
254-
255- ~async_spawn_throttle_impl ()
256- {
257- svc.remove (*this );
258- }
259-
260- executor_type get_executor () override
261- {
262- return yield.get_executor ();
263- }
264-
265- void service_shutdown ()
266- {
267- waiter.reset ();
268- }
269-
270- private:
271- service<async_spawn_throttle_impl>& svc;
272- boost::asio::yield_context yield;
273-
274- using WaitSignature = void (boost::system::error_code);
275- struct wait_state {
276- using Work = boost::asio::executor_work_guard<
277- boost::asio::any_io_executor>;
278- using Handler = typename boost::asio::async_result<
279- boost::asio::yield_context, WaitSignature>::handler_type;
280-
281- Work work;
282- Handler handler;
283-
284- explicit wait_state (Handler&& h)
285- : work(make_work_guard(h)),
286- handler(std::move(h))
287- {}
288- };
289- std::optional<wait_state> waiter;
290- size_t wait_for_count = 0 ;
291-
292- struct op_cancellation {
293- async_spawn_throttle_impl* self;
294- explicit op_cancellation (async_spawn_throttle_impl* self) noexcept
295- : self(self) {}
296- void operator ()(boost::asio::cancellation_type type) {
297- if (type != boost::asio::cancellation_type::none) {
298- self->cancel (false );
299- }
300- }
301- };
302-
303- void wait_for (size_t target_count) override
304- {
305- if (count > target_count) {
306- wait_for_count = target_count;
307-
308- boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
309- [this ] (auto handler) {
310- auto slot = get_associated_cancellation_slot (handler);
311- if (slot.is_connected ()) {
312- slot.template emplace <op_cancellation>(this );
313- }
314- waiter.emplace (std::move (handler));
315- }, yield);
316- // this is a coroutine, so the wait has completed by this point
317- }
318-
319- report_exception (); // throw unreported exception
320- }
321-
322- void wait_complete (boost::system::error_code ec)
323- {
324- auto w = std::move (*waiter);
325- waiter.reset ();
326- boost::asio::dispatch (boost::asio::append (std::move (w.handler ), ec));
327- }
328-
329- void on_complete (child& c, std::exception_ptr eptr) override
330- {
331- spawn_throttle_impl::on_complete (c, eptr);
332-
333- if (waiter && count <= wait_for_count) {
334- wait_complete ({});
335- }
336- }
337-
338- void cancel (bool shutdown) override
339- {
340- spawn_throttle_impl::cancel (shutdown);
341-
342- if (waiter) {
343- wait_complete (make_error_code (boost::asio::error::operation_aborted));
344- }
345- }
346- };
347-
348- inline auto spawn_throttle_impl::create (optional_yield y, size_t limit,
349- cancel_on_error on_error)
350- -> boost::intrusive_ptr<spawn_throttle_impl>
351- {
352- if (y) {
353- auto yield = y.get_yield_context ();
354- return new async_spawn_throttle_impl (yield, limit, on_error);
355- } else {
356- return new sync_spawn_throttle_impl (limit, on_error);
357- }
358- }
359-
360268} // namespace ceph::async::detail
0 commit comments