Skip to content

Commit 25017d6

Browse files
committed
common/async: add max_concurrent_for_each() algorithm
inspired by seastar's max_concurrent_for_each(), implemented for optional_yield in terms of yield_throttle can also be overloaded for co_await() and co_throttle (not part of this branch) Signed-off-by: Casey Bodley <[email protected]>
1 parent 0da024f commit 25017d6

File tree

3 files changed

+318
-0
lines changed

3 files changed

+318
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab ft=cpp
3+
4+
/*
5+
* Ceph - scalable distributed file system
6+
*
7+
* Copyright contributors to the Ceph project
8+
*
9+
* This is free software; you can redistribute it and/or
10+
* modify it under the terms of the GNU Lesser General Public
11+
* License version 2.1, as published by the Free Software
12+
* Foundation. See file COPYING.
13+
*
14+
*/
15+
16+
#pragma once
17+
18+
#include <concepts>
19+
#include <iterator>
20+
#include <ranges>
21+
#include <utility>
22+
#include <boost/asio/spawn.hpp>
23+
#include "cancel_on_error.h"
24+
#include "yield_context.h"
25+
#include "spawn_throttle.h"
26+
27+
namespace ceph::async {
28+
29+
/// Call a coroutine with each element in the given range then wait for all of
30+
/// them to complete. The first exception is rethrown to the caller. The
31+
/// cancel_on_error option controls whether these exceptions trigger the
32+
/// cancellation of other children. The number of outstanding coroutines
33+
/// is limited by the max_concurrent argument.
34+
///
35+
/// Example:
36+
/// \code
37+
/// void child(task& t, boost::asio::yield_context yield);
38+
///
39+
/// void parent(std::span<task> tasks, optional_yield y)
40+
/// {
41+
/// // process all tasks, up to 10 at a time
42+
/// max_concurrent_for_each(tasks, 10, y, child);
43+
/// }
44+
/// \endcode
45+
template <typename Iterator, typename Sentinel, typename Func,
46+
typename Reference = std::iter_reference_t<Iterator>>
47+
requires (std::input_iterator<Iterator> &&
48+
std::sentinel_for<Sentinel, Iterator> &&
49+
std::invocable<Func, Reference, boost::asio::yield_context>)
50+
void max_concurrent_for_each(Iterator begin,
51+
Sentinel end,
52+
size_t max_concurrent,
53+
optional_yield y,
54+
Func&& func,
55+
cancel_on_error on_error = cancel_on_error::none)
56+
{
57+
const size_t count = std::ranges::distance(begin, end);
58+
if (!count) {
59+
return;
60+
}
61+
auto throttle = spawn_throttle{y, max_concurrent, on_error};
62+
for (Iterator i = begin; i != end; ++i) {
63+
boost::asio::spawn(throttle.get_executor(),
64+
[&func, &val = *i] (boost::asio::yield_context yield) {
65+
func(val, yield);
66+
}, throttle);
67+
}
68+
throttle.wait();
69+
}
70+
71+
/// \overload
72+
template <typename Range, typename Func,
73+
typename Reference = std::ranges::range_reference_t<Range>>
74+
requires (std::ranges::range<Range> &&
75+
std::invocable<Func, Reference, boost::asio::yield_context>)
76+
auto max_concurrent_for_each(Range&& range,
77+
size_t max_concurrent,
78+
optional_yield y,
79+
Func&& func,
80+
cancel_on_error on_error = cancel_on_error::none)
81+
{
82+
return max_concurrent_for_each(std::begin(range), std::end(range),
83+
max_concurrent, y, std::forward<Func>(func),
84+
on_error);
85+
}
86+
87+
// TODO: overloads for co_spawn()
88+
89+
} // namespace ceph::async

src/test/common/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,10 @@ add_executable(unittest_async_completion test_async_completion.cc)
357357
add_ceph_unittest(unittest_async_completion)
358358
target_link_libraries(unittest_async_completion ceph-common Boost::system)
359359

