Skip to content

Commit f03d0ce

Browse files
committed
common/async: spawn_throttle wraps call to asio::spawn()
cancellation of the parent must immediately cancel its children, which only works if the children are on the same executor as the parent prohibit child coroutines from being spawned on a different executor by wrapping the call to asio::spawn() in a new spawn_throttle::spawn() interface expose an overload for asio::spawn()'s optional StackAllocator argument Signed-off-by: Casey Bodley <[email protected]>
1 parent 86385b1 commit f03d0ce

File tree

4 files changed

+79
-101
lines changed

4 files changed

+79
-101
lines changed

src/common/async/max_concurrent_for_each.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,9 @@ void max_concurrent_for_each(Iterator begin,
6060
}
6161
auto throttle = spawn_throttle{y, max_concurrent, on_error};
6262
for (Iterator i = begin; i != end; ++i) {
63-
boost::asio::spawn(throttle.get_executor(),
64-
[&func, &val = *i] (boost::asio::yield_context yield) {
65-
func(val, yield);
66-
}, throttle);
63+
throttle.spawn([&func, &val = *i] (boost::asio::yield_context yield) {
64+
func(val, yield);
65+
});
6766
}
6867
throttle.wait();
6968
}

src/common/async/spawn_throttle.h

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@ namespace ceph::async {
2828
/// The parent may either be a synchronous function or a stackful coroutine,
2929
/// depending on the optional_yield constructor argument.
3030
///
31-
/// Child coroutines are spawned by calling boost::asio::spawn() and using the
32-
/// spawn_throttle object as the CompletionToken argument. Exceptions thrown
33-
/// by children are reported to the caller on its next call to get() or wait().
34-
/// The cancel_on_error option controls whether these exceptions trigger the
35-
/// cancellation of other children.
31+
/// Child coroutines take boost::asio::yield_context as the only argument.
32+
/// Exceptions thrown by children are reported to the caller on its next call
33+
/// to spawn() or wait(). The cancel_on_error option controls whether these
34+
/// exceptions trigger the cancellation of other children.
3635
///
3736
/// All child coroutines are canceled by cancel() or spawn_throttle destruction.
3837
/// This allows a parent function to share memory with its child coroutines
@@ -51,7 +50,7 @@ namespace ceph::async {
5150
/// auto throttle = ceph::async::spawn_throttle{y, 10};
5251
///
5352
/// for (size_t i = 0; i < count; i++) {
54-
/// boost::asio::spawn(throttle.get_executor(), child, throttle);
53+
/// throttle.spawn(child);
5554
/// }
5655
/// throttle.wait();
5756
/// }
@@ -86,19 +85,24 @@ class spawn_throttle {
8685
return impl->get_executor();
8786
}
8887

89-
/// Return a cancellable spawn() completion handler with signature
90-
/// void(std::exception_ptr).
88+
/// Spawn a cancellable coroutine to call the given function, passing its
89+
/// boost::asio::yield_context as the only argument.
9190
///
92-
/// This function may block until a throttle unit becomes available. If one or
93-
/// more previously-spawned coroutines exit with an exception, the first such
94-
/// exception is rethrown here.
95-
///
96-
/// As a convenience, you can avoid calling this function by using the
97-
/// spawn_throttle itself as a CompletionToken for spawn().
98-
auto get()
99-
-> detail::spawn_throttle_handler
91+
/// Before spawning, this function may block until a throttle unit becomes
92+
/// available. If one or more previously-spawned coroutines exit with an
93+
/// exception, the first such exception is rethrown here.
94+
template <typename F>
95+
void spawn(F&& f)
96+
{
97+
boost::asio::spawn(get_executor(), std::forward<F>(f), impl->get());
98+
}
99+
100+
/// /overload
101+
template <typename StackAllocator, typename F>
102+
void spawn(std::allocator_arg_t arg, StackAllocator&& alloc, F&& f)
100103
{
101-
return impl->get();
104+
boost::asio::spawn(get_executor(), arg, std::forward<StackAllocator>(alloc),
105+
std::forward<F>(f), impl->get());
102106
}
103107

104108
/// Wait for all outstanding completions before returning. If any
@@ -120,27 +124,3 @@ class spawn_throttle {
120124
};
121125

122126
} // namespace ceph::async
123-
124-
namespace boost::asio {
125-
126-
// Allow spawn_throttle to be used as a CompletionToken.
127-
template <typename Signature>
128-
struct async_result<ceph::async::spawn_throttle, Signature>
129-
{
130-
using completion_handler_type =
131-
ceph::async::detail::spawn_throttle_handler;
132-
async_result(completion_handler_type&) {}
133-
134-
using return_type = void;
135-
return_type get() {}
136-
137-
template <typename Initiation, typename... Args>
138-
static return_type initiate(Initiation&& init,
139-
ceph::async::spawn_throttle& throttle,
140-
Args&& ...args)
141-
{
142-
return std::move(init)(throttle.get(), std::forward<Args>(args)...);
143-
}
144-
};
145-
146-
} // namespace boost::asio

src/rgw/rgw_op.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7156,10 +7156,9 @@ void RGWDeleteMultiObj::execute(optional_yield y)
71567156
auto group = ceph::async::spawn_throttle{y, max_aio};
71577157

71587158
for (const auto& key : multi_delete->objects) {
7159-
boost::asio::spawn(group.get_executor(),
7160-
[this, &key] (boost::asio::yield_context yield) {
7161-
handle_individual_object(key, yield);
7162-
}, group);
7159+
group.spawn([this, &key] (boost::asio::yield_context yield) {
7160+
handle_individual_object(key, yield);
7161+
});
71637162

71647163
rgw_flush_formatter(s, s->formatter);
71657164
}

src/test/common/test_async_spawn_throttle.cc

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ TEST(YieldGroupSync, spawn_wait)
8181
};
8282

