Skip to content

Commit 4956c63

Browse files
committed
common/async: remove null_yield support for spawn_throttle
null_yield support was provided by sync_spawn_throttle_impl, which itself stores the boost::asio::io_context that it runs on. however, valgrind reports issues with object lifetimes each spawn_throttle_handler holds a reference count on the spawn_throttle_impl to prevent it from going away while coroutines are in flight. on completion, this spawn_throttle_handler gets invoked on its associated executor. this invocation may drop the last reference to spawn_throttle_impl while that associated executor is still trying to use the io_context that was destroyed with sync_spawn_throttle_impl finding no good way for a custom executor to preserve the lifetime of the spawn_throttle_impl, we instead remove support for this flawed design. the null_yield callers must first spawn a parent coroutine before using spawn_throttle Fixes: https://tracker.ceph.com/issues/70965 Signed-off-by: Casey Bodley <[email protected]>
1 parent 1cefcae commit 4956c63

File tree

3 files changed

+5
-233
lines changed

3 files changed

+5
-233
lines changed

src/common/async/detail/spawn_throttle_impl.h

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
#include <boost/asio/associated_cancellation_slot.hpp>
2424
#include <boost/asio/async_result.hpp>
2525
#include <boost/asio/execution/context.hpp>
26-
#include <boost/asio/io_context.hpp>
2726
#include <boost/asio/query.hpp>
2827
#include <boost/asio/spawn.hpp>
2928
#include <boost/intrusive_ptr.hpp>
@@ -53,10 +52,6 @@ class spawn_throttle_impl :
5352
}
5453
virtual ~spawn_throttle_impl() {}
5554

56-
// factory function
57-
static auto create(optional_yield y, size_t limit, cancel_on_error on_error)
58-
-> boost::intrusive_ptr<spawn_throttle_impl>;
59-
6055
// return the completion handler for a new child. may block due to throttling
6156
// or rethrow an exception from a previously-spawned child
6257
spawn_throttle_handler get();
@@ -189,52 +184,6 @@ inline spawn_throttle_handler spawn_throttle_impl::get()
189184
return {this, c};
190185
}
191186

192-
193-
// Spawn throttle implementation for use in synchronous contexts where wait()
194-
// blocks the calling thread until completion.
195-
class sync_spawn_throttle_impl final : public spawn_throttle_impl {
196-
static constexpr int concurrency = 1; // only run from a single thread
197-
public:
198-
sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error)
199-
: spawn_throttle_impl(limit, on_error),
200-
ctx(std::in_place, concurrency)
201-
{}
202-
203-
executor_type get_executor() override
204-
{
205-
return ctx->get_executor();
206-
}
207-
208-
void wait_for(size_t target_count) override
209-
{
210-
while (count > target_count) {
211-
if (ctx->stopped()) {
212-
ctx->restart();
213-
}
214-
ctx->run_one();
215-
}
216-
217-
report_exception(); // throw unreported exception
218-
}
219-
220-
void cancel(bool shutdown) override
221-
{
222-
spawn_throttle_impl::cancel(shutdown);
223-
224-
if (shutdown) {
225-
// destroy the io_context to trigger two-phase shutdown which
226-
// destroys any completion handlers with a reference to 'this'
227-
ctx.reset();
228-
count = 0;
229-
}
230-
}
231-
232-
private:
233-
std::optional<boost::asio::io_context> ctx;
234-
};
235-
236-
// Spawn throttle implementation for use in asynchronous contexts where wait()
237-
// suspends the calling stackful coroutine.
238187
class async_spawn_throttle_impl final :
239188
public spawn_throttle_impl,
240189
public service_list_base_hook
@@ -345,16 +294,4 @@ class async_spawn_throttle_impl final :
345294
}
346295
};
347296

348-
inline auto spawn_throttle_impl::create(optional_yield y, size_t limit,
349-
cancel_on_error on_error)
350-
-> boost::intrusive_ptr<spawn_throttle_impl>
351-
{
352-
if (y) {
353-
auto yield = y.get_yield_context();
354-
return new async_spawn_throttle_impl(yield, limit, on_error);
355-
} else {
356-
return new sync_spawn_throttle_impl(limit, on_error);
357-
}
358-
}
359-
360297
} // namespace ceph::async::detail

src/common/async/spawn_throttle.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323

