Skip to content

Commit 859dc27

Browse files
authored
Merge pull request ceph#59998 from cbodley/wip-63935
rgw/rados: don't rely on IoCtx::get_last_version() for async ops Reviewed-by: Adam Emerson <[email protected]>
2 parents 748842a + 7769bdb commit 859dc27

File tree

8 files changed

+154
-110
lines changed

8 files changed

+154
-110
lines changed

src/librados/librados_asio.h

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "include/rados/librados.hpp"
1818
#include "common/async/completion.h"
19+
#include "librados/AioCompletionImpl.h"
1920

2021
/// Defines asynchronous librados operations that satisfy all of the
2122
/// "Requirements on asynchronous operations" imposed by the C++ Networking TS
@@ -53,20 +54,20 @@ using unique_aio_completion_ptr =
5354
/// argument to the handler.
5455
template <typename Result>
5556
struct Invoker {
56-
using Signature = void(boost::system::error_code, Result);
57+
using Signature = void(boost::system::error_code, version_t, Result);
5758
Result result;
5859
template <typename Completion>
59-
void dispatch(Completion&& completion, boost::system::error_code ec) {
60-
ceph::async::dispatch(std::move(completion), ec, std::move(result));
60+
void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
61+
ceph::async::dispatch(std::move(completion), ec, ver, std::move(result));
6162
}
6263
};
6364
// specialization for Result=void
6465
template <>
6566
struct Invoker<void> {
66-
using Signature = void(boost::system::error_code);
67+
using Signature = void(boost::system::error_code, version_t);
6768
template <typename Completion>
68-
void dispatch(Completion&& completion, boost::system::error_code ec) {
69-
ceph::async::dispatch(std::move(completion), ec);
69+
void dispatch(Completion&& completion, boost::system::error_code ec, version_t ver) {
70+
ceph::async::dispatch(std::move(completion), ec, ver);
7071
}
7172
};
7273

@@ -82,12 +83,15 @@ struct AsyncOp : Invoker<Result> {
8283
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
8384
// move result out of Completion memory being freed
8485
auto op = std::move(p->user_data);
85-
const int ret = op.aio_completion->get_return_value();
86+
// access AioCompletionImpl directly to avoid locking
87+
const librados::AioCompletionImpl* pc = op.aio_completion->pc;
88+
const int ret = pc->rval;
89+
const version_t ver = pc->objver;
8690
boost::system::error_code ec;
8791
if (ret < 0) {
8892
ec.assign(-ret, librados::detail::err_category());
8993
}
90-
op.dispatch(std::move(p), ec);
94+
op.dispatch(std::move(p), ec, ver);
9195
}
9296

9397
template <typename Executor1, typename CompletionHandler>
@@ -103,7 +107,7 @@ struct AsyncOp : Invoker<Result> {
103107

104108

105109
/// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a
106-
/// given handler with signature (boost::system::error_code, bufferlist).
110+
/// given handler with signature (error_code, version_t, bufferlist).
107111
template <typename ExecutionContext, typename CompletionToken>
108112
auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
109113
size_t len, uint64_t off, CompletionToken&& token)
@@ -119,40 +123,40 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
119123
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
120124
if (ret < 0) {
121125
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
122-
ceph::async::post(std::move(p), ec, bufferlist{});
126+
ceph::async::post(std::move(p), ec, 0, bufferlist{});
123127
} else {
124128
p.release(); // release ownership until completion
125129
}
126130
}, token, ctx.get_executor(), io, oid, len, off);
127131
}
128132

129133
/// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
130-
/// given handler with signature (boost::system::error_code).
134+
/// given handler with signature (error_code, version_t).
131135
template <typename ExecutionContext, typename CompletionToken>
132136
auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
133-
bufferlist &bl, size_t len, uint64_t off,
137+
const bufferlist &bl, size_t len, uint64_t off,
134138
CompletionToken&& token)
135139
{
136140
using Op = detail::AsyncOp<void>;
137141
using Signature = typename Op::Signature;
138142
return boost::asio::async_initiate<CompletionToken, Signature>(
139143
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
140-
bufferlist &bl, size_t len, uint64_t off) {
144+
const bufferlist &bl, size_t len, uint64_t off) {
141145
auto p = Op::create(ex, std::move(handler));
142146
auto& op = p->user_data;
143147

144148
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
145149
if (ret < 0) {
146150
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
147-
ceph::async::post(std::move(p), ec);
151+
ceph::async::post(std::move(p), ec, 0);
148152
} else {
149153
p.release(); // release ownership until completion
150154
}
151155
}, token, ctx.get_executor(), io, oid, bl, len, off);
152156
}
153157