8383
auto throttle = spawn_throttle{null_yield, 2};
84-
asio::spawn(throttle.get_executor(), cr, throttle);
84+
throttle.spawn(cr);
8585
throttle.wait();
8686

8787
EXPECT_EQ(1, completed);
@@ -90,7 +90,7 @@ TEST(YieldGroupSync, spawn_wait)
9090
TEST(YieldGroupSync, spawn_shutdown)
9191
{
9292
auto throttle = spawn_throttle{null_yield, 2};
93-
asio::spawn(throttle.get_executor(), wait_for(1s), throttle);
93+
throttle.spawn(wait_for(1s));
9494
}
9595

9696
TEST(YieldGroupSync, spawn_cancel_wait)
@@ -103,7 +103,7 @@ TEST(YieldGroupSync, spawn_cancel_wait)
103103
};
104104

105105
auto throttle = spawn_throttle{null_yield, 2};
106-
asio::spawn(throttle.get_executor(), cr, throttle);
106+
throttle.spawn(cr);
107107
throttle.cancel();
108108
EXPECT_THROW(throttle.wait(), boost::system::system_error);
109109

@@ -120,10 +120,10 @@ TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait)
120120
};
121121

122122
auto throttle = spawn_throttle{null_yield, 2};
123-
asio::spawn(throttle.get_executor(), cr, throttle);
123+
throttle.spawn(cr);
124124
throttle.cancel();
125125
EXPECT_THROW(throttle.wait(), boost::system::system_error);
126-
asio::spawn(throttle.get_executor(), cr, throttle);
126+
throttle.spawn(cr);
127127
throttle.wait();
128128

129129
EXPECT_EQ(1, completed);
@@ -148,10 +148,10 @@ TEST(YieldGroupSync, spawn_over_limit)
148148
};
149149

