Skip to content

Commit dfb2134

Browse files
authored
Merge pull request ceph#62538 from cbodley/wip-rgw-control-watch-par
rgw: send concurrent watch/unwatch operations Reviewed-by: Adam Emerson <[email protected]>
2 parents 9e8cc5f + f863b7d commit dfb2134

File tree

7 files changed

+173
-104
lines changed

7 files changed

+173
-104
lines changed

src/common/async/detail/spawn_throttle_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ struct spawn_throttle_handler {
169169
}
170170
};
171171

172-
spawn_throttle_handler spawn_throttle_impl::get()
172+
inline spawn_throttle_handler spawn_throttle_impl::get()
173173
{
174174
report_exception(); // throw unreported exception
175175

@@ -345,8 +345,8 @@ class async_spawn_throttle_impl final :
345345
}
346346
};
347347

348-
auto spawn_throttle_impl::create(optional_yield y, size_t limit,
349-
cancel_on_error on_error)
348+
inline auto spawn_throttle_impl::create(optional_yield y, size_t limit,
349+
cancel_on_error on_error)
350350
-> boost::intrusive_ptr<spawn_throttle_impl>
351351
{
352352
if (y) {

src/common/options/rgw.yaml.in

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,7 +1133,23 @@ options:
11331133
default: 8
11341134
services:
11351135
- rgw
1136+
see_also:
1137+
- rgw_cache_enabled
1138+
- rgw_max_control_aio
11361139
with_legacy: true
1140+
- name: rgw_max_control_aio
1141+
type: int
1142+
level: advanced
1143+
desc: Maximum number of concurrent operations over control objects.
1144+
long_desc: When metadata caching is enabled, a watch operation is sent to each
1145+
control object on startup, with a corresponding unwatch on shutdown. To
1146+
accelerate startup/shutdown, allow several concurrent operations to be sent
1147+
at once.
1148+
default: 8
1149+
services:
1150+
- rgw
1151+
see_also:
1152+
- rgw_num_control_oids
11371153
- name: rgw_verify_ssl
11381154
type: bool
11391155
level: advanced

src/librados/librados_asio.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,65 @@ auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid,
268268
}, token, ex, io, oid, std::move(write_op), flags, trace_ctx);
269269
}
270270

271+
/// Calls IoCtx::aio_watch2() and arranges for the AioCompletion to call a
272+
/// given handler with signature (error_code, version_t).
273+
///
274+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
275+
/// instance must preserve its underlying implementation until completion.
276+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
277+
auto async_watch(IoExecutor ex, IoCtx& io, const std::string& oid,
278+
uint64_t* handle, librados::WatchCtx2* ctx,
279+
uint32_t timeout_ms, CompletionToken &&token)
280+
{
281+
using Op = detail::AsyncOp<void>;
282+
using Signature = typename Op::Signature;
283+
return boost::asio::async_initiate<CompletionToken, Signature>(
284+
[] (auto handler, IoExecutor ex, const IoCtx& i, const std::string& oid,
285+
uint64_t* handle, librados::WatchCtx2* ctx, uint32_t timeout_ms) {
286+
constexpr bool is_read = false;
287+
auto p = Op::create(ex, is_read, std::move(handler));
288+
auto& op = p->user_data;
289+
290+
IoCtx& io = const_cast<IoCtx&>(i);
291+
int ret = io.aio_watch2(oid, op.aio_completion.get(),
292+
handle, ctx, timeout_ms);
293+
if (ret < 0) {
294+
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
295+
ceph::async::post(std::move(p), ec, 0);
296+
} else {
297+
p.release(); // release ownership until completion
298+
}
299+
}, token, ex, io, oid, handle, ctx, timeout_ms);
300+
}
301+
302+
/// Calls IoCtx::aio_unwatch() and arranges for the AioCompletion to call a
303+
/// given handler with signature (error_code, version_t).
304+
///
305+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
306+
/// instance must preserve its underlying implementation until completion.
307+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
308+
auto async_unwatch(IoExecutor ex, IoCtx& io, uint64_t handle,
309+
CompletionToken &&token)
310+
{
311+
using Op = detail::AsyncOp<void>;
312+
using Signature = typename Op::Signature;
313+
return boost::asio::async_initiate<CompletionToken, Signature>(
314+
[] (auto handler, IoExecutor ex, const IoCtx& i, uint64_t handle) {
315+
constexpr bool is_read = false;
316+
auto p = Op::create(ex, is_read, std::move(handler));
317+
auto& op = p->user_data;
318+
319+
IoCtx& io = const_cast<IoCtx&>(i);
320+
int ret = io.aio_unwatch(handle, op.aio_completion.get());
321+
if (ret < 0) {
322+
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
323+
ceph::async::post(std::move(p), ec, 0);
324+
} else {
325+
p.release(); // release ownership until completion
326+
}
327+
}, token, ex, io, handle);
328+
}
329+
271330
/// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a
272331
/// given handler with signature (error_code, version_t, bufferlist).
273332
///

src/rgw/driver/rados/rgw_tools.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,34 @@ int rgw_get_rados_ref(const DoutPrefixProvider* dpp, librados::Rados* rados,
117117
return 0;
118118
}
119119

120+
int rgw_rados_ref::watch(const DoutPrefixProvider* dpp, uint64_t* handle,
121+
librados::WatchCtx2* ctx, optional_yield y)
122+
{
123+
if (y) {
124+
auto& yield = y.get_yield_context();
125+
boost::system::error_code ec;
126+
librados::async_watch(yield.get_executor(), ioctx, obj.oid,
127+
handle, ctx, 0, yield[ec]);
128+
return ceph::from_error_code(ec);
129+
} else {
130+
maybe_warn_about_blocking(dpp);
131+
return ioctx.watch2(obj.oid, handle, ctx);
132+
}
133+
}
134+
135+
int rgw_rados_ref::unwatch(const DoutPrefixProvider* dpp, uint64_t handle,
136+
optional_yield y)
137+
{
138+
if (y) {
139+
auto& yield = y.get_yield_context();
140+
boost::system::error_code ec;
141+
librados::async_unwatch(yield.get_executor(), ioctx, handle, yield[ec]);
142+
return ceph::from_error_code(ec);
143+
} else {
144+
maybe_warn_about_blocking(dpp);
145+
return ioctx.unwatch2(handle);
146+
}
147+
}
120148

121149
map<string, bufferlist>* no_change_attrs() {
122150
static map<string, bufferlist> no_change;

src/rgw/driver/rados/rgw_tools.h

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,18 +145,10 @@ struct rgw_rados_ref {
145145
return ioctx.aio_operate(obj.oid, c, op, pbl);
146146
}
147147

148-
int watch(uint64_t* handle, librados::WatchCtx2* ctx) {
149-
return ioctx.watch2(obj.oid, handle, ctx);
150-
}
151-
152-
int aio_watch(librados::AioCompletion* c, uint64_t* handle,
153-
librados::WatchCtx2 *ctx) {
154-
return ioctx.aio_watch(obj.oid, c, handle, ctx);
155-
}
148+
int watch(const DoutPrefixProvider* dpp, uint64_t* handle,
149+
librados::WatchCtx2* ctx, optional_yield y);
156150

157-
int unwatch(uint64_t handle) {
158-
return ioctx.unwatch2(handle);
159-
}
151+
int unwatch(const DoutPrefixProvider* dpp, uint64_t handle, optional_yield y);
160152

161153
int notify(const DoutPrefixProvider* dpp, bufferlist& bl, uint64_t timeout_ms,
162154
bufferlist* pbl, optional_yield y) {

0 commit comments

Comments
 (0)