Skip to content

Commit d192ca7

Browse files
committed
common/async: implement max_concurrent_for_each() for awaitable
Signed-off-by: Casey Bodley <[email protected]>
1 parent 9217fcc commit d192ca7

File tree

2 files changed

+161
-3
lines changed

2 files changed

+161
-3
lines changed

src/common/async/max_concurrent_for_each.h

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <utility>
2222
#include <boost/asio/spawn.hpp>
2323
#include "cancel_on_error.h"
24+
#include "co_throttle.h"
2425
#include "yield_context.h"
2526
#include "spawn_throttle.h"
2627

@@ -54,8 +55,7 @@ void max_concurrent_for_each(Iterator begin,
5455
Func&& func,
5556
cancel_on_error on_error = cancel_on_error::none)
5657
{
57-
const size_t count = std::ranges::distance(begin, end);
58-
if (!count) {
58+
if (begin == end) {
5959
return;
6060
}
6161
auto throttle = spawn_throttle{y, max_concurrent, on_error};
@@ -84,6 +84,54 @@ auto max_concurrent_for_each(Range&& range,
8484
on_error);
8585
}
8686

87-
// TODO: overloads for co_spawn()
87+
// \overload
88+
template <typename Iterator, typename Sentinel, typename VoidAwaitableFactory,
89+
typename Value = std::iter_reference_t<Iterator>,
90+
typename VoidAwaitable = std::invoke_result_t<
91+
VoidAwaitableFactory, Value>,
92+
typename AwaitableT = typename VoidAwaitable::value_type,
93+
typename AwaitableExecutor = typename VoidAwaitable::executor_type>
94+
requires (std::input_iterator<Iterator> &&
95+
std::sentinel_for<Sentinel, Iterator> &&
96+
std::same_as<AwaitableT, void> &&
97+
boost::asio::execution::executor<AwaitableExecutor>)
98+
auto max_concurrent_for_each(Iterator begin,
99+
Sentinel end,
100+
size_t max_concurrent,
101+
VoidAwaitableFactory&& factory,
102+
cancel_on_error on_error = cancel_on_error::none)
103+
-> boost::asio::awaitable<void, AwaitableExecutor>
104+
{
105+
if (begin == end) {
106+
co_return;
107+
}
108+
auto ex = co_await boost::asio::this_coro::executor;
109+
auto throttle = co_throttle{ex, max_concurrent, on_error};
110+
for (Iterator i = begin; i != end; ++i) {
111+
co_await throttle.spawn(factory(*i));
112+
}
113+
co_await throttle.wait();
114+
}
115+
116+
/// \overload
117+
template <typename Range, typename VoidAwaitableFactory,
118+
typename Value = std::ranges::range_reference_t<Range>,
119+
typename VoidAwaitable = std::invoke_result_t<
120+
VoidAwaitableFactory, Value>,
121+
typename AwaitableT = typename VoidAwaitable::value_type,
122+
typename AwaitableExecutor = typename VoidAwaitable::executor_type>
123+
requires (std::ranges::range<Range> &&
124+
std::same_as<AwaitableT, void> &&
125+
boost::asio::execution::executor<AwaitableExecutor>)
126+
auto max_concurrent_for_each(Range&& range,
127+
size_t max_concurrent,
128+
VoidAwaitableFactory&& factory,
129+
cancel_on_error on_error = cancel_on_error::none)
130+
-> boost::asio::awaitable<void, AwaitableExecutor>
131+
{
132+
return max_concurrent_for_each(
133+
std::begin(range), std::end(range), max_concurrent,
134+
std::forward<VoidAwaitableFactory>(factory), on_error);
135+
}
88136

89137
} // namespace ceph::async

src/test/common/test_async_max_concurrent_for_each.cc

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ void wait_for(std::chrono::milliseconds dur, asio::yield_context yield)
3939
timer.async_wait(yield);
4040
}
4141

