Skip to content

Commit 6275935

Browse files
authored
Merge pull request ceph#60576 from cbodley/wip-librados-asio-awaitable
librados/asio: support 'deferred' and 'use_awaitable' completions
2 parents aaee958 + 9331694 commit 6275935

File tree

6 files changed

+646
-97
lines changed

6 files changed

+646
-97
lines changed

src/common/io_exerciser/RadosIo.cc

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -135,26 +135,28 @@ void RadosIo::applyIoOp(IoOp& op) {
135135
std::make_shared<AsyncOpInfo<1>>(std::array<uint64_t, 1>{0},
136136
std::array<uint64_t, 1>{opSize});
137137
op_info->bufferlist[0] = db->generate_data(0, opSize);
138-
op_info->wop.write_full(op_info->bufferlist[0]);
138+
librados::ObjectWriteOperation wop;
139+
wop.write_full(op_info->bufferlist[0]);
139140
auto create_cb = [this](boost::system::error_code ec, version_t ver) {
140141
ceph_assert(ec == boost::system::errc::success);
141142
finish_io();
142143
};
143-
librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr,
144-
create_cb);
144+
librados::async_operate(asio.get_executor(), io, oid,
145+
std::move(wop), 0, nullptr, create_cb);
145146
break;
146147
}
147148

148149
case OpType::Remove: {
149150
start_io();
150151
auto op_info = std::make_shared<AsyncOpInfo<0>>();
151-
op_info->wop.remove();
152+
librados::ObjectWriteOperation wop;
153+
wop.remove();
152154
auto remove_cb = [this](boost::system::error_code ec, version_t ver) {
153155
ceph_assert(ec == boost::system::errc::success);
154156
finish_io();
155157
};
156-
librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr,
157-
remove_cb);
158+
librados::async_operate(asio.get_executor(), io, oid,
159+
std::move(wop), 0, nullptr, remove_cb);
158160
break;
159161
}
160162
case OpType::Read:
@@ -197,10 +199,11 @@ void RadosIo::applyReadWriteOp(IoOp& op) {
197199
auto op_info =
198200
std::make_shared<AsyncOpInfo<N>>(readOp.offset, readOp.length);
199201

202+
librados::ObjectReadOperation rop;
200203
for (int i = 0; i < N; i++) {
201-
op_info->rop.read(readOp.offset[i] * block_size,
202-
readOp.length[i] * block_size, &op_info->bufferlist[i],
203-
nullptr);
204+
rop.read(readOp.offset[i] * block_size,
205+
readOp.length[i] * block_size, &op_info->bufferlist[i],
206+
nullptr);
204207
}
205208
auto read_cb = [this, op_info](boost::system::error_code ec, version_t ver,
206209
bufferlist bl) {
@@ -211,44 +214,49 @@ void RadosIo::applyReadWriteOp(IoOp& op) {
211214
}
212215
finish_io();
213216
};
214-
librados::async_operate(asio, io, oid, &op_info->rop, 0, nullptr, read_cb);
217+
librados::async_operate(asio.get_executor(), io, oid,
218+
std::move(rop), 0, nullptr, read_cb);
215219
num_io++;
216220
};
217221

218222
auto applyWriteOp = [this]<OpType opType, int N>(
219223
ReadWriteOp<opType, N> writeOp) {
220224
auto op_info =
221225
std::make_shared<AsyncOpInfo<N>>(writeOp.offset, writeOp.length);
226+
librados::ObjectWriteOperation wop;
222227
for (int i = 0; i < N; i++) {
223228
op_info->bufferlist[i] =
224229
db->generate_data(writeOp.offset[i], writeOp.length[i]);
225-
op_info->wop.write(writeOp.offset[i] * block_size,
226-
op_info->bufferlist[i]);
230+
wop.write(writeOp.offset[i] * block_size,
231+
op_info->bufferlist[i]);
227232
}
228233
auto write_cb = [this](boost::system::error_code ec, version_t ver) {
229234
ceph_assert(ec == boost::system::errc::success);
230235
finish_io();
231236
};
232-
librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr, write_cb);
237+
librados::async_operate(asio.get_executor(), io, oid,
238+
std::move(wop), 0, nullptr, write_cb);
233239
num_io++;
234240
};
235241