150150
auto throttle = spawn_throttle{null_yield, 2};
151-
asio::spawn(throttle.get_executor(), cr, throttle);
152-
asio::spawn(throttle.get_executor(), cr, throttle);
153-
asio::spawn(throttle.get_executor(), cr, throttle); // blocks
154-
asio::spawn(throttle.get_executor(), cr, throttle); // blocks
151+
throttle.spawn(cr);
152+
throttle.spawn(cr);
153+
throttle.spawn(cr); // blocks
154+
throttle.spawn(cr); // blocks
155155
throttle.wait(); // blocks
156156

157157
EXPECT_EQ(0, concurrent);
@@ -173,10 +173,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_none)
173173
};
174174

175175
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
176-
asio::spawn(throttle.get_executor(), cr, throttle);
177-
asio::spawn(throttle.get_executor(), cr, throttle);
178-
asio::spawn(throttle.get_executor(), err, throttle);
179-
asio::spawn(throttle.get_executor(), cr, throttle);
176+
throttle.spawn(cr);
177+
throttle.spawn(cr);
178+
throttle.spawn(err);
179+
throttle.spawn(cr);
180180
EXPECT_THROW(throttle.wait(), std::logic_error);
181181

182182
EXPECT_EQ(3, completed);
@@ -196,10 +196,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_after)
196196
};
197197

198198
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
199-
asio::spawn(throttle.get_executor(), cr, throttle);
200-
asio::spawn(throttle.get_executor(), cr, throttle);
201-
asio::spawn(throttle.get_executor(), err, throttle);
202-
asio::spawn(throttle.get_executor(), cr, throttle);
199+
throttle.spawn(cr);
200+
throttle.spawn(cr);
201+
throttle.spawn(err);
202+
throttle.spawn(cr);
203203
EXPECT_THROW(throttle.wait(), std::logic_error);
204204

205205
EXPECT_EQ(2, completed);
@@ -219,10 +219,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_all)
219219
};
220220

221221
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
222-
asio::spawn(throttle.get_executor(), cr, throttle);
223-
asio::spawn(throttle.get_executor(), cr, throttle);
224-
asio::spawn(throttle.get_executor(), err, throttle);
225-
asio::spawn(throttle.get_executor(), cr, throttle);
222+
throttle.spawn(cr);
223+
throttle.spawn(cr);
224+
throttle.spawn(err);
225+
throttle.spawn(cr);
226226
EXPECT_THROW(throttle.wait(), std::logic_error);
227227

228228
EXPECT_EQ(0, completed);
@@ -247,7 +247,7 @@ TEST(YieldGroupAsync, spawn_wait)
247247

248248
asio::spawn(ctx, [&] (asio::yield_context yield) {
249249
auto throttle = spawn_throttle{yield, 2};
250-
asio::spawn(yield, wait_on(waiter), throttle);
250+
throttle.spawn(wait_on(waiter));
251251
throttle.wait(); // blocks
252252
}, rethrow);
253253

@@ -273,10 +273,10 @@ TEST(YieldGroupAsync, spawn_over_limit)
273273

274274
asio::spawn(ctx, [&] (asio::yield_context yield) {
275275
auto throttle = spawn_throttle{yield, 2};
276-
asio::spawn(yield, wait_on(waiter1), throttle);
277-
asio::spawn(yield, wait_on(waiter2), throttle);
278-
asio::spawn(yield, wait_on(waiter3), throttle); // blocks
279-
asio::spawn(yield, wait_on(waiter4), throttle); // blocks
276+
throttle.spawn(wait_on(waiter1));
277+
throttle.spawn(wait_on(waiter2));
278+
throttle.spawn(wait_on(waiter3)); // blocks
279+
throttle.spawn(wait_on(waiter4)); // blocks
280280
throttle.wait(); // blocks
281281
}, rethrow);
282282

@@ -320,7 +320,7 @@ TEST(YieldGroupAsync, spawn_shutdown)
320320