2424
namespace ceph::async {
2525

26-
/// A coroutine throttle that allows a thread of execution to spawn and manage
26+
/// A coroutine throttle that allows a parent coroutine to spawn and manage
2727
/// multiple child coroutines, while enforcing an upper bound on concurrency.
28-
/// The parent may either be a synchronous function or a stackful coroutine,
29-
/// depending on the optional_yield constructor argument.
3028
///
3129
/// Child coroutines take boost::asio::yield_context as the only argument.
3230
/// Exceptions thrown by children are reported to the caller on its next call
@@ -44,10 +42,10 @@ namespace ceph::async {
4442
/// @code
4543
/// void child(boost::asio::yield_context yield);
4644
///
47-
/// void parent(size_t count, optional_yield y)
45+
/// void parent(size_t count, boost::asio::yield_context yield)
4846
/// {
4947
/// // spawn all children, up to 10 at a time
50-
/// auto throttle = ceph::async::spawn_throttle{y, 10};
48+
/// auto throttle = ceph::async::spawn_throttle{yield, 10};
5149
///
5250
/// for (size_t i = 0; i < count; i++) {
5351
/// throttle.spawn(child);
@@ -60,9 +58,9 @@ class spawn_throttle {
6058
boost::intrusive_ptr<impl_type> impl;
6159

6260
public:
63-
spawn_throttle(optional_yield y, size_t limit,
61+
spawn_throttle(boost::asio::yield_context yield, size_t limit,
6462
cancel_on_error on_error = cancel_on_error::none)
65-
: impl(detail::spawn_throttle_impl::create(y, limit, on_error))
63+
: impl(new detail::async_spawn_throttle_impl(yield, limit, on_error))
6664
{}
6765

6866
spawn_throttle(spawn_throttle&&) = default;

src/test/common/test_async_spawn_throttle.cc

Lines changed: 0 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -66,169 +66,6 @@ auto wait_on(yield_waiter<void>& handler)
6666
}
6767

6868

69-
TEST(YieldGroupSync, wait_empty)
70-
{
71-
auto throttle = spawn_throttle{null_yield, 2};
72-
throttle.wait();
73-
}
74-
75-
TEST(YieldGroupSync, spawn_wait)
76-
{
77-
int completed = 0;
78-
auto cr = [&] (asio::yield_context yield) {
79-
wait_for(1ms, yield);
80-
++completed;
81-
};
82-
83-
auto throttle = spawn_throttle{null_yield, 2};
84-
throttle.spawn(cr);
85-
throttle.wait();
86-
87-
EXPECT_EQ(1, completed);
88-
}
89-
90-
TEST(YieldGroupSync, spawn_shutdown)
91-
{
92-
auto throttle = spawn_throttle{null_yield, 2};
93-
throttle.spawn(wait_for(1s));
94-
}
95-
96-
TEST(YieldGroupSync, spawn_cancel_wait)
97-
{
98-
int completed = 0;
99-
100-
auto cr = [&] (asio::yield_context yield) {
101-
wait_for(1s, yield);
102-
++completed;
103-
};
104-
105-
auto throttle = spawn_throttle{null_yield, 2};
106-
throttle.spawn(cr);
107-
throttle.cancel();
108-
EXPECT_THROW(throttle.wait(), boost::system::system_error);
109-
110-
EXPECT_EQ(0, completed);
111-
}
112-
113-
TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait)
114-
{
115-
int completed = 0;
116-
117-
auto cr = [&] (asio::yield_context yield) {
118-
wait_for(1ms, yield);
119-
++completed;
120-
};
121-
122-
auto throttle = spawn_throttle{null_yield, 2};
123-
throttle.spawn(cr);
124-
throttle.cancel();
125-
EXPECT_THROW(throttle.wait(), boost::system::system_error);
126-
throttle.spawn(cr);
127-
throttle.wait();
128-
129-
EXPECT_EQ(1, completed);
130-
}
131-
132-
TEST(YieldGroupSync, spawn_over_limit)
133-
{
134-
int concurrent = 0;
135-
int max_concurrent = 0;
136-
int completed = 0;
137-
138-
auto cr = [&] (asio::yield_context yield) {
139-
++concurrent;
140-
if (max_concurrent < concurrent) {
141-
max_concurrent = concurrent;
142-
}
143-
144-
wait_for(1ms, yield);
145-
146-
--concurrent;
147-
++completed;
148-
};
149-
150-
auto throttle = spawn_throttle{null_yield, 2};
151-
throttle.spawn(cr);
152-
throttle.spawn(cr);
153-
throttle.spawn(cr); // blocks
154-
throttle.spawn(cr); // blocks
155-
throttle.wait(); // blocks
156-
157-
EXPECT_EQ(0, concurrent);
158-
EXPECT_EQ(2, max_concurrent);
159-
EXPECT_EQ(4, completed);
160-
}
161-
162-
TEST(YieldGroupSync, spawn_cancel_on_error_none)
163-
{
164-
int completed = 0;
165-
166-
auto cr = [&] (asio::yield_context yield) {
167-
wait_for(10ms, yield);
168-
++completed;
169-
};
170-
auto err = [] (asio::yield_context yield) {
171-
wait_for(0ms, yield);
172-
throw std::logic_error{"err"};
173-
};
174-
175-
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
176-
throttle.spawn(cr);
177-
throttle.spawn(cr);
178-
throttle.spawn(err);
179-
throttle.spawn(cr);
180-
EXPECT_THROW(throttle.wait(), std::logic_error);
181-
182-
EXPECT_EQ(3, completed);
183-
}
184-
185-
TEST(YieldGroupSync, spawn_cancel_on_error_after)
186-
{
187-
int completed = 0;
188-
189-
auto cr = [&] (asio::yield_context yield) {
190-
wait_for(10ms, yield);
191-
++completed;
192-
};
193-
auto err = [] (asio::yield_context yield) {
194-
wait_for(0ms, yield);
195-
throw std::logic_error{"err"};
196-
};
197-
198-
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
199-
throttle.spawn(cr);
200-
throttle.spawn(cr);
201-
throttle.spawn(err);
202-
throttle.spawn(cr);
203-
EXPECT_THROW(throttle.wait(), std::logic_error);
204-
205-
EXPECT_EQ(2, completed);
206-
}
207-
208-
TEST(YieldGroupSync, spawn_cancel_on_error_all)
209-
{
210-
int completed = 0;
211-
212-
auto cr = [&] (asio::yield_context yield) {
213-
wait_for(1s, yield);
214-
++completed;
215-
};
216-
auto err = [] (asio::yield_context yield) {
217-
wait_for(0ms, yield);
218-
throw std::logic_error{"err"};
219-
};
220-
221-
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
222-
throttle.spawn(cr);
223-
throttle.spawn(cr);
224-
throttle.spawn(err);
225-
throttle.spawn(cr);
226-
EXPECT_THROW(throttle.wait(), std::logic_error);
227-
228-
EXPECT_EQ(0, completed);
229-
}
230-
231-
23269
TEST(YieldGroupAsync, wait_empty)
23370
{
23471
asio::io_context ctx;

0 commit comments

Comments
 (0)