236242
auto applyFailedWriteOp = [this]<OpType opType, int N>(
237243
ReadWriteOp<opType, N> writeOp) {
238244
auto op_info =
239245
std::make_shared<AsyncOpInfo<N>>(writeOp.offset, writeOp.length);
246+
librados::ObjectWriteOperation wop;
240247
for (int i = 0; i < N; i++) {
241248
op_info->bufferlist[i] =
242249
db->generate_data(writeOp.offset[i], writeOp.length[i]);
243-
op_info->wop.write(writeOp.offset[i] * block_size,
244-
op_info->bufferlist[i]);
250+
wop.write(writeOp.offset[i] * block_size,
251+
op_info->bufferlist[i]);
245252
}
246253
auto write_cb = [this, writeOp](boost::system::error_code ec,
247254
version_t ver) {
248255
ceph_assert(ec != boost::system::errc::success);
249256
finish_io();
250257
};
251-
librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr, write_cb);
258+
librados::async_operate(asio.get_executor(), io, oid,
259+
std::move(wop), 0, nullptr, write_cb);
252260
num_io++;
253261
};
254262

@@ -450,4 +458,4 @@ void RadosIo::applyInjectOp(IoOp& op) {
450458
fmt::format("Unsupported inject operation ({})", op.getOpType()));
451459
break;
452460
}
453-
}
461+
}

src/common/io_exerciser/RadosIo.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ class RadosIo : public Model {
5151
template <int N>
5252
class AsyncOpInfo {
5353
public:
54-
librados::ObjectReadOperation rop;
55-
librados::ObjectWriteOperation wop;
5654
std::array<ceph::bufferlist, N> bufferlist;
5755
std::array<uint64_t, N> offset;
5856
std::array<uint64_t, N> length;
@@ -71,4 +69,4 @@ class RadosIo : public Model {
7169
void applyInjectOp(IoOp& op);
7270
};
7371
} // namespace io_exerciser
74-
} // namespace ceph
72+
} // namespace ceph

src/librados/librados_asio.h

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <boost/asio/associated_cancellation_slot.hpp>
1818
#include <boost/asio/cancellation_type.hpp>
19+
#include <boost/asio/execution/executor.hpp>
1920

2021
#include "include/rados/librados.hpp"
2122
#include "common/async/completion.h"
@@ -151,136 +152,142 @@ struct AsyncOp : Invoker<Result> {
151152
///
152153
/// The given IoCtx reference is not required to remain valid, but some IoCtx
153154
/// instance must preserve its underlying implementation until completion.
154-
template <typename ExecutionContext, typename CompletionToken>
155-
auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
155+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
156+
auto async_read(IoExecutor ex, IoCtx& io, const std::string& oid,
156157
size_t len, uint64_t off, CompletionToken&& token)
157158
{
158159
using Op = detail::AsyncOp<bufferlist>;
159160
using Signature = typename Op::Signature;
160161
return boost::asio::async_initiate<CompletionToken, Signature>(
161-
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
162-
size_t len, uint64_t off) {
162+
[] (auto handler, IoExecutor ex, const IoCtx& i,
163+
const std::string& oid, size_t len, uint64_t off) {
163164
constexpr bool is_read = true;
164165
auto p = Op::create(ex, is_read, std::move(handler));
165166
auto& op = p->user_data;
166167

168+
IoCtx& io = const_cast<IoCtx&>(i);
167169
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
168170
if (ret < 0) {
169171
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
170172
ceph::async::post(std::move(p), ec, 0, bufferlist{});
171173
} else {
172174
p.release(); // release ownership until completion
173175
}
174-
}, token, ctx.get_executor(), io, oid, len, off);
176+
}, token, ex, io, oid, len, off);
175177
}
176178