321321
asio::spawn(ctx, [&] (asio::yield_context yield) {
322322
auto throttle = spawn_throttle{yield, 2};
323-
asio::spawn(yield, wait_on(waiter1), throttle);
323+
throttle.spawn(wait_on(waiter1));
324324
waiter2.async_wait(yield); // blocks
325325
// shut down while there's an outstanding child but throttle is not
326326
// waiting on spawn() or wait()
@@ -340,8 +340,8 @@ TEST(YieldGroupAsync, spawn_throttled_shutdown)
340340

341341
asio::spawn(ctx, [&] (asio::yield_context yield) {
342342
auto throttle = spawn_throttle{yield, 1};
343-
asio::spawn(yield, wait_on(waiter1), throttle);
344-
asio::spawn(yield, wait_on(waiter2), throttle); // blocks
343+
throttle.spawn(wait_on(waiter1));
344+
throttle.spawn(wait_on(waiter2)); // blocks
345345
// shut down while we're throttled on the second spawn
346346
}, rethrow);
347347

@@ -358,7 +358,7 @@ TEST(YieldGroupAsync, spawn_wait_shutdown)
358358

359359
asio::spawn(ctx, [&] (asio::yield_context yield) {
360360
auto throttle = spawn_throttle{yield, 1};
361-
asio::spawn(yield, wait_on(waiter), throttle);
361+
throttle.spawn(wait_on(waiter));
362362
throttle.wait(); // blocks
363363
// shut down while we're wait()ing
364364
}, rethrow);
@@ -378,8 +378,8 @@ TEST(YieldGroupAsync, spawn_throttled_error)
378378

379379
asio::spawn(ctx, [&] (asio::yield_context yield) {
380380
auto throttle = spawn_throttle{yield, 1};
381-
asio::spawn(yield, wait_on(waiter1), throttle);
382-
asio::spawn(yield, wait_on(waiter2), throttle); // blocks
381+
throttle.spawn(wait_on(waiter1));
382+
throttle.spawn(wait_on(waiter2)); // blocks
383383
}, capture(result));
384384

385385
ctx.poll();
@@ -413,8 +413,8 @@ TEST(YieldGroupAsync, spawn_throttled_signal)
413413

414414
asio::spawn(ctx, [&] (asio::yield_context yield) {
415415
auto throttle = spawn_throttle{yield, 1};
416-
asio::spawn(yield, wait_on(waiter1), throttle);
417-
asio::spawn(yield, wait_on(waiter2), throttle); // blocks
416+
throttle.spawn(wait_on(waiter1));
417+
throttle.spawn(wait_on(waiter2)); // blocks
418418
}, capture(signal, result));
419419

420420
ctx.poll();
@@ -446,7 +446,7 @@ TEST(YieldGroupAsync, spawn_wait_error)
446446

447447
asio::spawn(ctx, [&] (asio::yield_context yield) {
448448
auto throttle = spawn_throttle{yield, 1};
449-
asio::spawn(yield, wait_on(waiter), throttle);
449+
throttle.spawn(wait_on(waiter));
450450
throttle.wait(); // blocks
451451
}, capture(result));
452452

@@ -479,7 +479,7 @@ TEST(YieldGroupAsync, spawn_wait_signal)
479479

480480
asio::spawn(ctx, [&] (asio::yield_context yield) {
481481
auto throttle = spawn_throttle{yield, 1};
482-
asio::spawn(yield, wait_on(waiter), throttle);
482+
throttle.spawn(wait_on(waiter));
483483
throttle.wait(); // blocks
484484
}, capture(signal, result));
485485

@@ -511,7 +511,7 @@ TEST(YieldGroupAsync, spawn_cancel_wait)
511511

512512
asio::spawn(ctx, [&] (asio::yield_context yield) {
513513
auto throttle = spawn_throttle{yield, 2};
514-
asio::spawn(yield, wait_on(waiter), throttle);
514+
throttle.spawn(wait_on(waiter));
515515
throttle.cancel();
516516
throttle.wait();
517517
}, capture(result));
@@ -539,9 +539,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_none)
539539

540540
asio::spawn(ctx, [&] (asio::yield_context yield) {
541541
auto throttle = spawn_throttle{yield, 4, cancel_on_error::none};
542-
asio::spawn(yield, wait_on(waiter1), throttle);
543-
asio::spawn(yield, wait_on(waiter2), throttle);
544-
asio::spawn(yield, wait_on(waiter3), throttle);
542+
throttle.spawn(wait_on(waiter1));
543+
throttle.spawn(wait_on(waiter2));
544+
throttle.spawn(wait_on(waiter3));
545545
throttle.wait(); // blocks
546546
}, capture(result));
547547

@@ -586,9 +586,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_after)
586586