42+
asio::awaitable<void> wait_for(std::chrono::milliseconds dur)
43+
{
44+
auto timer = asio::steady_timer{co_await asio::this_coro::executor, dur};
45+
co_await timer.async_wait(asio::use_awaitable);
46+
}
47+
4248
struct null_sentinel {};
4349
bool operator==(const char* c, null_sentinel) { return !*c; }
4450
static_assert(std::sentinel_for<null_sentinel, const char*>);
@@ -222,4 +228,108 @@ TEST(range_yield, over_limit)
222228
EXPECT_EQ(10, completed);
223229
}
224230

231+
TEST(iterator_co, empty)
232+
{
233+
int* end = nullptr;
234+
auto cr = [] (int) -> asio::awaitable<void> { co_return; };
235+
236+
asio::io_context ctx;
237+
asio::co_spawn(ctx, [&] () -> asio::awaitable<void> {
238+
co_await max_concurrent_for_each(end, end, 10, cr);
239+
}, rethrow);
240+
ctx.run();
241+
}
242+
243+
TEST(iterator_co, over_limit)
244+
{
245+
int concurrent = 0;
246+
int max_concurrent = 0;
247+
int completed = 0;
248+
249+
auto cr = [&] (int) -> asio::awaitable<void> {
250+
++concurrent;
251+
if (max_concurrent < concurrent) {
252+
max_concurrent = concurrent;
253+
}
254+
255+
co_await wait_for(1ms);
256+
257+
--concurrent;
258+
++completed;
259+
};
260+
261+
asio::io_context ctx;
262+
asio::co_spawn(ctx, [&] () -> asio::awaitable<void> {
263+
constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
264+
co_await max_concurrent_for_each(begin(arr), end(arr), 2, cr);
265+
}, rethrow);
266+
ctx.run();
267+
268+
EXPECT_EQ(0, concurrent);
269+
EXPECT_EQ(2, max_concurrent);
270+
EXPECT_EQ(10, completed);
271+
}
272+
273+
TEST(iterator_co, sentinel)
274+
{
275+
const char* begin = "hello";
276+
null_sentinel end;
277+
278+
size_t completed = 0;
279+
auto cr = [&completed] (char c) -> asio::awaitable<void> {
280+
++completed;
281+
co_return;
282+
};
283+
284+
asio::io_context ctx;
285+
asio::co_spawn(ctx, [&] () -> asio::awaitable<void> {
286+
co_await max_concurrent_for_each(begin, end, 10, cr);
287+
}, rethrow);
288+
ctx.run();
289+
290+
EXPECT_EQ(completed, 5);
291+
}
292+
293+
TEST(range_co, empty)
294+
{
295+
constexpr std::array<int, 0> arr{};
296+
auto cr = [] (int) -> asio::awaitable<void> { co_return; };
297+
298+
asio::io_context ctx;
299+
asio::co_spawn(ctx, [&] () -> asio::awaitable<void> {
300+
co_await max_concurrent_for_each(arr, 10, cr);
301+
}, rethrow);
302+
ctx.run();
303+
}
304+
305+
TEST(range_co, over_limit)
306+
{
307+
int concurrent = 0;
308+
int max_concurrent = 0;
309+
int completed = 0;
310+
311+
auto cr = [&] (int) -> asio::awaitable<void> {
312+
++concurrent;
313+
if (max_concurrent < concurrent) {
314+
max_concurrent = concurrent;
315+
}
316+
317+
co_await wait_for(1ms);
318+
319+
--concurrent;
320+
++completed;
321+
};
322+
323+
asio::io_context ctx;
324+
asio::co_spawn(ctx, [&] () -> asio::awaitable<void> {
325+
constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
326+
co_await max_concurrent_for_each(arr, 2, cr);
327+
}, rethrow);
328+
ctx.run();
329+
330+
EXPECT_EQ(0, concurrent);
331+
EXPECT_EQ(2, max_concurrent);
332+
EXPECT_EQ(10, completed);
333+
}
334+
225335
} // namespace ceph::async

0 commit comments

Comments
 (0)