177179
/// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
178180
/// given handler with signature (error_code, version_t).
179181
///
180182
/// The given IoCtx reference is not required to remain valid, but some IoCtx
181183
/// instance must preserve its underlying implementation until completion.
182-
template <typename ExecutionContext, typename CompletionToken>
183-
auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
184+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
185+
auto async_write(IoExecutor ex, IoCtx& io, const std::string& oid,
184186
const bufferlist &bl, size_t len, uint64_t off,
185187
CompletionToken&& token)
186188
{
187189
using Op = detail::AsyncOp<void>;
188190
using Signature = typename Op::Signature;
189191
return boost::asio::async_initiate<CompletionToken, Signature>(
190-
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
192+
[] (auto handler, IoExecutor ex, const IoCtx& i, const std::string& oid,
191193
const bufferlist &bl, size_t len, uint64_t off) {
192194
constexpr bool is_read = false;
193195
auto p = Op::create(ex, is_read, std::move(handler));
194196
auto& op = p->user_data;
195197

198+
IoCtx& io = const_cast<IoCtx&>(i);
196199
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
197200
if (ret < 0) {
198201
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
199202
ceph::async::post(std::move(p), ec, 0);
200203
} else {
201204
p.release(); // release ownership until completion
202205
}
203-
}, token, ctx.get_executor(), io, oid, bl, len, off);
206+
}, token, ex, io, oid, bl, len, off);
204207
}
205208

206209
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
207210
/// given handler with signature (error_code, version_t, bufferlist).
208211
///
209212
/// The given IoCtx reference is not required to remain valid, but some IoCtx
210213
/// instance must preserve its underlying implementation until completion.
211-
template <typename ExecutionContext, typename CompletionToken>
212-
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
213-
ObjectReadOperation *read_op, int flags,
214+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
215+
auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid,
216+
ObjectReadOperation read_op, int flags,
214217
const jspan_context* trace_ctx, CompletionToken&& token)
215218
{
216219
using Op = detail::AsyncOp<bufferlist>;
217220
using Signature = typename Op::Signature;
218221
return boost::asio::async_initiate<CompletionToken, Signature>(
219-
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
220-
ObjectReadOperation *read_op, int flags) {
222+
[] (auto handler, IoExecutor ex, const IoCtx& i, const std::string& oid,
223+
ObjectReadOperation read_op, int flags) {
221224
constexpr bool is_read = true;
222225
auto p = Op::create(ex, is_read, std::move(handler));
223226
auto& op = p->user_data;
224227

225-
int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
228+
auto& io = const_cast<IoCtx&>(i);
229+
int ret = io.aio_operate(oid, op.aio_completion.get(), &read_op,
226230
flags, &op.result);
227231
if (ret < 0) {
228232
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
229233
ceph::async::post(std::move(p), ec, 0, bufferlist{});
230234
} else {
231235
p.release(); // release ownership until completion
232236
}
233-
}, token, ctx.get_executor(), io, oid, read_op, flags);
237+
}, token, ex, io, oid, std::move(read_op), flags);
234238
}
235239

236240
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
237241
/// given handler with signature (error_code, version_t).
238242
///
239243
/// The given IoCtx reference is not required to remain valid, but some IoCtx
240244
/// instance must preserve its underlying implementation until completion.
241-
template <typename ExecutionContext, typename CompletionToken>
242-
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
243-
ObjectWriteOperation *write_op, int flags,
245+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
246+
auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid,
247+
ObjectWriteOperation write_op, int flags,
244248
const jspan_context* trace_ctx, CompletionToken &&token)
245249
{
246250
using Op = detail::AsyncOp<void>;
247251
using Signature = typename Op::Signature;
248252
return boost::asio::async_initiate<CompletionToken, Signature>(
249-
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
250-
ObjectWriteOperation *write_op, int flags,
253+
[] (auto handler, IoExecutor ex, const IoCtx& i, const std::string& oid,
254+
ObjectWriteOperation write_op, int flags,
251255
const jspan_context* trace_ctx) {
252256
constexpr bool is_read = false;
253257
auto p = Op::create(ex, is_read, std::move(handler));
254258
auto& op = p->user_data;
255259

256-
int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
260+
auto& io = const_cast<IoCtx&>(i);
261+
int ret = io.aio_operate(oid, op.aio_completion.get(), &write_op, flags, trace_ctx);
257262
if (ret < 0) {
258263
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
259264
ceph::async::post(std::move(p), ec, 0);
260265
} else {
261266
p.release(); // release ownership until completion
262267
}
263-
}, token, ctx.get_executor(), io, oid, write_op, flags, trace_ctx);
268+
}, token, ex, io, oid, std::move(write_op), flags, trace_ctx);
264269
}
265270