154158
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
155-
/// given handler with signature (boost::system::error_code, bufferlist).
159+
/// given handler with signature (error_code, version_t, bufferlist).
156160
template <typename ExecutionContext, typename CompletionToken>
157161
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
158162
ObjectReadOperation *read_op, int flags,
@@ -170,15 +174,15 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
170174
flags, &op.result);
171175
if (ret < 0) {
172176
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
173-
ceph::async::post(std::move(p), ec, bufferlist{});
177+
ceph::async::post(std::move(p), ec, 0, bufferlist{});
174178
} else {
175179
p.release(); // release ownership until completion
176180
}
177181
}, token, ctx.get_executor(), io, oid, read_op, flags);
178182
}
179183

180184
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
181-
/// given handler with signature (boost::system::error_code).
185+
/// given handler with signature (error_code, version_t).
182186
template <typename ExecutionContext, typename CompletionToken>
183187
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
184188
ObjectWriteOperation *write_op, int flags,
@@ -196,15 +200,15 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
196200
int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
197201
if (ret < 0) {
198202
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
199-
ceph::async::post(std::move(p), ec);
203+
ceph::async::post(std::move(p), ec, 0);
200204
} else {
201205
p.release(); // release ownership until completion
202206
}
203207
}, token, ctx.get_executor(), io, oid, write_op, flags, trace_ctx);
204208
}
205209

206210
/// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a
207-
/// given handler with signature (boost::system::error_code, bufferlist).
211+
/// given handler with signature (error_code, version_t, bufferlist).
208212
template <typename ExecutionContext, typename CompletionToken>
209213
auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
210214
bufferlist& bl, uint64_t timeout_ms, CompletionToken &&token)
@@ -221,7 +225,7 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
221225
bl, timeout_ms, &op.result);
222226
if (ret < 0) {
223227
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
224-
ceph::async::post(std::move(p), ec, bufferlist{});
228+
ceph::async::post(std::move(p), ec, 0, bufferlist{});
225229
} else {
226230
p.release(); // release ownership until completion
227231
}

src/rgw/driver/rados/rgw_rados.cc

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3248,7 +3248,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
32483248
auto& ioctx = ref.ioctx;
32493249

32503250
tracepoint(rgw_rados, operate_enter, req_id.c_str());
3251-
r = rgw_rados_operate(rctx.dpp, ref.ioctx, ref.obj.oid, &op, rctx.y, 0, &trace);
3251+
r = rgw_rados_operate(rctx.dpp, ref.ioctx, ref.obj.oid, &op, rctx.y, 0, &trace, &epoch);
32523252
tracepoint(rgw_rados, operate_exit, req_id.c_str());
32533253
if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,
32543254
or -ENOENT if was removed, or -EEXIST if it did not exist
@@ -3260,7 +3260,6 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
32603260
goto done_cancel;
32613261
}
32623262

3263-
epoch = ioctx.get_last_version();
32643263
poolid = ioctx.get_id();
32653264

32663265
r = target->complete_atomic_modification(rctx.dpp, rctx.y);
@@ -5876,7 +5875,8 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
58765875
}
58775876

58785877
auto& ioctx = ref.ioctx;
5879-
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
5878+
version_t epoch = 0;
5879+
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y, 0, nullptr, &epoch);
58805880

58815881
/* raced with another operation, object state is indeterminate */
58825882
const bool need_invalidate = (r == -ECANCELED);
@@ -5888,7 +5888,7 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
58885888
tombstone_entry entry{*state};
58895889
obj_tombstone_cache->add(obj, entry);
58905890
}
5891-
r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y, log_op);
5891+
r = index_op.complete_del(dpp, poolid, epoch, state->mtime, params.remove_objs, y, log_op);
58925892