587587
asio::spawn(ctx, [&] (asio::yield_context yield) {
588588
auto throttle = spawn_throttle{yield, 4, cancel_on_error::after};
589-
asio::spawn(yield, wait_on(waiter1), throttle);
590-
asio::spawn(yield, wait_on(waiter2), throttle);
591-
asio::spawn(yield, wait_on(waiter3), throttle);
589+
throttle.spawn(wait_on(waiter1));
590+
throttle.spawn(wait_on(waiter2));
591+
throttle.spawn(wait_on(waiter3));
592592
throttle.wait(); // blocks
593593
}, capture(result));
594594

@@ -629,9 +629,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_all)
629629

630630
asio::spawn(ctx, [&] (asio::yield_context yield) {
631631
auto throttle = spawn_throttle{yield, 4, cancel_on_error::all};
632-
asio::spawn(yield, wait_on(waiter1), throttle);
633-
asio::spawn(yield, wait_on(waiter2), throttle);
634-
asio::spawn(yield, wait_on(waiter3), throttle);
632+
throttle.spawn(wait_on(waiter1));
633+
throttle.spawn(wait_on(waiter2));
634+
throttle.spawn(wait_on(waiter3));
635635
throttle.wait(); // blocks
636636
}, capture(result));
637637

@@ -665,9 +665,9 @@ TEST(YieldGroupAsync, spawn_wait_spawn_wait)
665665

666666
asio::spawn(ctx, [&] (asio::yield_context yield) {
667667
auto throttle = spawn_throttle{yield, 1};
668-
asio::spawn(yield, wait_on(waiter1), throttle);
668+
throttle.spawn(wait_on(waiter1));
669669
throttle.wait(); // blocks
670-
asio::spawn(yield, wait_on(waiter2), throttle);
670+
throttle.spawn(wait_on(waiter2));
671671
throttle.wait(); // blocks
672672
}, rethrow);
673673

@@ -698,10 +698,10 @@ TEST(YieldGroupAsync, spawn_cancel_wait_spawn_wait)
698698

699699
asio::spawn(ctx, [&] (asio::yield_context yield) {
700700
auto throttle = spawn_throttle{yield, 1};
701-
asio::spawn(yield, wait_on(waiter1), throttle);
701+
throttle.spawn(wait_on(waiter1));
702702
throttle.cancel();
703703
EXPECT_THROW(throttle.wait(), boost::system::system_error);
704-
asio::spawn(yield, wait_on(waiter2), throttle);
704+
throttle.spawn(wait_on(waiter2));
705705
throttle.wait(); // blocks
706706
}, rethrow);
707707

@@ -723,9 +723,9 @@ TEST(YieldGroupAsync, spawn_error_wait_spawn_wait)
723723

724724
asio::spawn(ctx, [&] (asio::yield_context yield) {
725725
auto throttle = spawn_throttle{yield, 1};
726-
asio::spawn(yield, wait_on(waiter1), throttle);
726+
throttle.spawn(wait_on(waiter1));
727727
EXPECT_THROW(throttle.wait(), boost::system::system_error);
728-
asio::spawn(yield, wait_on(waiter2), throttle);
728+
throttle.spawn(wait_on(waiter2));
729729
throttle.wait(); // blocks
730730
}, rethrow);
731731

0 commit comments

Comments
 (0)