266271
/// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a
267272
/// given handler with signature (error_code, version_t, bufferlist).
268273
///
269274
/// The given IoCtx reference is not required to remain valid, but some IoCtx
270275
/// instance must preserve its underlying implementation until completion.
271-
template <typename ExecutionContext, typename CompletionToken>
272-
auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
276+
template <boost::asio::execution::executor IoExecutor, typename CompletionToken>
277+
auto async_notify(IoExecutor ex, IoCtx& io, const std::string& oid,
273278
bufferlist& bl, uint64_t timeout_ms, CompletionToken &&token)
274279
{
275280
using Op = detail::AsyncOp<bufferlist>;
276281
using Signature = typename Op::Signature;
277282
return boost::asio::async_initiate<CompletionToken, Signature>(
278-
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
279-
bufferlist& bl, uint64_t timeout_ms) {
283+
[] (auto handler, IoExecutor ex, const IoCtx& i, const std::string& oid,
284+
const bufferlist& b, uint64_t timeout_ms) {
280285
constexpr bool is_read = false;
281286
auto p = Op::create(ex, is_read, std::move(handler));
282287
auto& op = p->user_data;
283288

289+
IoCtx& io = const_cast<IoCtx&>(i);
290+
bufferlist& bl = const_cast<bufferlist&>(b);
284291
int ret = io.aio_notify(oid, op.aio_completion.get(),
285292
bl, timeout_ms, &op.result);
286293
if (ret < 0) {
@@ -289,7 +296,7 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
289296
} else {
290297
p.release(); // release ownership until completion
291298
}
292-
}, token, ctx.get_executor(), io, oid, bl, timeout_ms);
299+
}, token, ex, io, oid, bl, timeout_ms);
293300
}
294301

295302
} // namespace librados

src/rgw/driver/rados/rgw_tools.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,10 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
206206
// of blocking
207207
if (y) {
208208
auto& yield = y.get_yield_context();
209+
auto ex = yield.get_executor();
209210
boost::system::error_code ec;
210-
auto [ver, bl] = librados::async_operate(
211-
yield, ioctx, oid, op, flags, trace_info, yield[ec]);
211+
auto [ver, bl] = librados::async_operate(ex, ioctx, oid, std::move(*op),
212+
flags, trace_info, yield[ec]);
212213
if (pbl) {
213214
*pbl = std::move(bl);
214215
}
@@ -231,9 +232,10 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
231232
{
232233
if (y) {
233234
auto& yield = y.get_yield_context();
235+
auto ex = yield.get_executor();
234236
boost::system::error_code ec;
235-
version_t ver = librados::async_operate(yield, ioctx, oid, op, flags,
236-
trace_info, yield[ec]);
237+
version_t ver = librados::async_operate(ex, ioctx, oid, std::move(*op),
238+
flags, trace_info, yield[ec]);
237239
if (pver) {
238240
*pver = ver;
239241
}
@@ -254,8 +256,8 @@ int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, cons
254256
if (y) {
255257
auto& yield = y.get_yield_context();
256258
boost::system::error_code ec;
257-
auto [ver, reply] = librados::async_notify(yield, ioctx, oid,
258-
bl, timeout_ms, yield[ec]);
259+
auto [ver, reply] = librados::async_notify(
260+
yield.get_executor(), ioctx, oid, bl, timeout_ms, yield[ec]);
259261
if (pbl) {
260262
*pbl = std::move(reply);
261263
}

src/rgw/rgw_aio.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
9797
// executor so it can safely call back into Aio without locking
9898
auto ex = yield.get_executor();
9999

100-
librados::async_operate(yield, ctx, r.obj.oid, &op, 0, trace_ctx,
100+
librados::async_operate(ex, ctx, r.obj.oid, std::move(op), 0, trace_ctx,
101101
bind_executor(ex, Handler{aio, ctx, r}));
102102
};
103103
}

0 commit comments

Comments
 (0)