58935893
int ret = target->complete_atomic_modification(dpp, y);
58945894
if (ret < 0) {
@@ -6609,7 +6609,8 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
66096609
struct timespec mtime_ts = real_clock::to_timespec(mtime);
66106610
op.mtime2(&mtime_ts);
66116611
auto& ioctx = ref.ioctx;
6612-
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
6612+
version_t epoch = 0;
6613+
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y, 0, nullptr, &epoch);
66136614
if (state) {
66146615
if (r >= 0) {
66156616
ACLOwner owner;
@@ -6640,7 +6641,6 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
66406641
iter != state->attrset.end()) {
66416642
storage_class = rgw_bl_str(iter->second);
66426643
}
6643-
uint64_t epoch = ioctx.get_last_version();
66446644
int64_t poolid = ioctx.get_id();
66456645
r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
66466646
mtime, etag, content_type, storage_class, owner,
@@ -8805,12 +8805,7 @@ int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp,
88058805
}
88068806

88078807
bufferlist outbl;
8808-
r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y);
8809-
8810-
if (epoch) {
8811-
*epoch = ref.ioctx.get_last_version();
8812-
}
8813-
8808+
r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y, 0, nullptr, epoch);
88148809
if (r < 0)
88158810
return r;
88168811

@@ -9662,6 +9657,12 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
96629657
num_entries << " total entries" << dendl;
96639658

96649659
auto& ioctx = index_pool;
9660+
9661+
// XXX: check_disk_state() relies on ioctx.get_last_version() but that
9662+
// returns 0 because CLSRGWIssueBucketList doesn't make any synchonous calls
9663+
rgw_bucket_entry_ver index_ver;
9664+
index_ver.pool = ioctx.get_id();
9665+
96659666
std::map<int, rgw_cls_list_ret> shard_list_results;
96669667
cls_rgw_obj_key start_after_key(start_after.name, start_after.instance);
96679668
maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
@@ -9786,12 +9787,10 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
97869787
/* there are uncommitted ops. We need to check the current
97879788
* state, and if the tags are old we need to do clean-up as
97889789
* well. */
9789-
librados::IoCtx sub_ctx;
9790-
sub_ctx.dup(ioctx);
97919790
ldout_bitx(bitx, dpp, 20) << "INFO: " << __func__ <<
97929791
" calling check_disk_state bucket=" << bucket_info.bucket <<
97939792
" entry=" << dirent.key << dendl_bitx;
9794-
r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent,
9793+
r = check_disk_state(dpp, bucket_info, index_ver, dirent, dirent,
97959794
updates[tracker.oid_name], y);
97969795
if (r < 0 && r != -ENOENT) {
97979796
ldpp_dout(dpp, 0) << __func__ <<
@@ -10013,6 +10012,9 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
1001310012
}
1001410013
}
1001510014

10015+
rgw_bucket_entry_ver index_ver;
10016+
index_ver.pool = ioctx.get_id();
10017+
1001610018
uint32_t count = 0u;
1001710019
std::map<std::string, bufferlist> updates;
1001810020
rgw_obj_index_key last_added_entry;
@@ -10027,7 +10029,7 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
1002710029
cls_rgw_bucket_list_op(op, marker, prefix, empty_delimiter,
1002810030
num_entries,
1002910031
list_versions, &result);
10030-
r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
10032+
r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y, 0, nullptr, &index_ver.epoch);
1003110033
if (r < 0) {
1003210034
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
1003310035
": error in rgw_rados_operate (bucket list op), r=" << r << dendl;
@@ -10044,12 +10046,10 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
1004410046
force_check) {
1004510047
/* there are uncommitted ops. We need to check the current state,
1004610048
* and if the tags are old we need to do cleanup as well. */
10047-
librados::IoCtx sub_ctx;
10048-
sub_ctx.dup(ioctx);
1004910049
ldout_bitx(bitx, dpp, 20) << "INFO: " << __func__ <<
1005010050
": calling check_disk_state bucket=" << bucket_info.bucket <<
1005110051
" entry=" << dirent.key << dendl_bitx;
10052-
r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent, updates[oid], y);
10052+
r = check_disk_state(dpp, bucket_info, index_ver, dirent, dirent, updates[oid], y);
1005310053
if (r < 0 && r != -ENOENT) {
1005410054
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
1005510055
": error in check_disk_state, r=" << r << dendl;
@@ -10281,8 +10281,8 @@ int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp,
1028110281
}
1028210282

1028310283
int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
10284-
librados::IoCtx io_ctx,
1028510284
RGWBucketInfo& bucket_info,
10285+
const rgw_bucket_entry_ver& index_ver,
1028610286
rgw_bucket_dir_entry& list_state,
1028710287
rgw_bucket_dir_entry& object,
1028810288
bufferlist& suggested_updates,
@@ -10310,8 +10310,6 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
1031010310
ldpp_dout(dpp, 0) << "WARNING: generated locator (" << loc << ") is different from listed locator (" << list_state.locator << ")" << dendl;
1031110311
}
1031210312