360+
add_executable(unittest_async_max_concurrent_for_each test_async_max_concurrent_for_each.cc)
361+
add_ceph_unittest(unittest_async_max_concurrent_for_each)
362+
target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context)
363+
360364
add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
361365
add_ceph_unittest(unittest_async_shared_mutex)
362366
target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab ft=cpp
3+
4+
/*
5+
* Ceph - scalable distributed file system
6+
*
7+
* Copyright contributors to the Ceph project
8+
*
9+
* This is free software; you can redistribute it and/or
10+
* modify it under the terms of the GNU Lesser General Public
11+
* License version 2.1, as published by the Free Software
12+
* Foundation. See file COPYING.
13+
*
14+
*/
15+
16+
#include "common/async/max_concurrent_for_each.h"
17+
18+
#include <chrono>
19+
#include <exception>
20+
#include <optional>
21+
#include <boost/asio/spawn.hpp>
22+
#include <boost/asio/steady_timer.hpp>
23+
#include <gtest/gtest.h>
24+
25+
namespace ceph::async {
26+
27+
namespace asio = boost::asio;
28+
29+
void rethrow(std::exception_ptr eptr)
30+
{
31+
if (eptr) std::rethrow_exception(eptr);
32+
}
33+
34+
using namespace std::chrono_literals;
35+
36+
void wait_for(std::chrono::milliseconds dur, asio::yield_context yield)
37+
{
38+
auto timer = asio::steady_timer{yield.get_executor(), dur};
39+
timer.async_wait(yield);
40+
}
41+
42+
struct null_sentinel {};
43+
bool operator==(const char* c, null_sentinel) { return !*c; }
44+
static_assert(std::sentinel_for<null_sentinel, const char*>);
45+
46+
TEST(iterator_null_yield, empty)
47+
{
48+
int* end = nullptr;
49+
auto cr = [] (int, asio::yield_context) {};
50+
max_concurrent_for_each(end, end, 10, null_yield, cr);
51+
}
52+
53+
TEST(iterator_null_yield, over_limit)
54+
{
55+
int concurrent = 0;
56+
int max_concurrent = 0;
57+
int completed = 0;
58+
59+
auto cr = [&] (int, asio::yield_context yield) {
60+
++concurrent;
61+
if (max_concurrent < concurrent) {
62+
max_concurrent = concurrent;
63+
}
64+
65+
wait_for(1ms, yield);
66+
67+
--concurrent;
68+
++completed;
69+
};
70+
71+
constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
72+
max_concurrent_for_each(begin(arr), end(arr), 2, null_yield, cr);
73+
74+
EXPECT_EQ(0, concurrent);
75+
EXPECT_EQ(2, max_concurrent);
76+
EXPECT_EQ(10, completed);
77+
}
78+
79+
TEST(iterator_null_yield, sentinel)
80+
{
81+
const char* begin = "hello";
82+
null_sentinel end;
83+
84+
size_t completed = 0;
85+
auto cr = [&completed] (char c, asio::yield_context) { ++completed; };
86+
max_concurrent_for_each(begin, end, 10, null_yield, cr);
87+
EXPECT_EQ(completed, 5);
88+
}
89+
90+
TEST(range_null_yield, empty)
91+
{
92+
constexpr std::array<int, 0> arr{};
93+
auto cr = [] (int, asio::yield_context) {};
94+
max_concurrent_for_each(arr, 10, null_yield, cr);
95+
}
96+
97+
TEST(range_null_yield, over_limit)
98+
{
99+
int concurrent = 0;
100+
int max_concurrent = 0;
101+
int completed = 0;
102+
103+
auto cr = [&] (int, asio::yield_context yield) {
104+
++concurrent;
105+
if (max_concurrent < concurrent) {
106+
max_concurrent = concurrent;
107+
}
108+
109+
wait_for(1ms, yield);
110+
111+
--concurrent;
112+
++completed;
113+
};
114+
115+
constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
116+
max_concurrent_for_each(arr, 2, null_yield, cr);
117+
118+
EXPECT_EQ(0, concurrent);
119+
EXPECT_EQ(2, max_concurrent);
120+
EXPECT_EQ(10, completed);
121+
}
122+
123+
124+
TEST(iterator_yield, empty)
125+
{
126+
int* end = nullptr;
127+
auto cr = [] (int, asio::yield_context) {};
128+
129+
asio::io_context ctx;
130+
asio::spawn(ctx, [&] (asio::yield_context yield) {
131+
max_concurrent_for_each(end, end, 10, yield, cr);
132+
}, rethrow);
133+
ctx.run();
134+
}
135+
136+
TEST(iterator_yield, over_limit)
137+
{
138+
int concurrent = 0;
139+
int max_concurrent = 0;
140+
int completed = 0;
141+
142+
auto cr = [&] (int, asio::yield_context yield) {
143+
++concurrent;
144+
if (max_concurrent < concurrent) {
145+
max_concurrent = concurrent;
146+
}
147+
148+
wait_for(1ms, yield);
149+
150+
--concurrent;
151+
++completed;
152+
};
153+
154+
asio::io_context ctx;
155+
asio::spawn(ctx, [&] (asio::yield_context yield) {
156+
constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
157+
max_concurrent_for_each(begin(arr), end(arr), 2, yield, cr);
158+
}, rethrow);
159+
ctx.run();
160+
161+
EXPECT_EQ(0, concurrent);
162+
EXPECT_EQ(2, max_concurrent);
163+
EXPECT_EQ(10, completed);
164+
}
165+
166+
TEST(iterator_yield, sentinel)
167+
{
168+
const char* begin = "hello";
169+
null_sentinel end;
170+
171+
size_t completed = 0;
172+
auto cr = [&completed] (char c, asio::yield_context) { ++completed; };
173+
174+
asio::io_context ctx;
175+
asio::spawn(ctx, [&] (asio::yield_context yield) {
176+
max_concurrent_for_each(begin, end, 10, yield, cr);
177+
}, rethrow);
178+
ctx.run();
179+
180+
EXPECT_EQ(completed, 5);
181+
}
182+
183+
TEST(range_yield, empty)
184+
{
185+
constexpr std::array<int, 0> arr{};
186+
auto cr = [] (int, asio::yield_context) {};
187+
188+
asio::io_context ctx;
189+
asio::spawn(ctx, [&] (asio::yield_context yield) {
190+
max_concurrent_for_each(arr, 10, yield, cr);
191+
}, rethrow);
192+
ctx.run();
193+
}
194+
195+
TEST(range_yield, over_limit)
196+
{
197+
int concurrent = 0;
198+
int max_concurrent = 0;
199+
int completed = 0;
200+
201+
auto cr = [&] (int, asio::yield_context yield) {
202+
++concurrent;
203+
if (max_concurrent < concurrent) {
204+
max_concurrent = concurrent;
205+
}
206+
207+
wait_for(1ms, yield);
208+
209+
--concurrent;
210+
++completed;
211+
};
212+
213+
asio::io_context ctx;
214+
asio::spawn(ctx, [&] (asio::yield_context yield) {
215+
constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
216+
max_concurrent_for_each(arr, 2, yield, cr);
217+
}, rethrow);
218+
ctx.run();
219+
220+
EXPECT_EQ(0, concurrent);
221+
EXPECT_EQ(2, max_concurrent);
222+
EXPECT_EQ(10, completed);
223+
}
224+
225+
} // namespace ceph::async

0 commit comments

Comments
 (0)