2929#include < boost/smart_ptr/intrusive_ref_counter.hpp>
3030#include " common/async/cancel_on_error.h"
3131#include " common/async/service.h"
32- #include " common/async/yield_context.h"
3332
3433namespace ceph ::async::detail {
3534
3635struct spawn_throttle_handler ;
3736
38- // Reference-counted spawn throttle interface .
37+ // Reference-counted spawn throttle implementation .
3938class spawn_throttle_impl :
4039 public boost::intrusive_ref_counter<spawn_throttle_impl,
41- boost::thread_unsafe_counter>
40+ boost::thread_unsafe_counter>,
41+ public service_list_base_hook
4242{
4343 public:
44- spawn_throttle_impl (size_t limit, cancel_on_error on_error)
45- : limit(limit), on_error(on_error),
44+ spawn_throttle_impl (boost::asio::yield_context yield,
45+ size_t limit, cancel_on_error on_error)
46+ : svc(boost::asio::use_service<service<spawn_throttle_impl>>(
47+ boost::asio::query (yield.get_executor(),
48+ boost::asio::execution::context))),
49+ yield(yield), limit(limit), on_error(on_error),
4650 children(std::make_unique<child[]>(limit))
4751 {
52+ // register for service_shutdown() notifications
53+ svc.add (*this );
54+
4855 // initialize the free list
4956 for (size_t i = 0 ; i < limit; i++) {
5057 free.push_back (children[i]);
5158 }
5259 }
53- virtual ~spawn_throttle_impl () {}
60+
61+ ~spawn_throttle_impl ()
62+ {
63+ svc.remove (*this );
64+ }
5465
5566 // return the completion handler for a new child. may block due to throttling
5667 // or rethrow an exception from a previously-spawned child
@@ -63,19 +74,42 @@ class spawn_throttle_impl :
6374 };
6475
6576 using executor_type = boost::asio::any_io_executor;
66- virtual executor_type get_executor () = 0;
77+ executor_type get_executor ()
78+ {
79+ return yield.get_executor ();
80+ }
6781
6882 // wait until count <= target_count
69- virtual void wait_for (size_t target_count) = 0;
83+ void wait_for (size_t target_count)
84+ {
85+ if (count > target_count) {
86+ wait_for_count = target_count;
87+
88+ boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
89+ [this ] (auto handler) {
90+ auto slot = get_associated_cancellation_slot (handler);
91+ if (slot.is_connected ()) {
92+ slot.template emplace <op_cancellation>(this );
93+ }
94+ waiter.emplace (std::move (handler));
95+ }, yield);
96+ // this is a coroutine, so the wait has completed by this point
97+ }
98+
99+ report_exception (); // throw unreported exception
100+ }
70101
71102 // cancel outstanding coroutines
72- virtual void cancel (bool shutdown )
103+ void cancel ()
73104 {
74105 cancel_outstanding_from (outstanding.begin ());
106+ if (waiter) {
107+ wait_complete (make_error_code (boost::asio::error::operation_aborted));
108+ }
75109 }
76110
77111 // complete the given child coroutine
78- virtual void on_complete (child& c, std::exception_ptr eptr)
112+ void on_complete (child& c, std::exception_ptr eptr)
79113 {
80114 --count;
81115
@@ -98,9 +132,21 @@ class spawn_throttle_impl :
98132 }
99133 cancel_outstanding_from (cancel_from);
100134 }
135+
136+ if (waiter && count <= wait_for_count) {
137+ wait_complete ({});
138+ }
139+ }
140+
141+
142+ void service_shutdown ()
143+ {
144+ waiter.reset ();
101145 }
102146
103- protected:
147+ private:
148+ service<spawn_throttle_impl>& svc;
149+ boost::asio::yield_context yield;
104150 const size_t limit;
105151 const cancel_on_error on_error;
106152 size_t count = 0 ;
@@ -112,7 +158,6 @@ class spawn_throttle_impl :
112158 }
113159 }
114160
115- private:
116161 std::exception_ptr unreported_exception;
117162 std::unique_ptr<child[]> children;
118163
@@ -130,6 +175,42 @@ class spawn_throttle_impl :
130175 c.signal ->emit (boost::asio::cancellation_type::terminal);
131176 }
132177 }
178+
179+ using WaitSignature = void (boost::system::error_code);
180+ struct wait_state {
181+ using Work = boost::asio::executor_work_guard<
182+ boost::asio::any_io_executor>;
183+ using Handler = typename boost::asio::async_result<
184+ boost::asio::yield_context, WaitSignature>::handler_type;
185+
186+ Work work;
187+ Handler handler;
188+
189+ explicit wait_state (Handler&& h)
190+ : work(make_work_guard(h)),
191+ handler(std::move(h))
192+ {}
193+ };
194+ std::optional<wait_state> waiter;
195+ size_t wait_for_count = 0 ;
196+
197+ struct op_cancellation {
198+ spawn_throttle_impl* self;
199+ explicit op_cancellation (spawn_throttle_impl* self) noexcept
200+ : self(self) {}
201+ void operator ()(boost::asio::cancellation_type type) {
202+ if (type != boost::asio::cancellation_type::none) {
203+ self->cancel ();
204+ }
205+ }
206+ };
207+
208+ void wait_complete (boost::system::error_code ec)
209+ {
210+ auto w = std::move (*waiter);
211+ waiter.reset ();
212+ boost::asio::dispatch (boost::asio::append (std::move (w.handler ), ec));
213+ }
133214};
134215
135216// A cancellable spawn() completion handler that notifies the spawn_throttle
@@ -184,114 +265,4 @@ inline spawn_throttle_handler spawn_throttle_impl::get()
184265 return {this , c};
185266}
186267
187- class async_spawn_throttle_impl final :
188- public spawn_throttle_impl,
189- public service_list_base_hook
190- {
191- public:
192- async_spawn_throttle_impl (boost::asio::yield_context yield,
193- size_t limit, cancel_on_error on_error)
194- : spawn_throttle_impl(limit, on_error),
195- svc (boost::asio::use_service<service<async_spawn_throttle_impl>>(
196- boost::asio::query (yield.get_executor(),
197- boost::asio::execution::context))),
198- yield(yield)
199- {
200- // register for service_shutdown() notifications
201- svc.add (*this );
202- }
203-
204- ~async_spawn_throttle_impl ()
205- {
206- svc.remove (*this );
207- }
208-
209- executor_type get_executor () override
210- {
211- return yield.get_executor ();
212- }
213-
214- void service_shutdown ()
215- {
216- waiter.reset ();
217- }
218-
219- private:
220- service<async_spawn_throttle_impl>& svc;
221- boost::asio::yield_context yield;
222-
223- using WaitSignature = void (boost::system::error_code);
224- struct wait_state {
225- using Work = boost::asio::executor_work_guard<
226- boost::asio::any_io_executor>;
227- using Handler = typename boost::asio::async_result<
228- boost::asio::yield_context, WaitSignature>::handler_type;
229-
230- Work work;
231- Handler handler;
232-
233- explicit wait_state (Handler&& h)
234- : work(make_work_guard(h)),
235- handler(std::move(h))
236- {}
237- };
238- std::optional<wait_state> waiter;
239- size_t wait_for_count = 0 ;
240-
241- struct op_cancellation {
242- async_spawn_throttle_impl* self;
243- explicit op_cancellation (async_spawn_throttle_impl* self) noexcept
244- : self(self) {}
245- void operator ()(boost::asio::cancellation_type type) {
246- if (type != boost::asio::cancellation_type::none) {
247- self->cancel (false );
248- }
249- }
250- };
251-
252- void wait_for (size_t target_count) override
253- {
254- if (count > target_count) {
255- wait_for_count = target_count;
256-
257- boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
258- [this ] (auto handler) {
259- auto slot = get_associated_cancellation_slot (handler);
260- if (slot.is_connected ()) {
261- slot.template emplace <op_cancellation>(this );
262- }
263- waiter.emplace (std::move (handler));
264- }, yield);
265- // this is a coroutine, so the wait has completed by this point
266- }
267-
268- report_exception (); // throw unreported exception
269- }
270-
271- void wait_complete (boost::system::error_code ec)
272- {
273- auto w = std::move (*waiter);
274- waiter.reset ();
275- boost::asio::dispatch (boost::asio::append (std::move (w.handler ), ec));
276- }
277-
278- void on_complete (child& c, std::exception_ptr eptr) override
279- {
280- spawn_throttle_impl::on_complete (c, eptr);
281-
282- if (waiter && count <= wait_for_count) {
283- wait_complete ({});
284- }
285- }
286-
287- void cancel (bool shutdown) override
288- {
289- spawn_throttle_impl::cancel (shutdown);
290-
291- if (waiter) {
292- wait_complete (make_error_code (boost::asio::error::operation_aborted));
293- }
294- }
295- };
296-
297268} // namespace ceph::async::detail
0 commit comments