10313-
io_ctx.locator_set_key(list_state.locator);
10314-
1031510313
RGWObjState *astate = NULL;
1031610314
RGWObjManifest *manifest = nullptr;
1031710315
RGWObjectCtx octx(this->driver);
@@ -10332,8 +10330,7 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
1033210330
}
1033310331

1033410332
// encode a suggested removal of that key
10335-
list_state.ver.epoch = io_ctx.get_last_version();
10336-
list_state.ver.pool = io_ctx.get_id();
10333+
list_state.ver = index_ver;
1033710334
ldout_bitx(bitx, dpp, 10) << "INFO: " << __func__ << ": encoding remove of " << list_state.key << " on suggested_updates" << dendl_bitx;
1033810335
cls_rgw_encode_suggestion(CEPH_RGW_REMOVE | suggest_flag, list_state, suggested_updates);
1033910336
return -ENOENT;

src/rgw/driver/rados/rgw_rados.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1642,8 +1642,8 @@ class RGWRados
16421642
* will encode that info as a suggested update.)
16431643
*/
16441644
int check_disk_state(const DoutPrefixProvider *dpp,
1645-
librados::IoCtx io_ctx,
16461645
RGWBucketInfo& bucket_info,
1646+
const rgw_bucket_entry_ver& index_ver,
16471647
rgw_bucket_dir_entry& list_state,
16481648
rgw_bucket_dir_entry& object,
16491649
bufferlist& suggested_updates,

src/rgw/driver/rados/rgw_tools.cc

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -198,36 +198,52 @@ int rgw_delete_system_obj(const DoutPrefixProvider *dpp,
198198

199199
int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
200200
librados::ObjectReadOperation *op, bufferlist* pbl,
201-
optional_yield y, int flags, const jspan_context* trace_info)
201+
optional_yield y, int flags, const jspan_context* trace_info,
202+
version_t* pver)
202203
{
203204
// given a yield_context, call async_operate() to yield the coroutine instead
204205
// of blocking
205206
if (y) {
206207
auto& yield = y.get_yield_context();
207208
boost::system::error_code ec;
208-
auto bl = librados::async_operate(
209+
auto [ver, bl] = librados::async_operate(
209210
yield, ioctx, oid, op, flags, trace_info, yield[ec]);
210211
if (pbl) {
211212
*pbl = std::move(bl);
212213
}
214+
if (pver) {
215+
*pver = ver;
216+
}
213217
return -ec.value();
214218
}
215219
maybe_warn_about_blocking(dpp);
216-
return ioctx.operate(oid, op, nullptr, flags);
220+
int r = ioctx.operate(oid, op, nullptr, flags);
221+
if (pver) {
222+
*pver = ioctx.get_last_version();
223+
}
224+
return r;
217225
}
218226

219227
int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
220228
librados::ObjectWriteOperation *op, optional_yield y,
221-
int flags, const jspan_context* trace_info)
229+
int flags, const jspan_context* trace_info, version_t* pver)
222230
{
223231
if (y) {
224232
auto& yield = y.get_yield_context();
225233
boost::system::error_code ec;
226-
librados::async_operate(yield, ioctx, oid, op, flags, trace_info, yield[ec]);
234+
version_t ver = librados::async_operate(yield, ioctx, oid, op, flags,
235+
trace_info, yield[ec]);
236+
if (pver) {
237+
*pver = ver;
238+
}
227239
return -ec.value();
228240
}
229241
maybe_warn_about_blocking(dpp);
230-
return ioctx.operate(oid, op, flags, trace_info);
242+
int r = ioctx.operate(oid, op, flags, trace_info);
243+
if (pver) {
244+
*pver = ioctx.get_last_version();
245+
}
246+
return r;
231247
}
232248

233249
int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
@@ -237,8 +253,8 @@ int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, cons
237253
if (y) {
238254
auto& yield = y.get_yield_context();
239255
boost::system::error_code ec;
240-
auto reply = librados::async_notify(yield, ioctx, oid,
241-
bl, timeout_ms, yield[ec]);
256+
auto [ver, reply] = librados::async_notify(yield, ioctx, oid,
257+
bl, timeout_ms, yield[ec]);
242258
if (pbl) {
243259
*pbl = std::move(reply);
244260
}

0 commit comments

Comments
 (0)