Skip to content

Commit 14ddf3c

Browse files
committed
{neorados,osdc}: Support subsystem cancellation
Tag operations with a subsystem so we can cancel them all in one go. Signed-off-by: Adam C. Emerson <[email protected]>
1 parent 4c0f422 commit 14ddf3c

File tree

5 files changed

+118
-29
lines changed

5 files changed

+118
-29
lines changed

src/include/neorados/RADOS.hpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,26 +1408,29 @@ class RADOS final
14081408
auto execute(Object o, IOContext ioc, ReadOp op,
14091409
ceph::buffer::list* bl,
14101410
CompletionToken&& token, uint64_t* objver = nullptr,
1411-
const blkin_trace_info* trace_info = nullptr) {
1411+
const blkin_trace_info* trace_info = nullptr,
1412+
std::uint64_t subsystem = 0) {
14121413
auto consigned = consign(std::forward<CompletionToken>(token));
14131414
return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
1414-
[bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
1415-
ReadOp op) {
1415+
[bl, objver, trace_info,
1416+
subsystem, this](auto&& handler, Object o, IOContext ioc,
1417+
ReadOp op) {
14161418
execute_(std::move(o), std::move(ioc), std::move(op), bl,
1417-
std::move(handler), objver, trace_info);
1419+
std::move(handler), objver, trace_info, subsystem);
14181420
}, consigned, std::move(o), std::move(ioc), std::move(op));
14191421
}
14201422

14211423
template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
14221424
auto execute(Object o, IOContext ioc, WriteOp op,
14231425
CompletionToken&& token, uint64_t* objver = nullptr,
1424-
const blkin_trace_info* trace_info = nullptr) {
1426+
const blkin_trace_info* trace_info = nullptr,
1427+
std::uint64_t subsystem = 0) {
14251428
auto consigned = consign(std::forward<CompletionToken>(token));
14261429
return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
1427-
[objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
1428-
WriteOp op) {
1430+
[objver, trace_info,
1431+
subsystem, this](auto&& handler, Object o, IOContext ioc, WriteOp op) {
14291432
execute_(std::move(o), std::move(ioc), std::move(op),
1430-
std::move(handler), objver, trace_info);
1433+
std::move(handler), objver, trace_info, subsystem);
14311434
}, consigned, std::move(o), std::move(ioc), std::move(op));
14321435
}
14331436

@@ -1803,6 +1806,9 @@ class RADOS final
18031806

18041807
uint64_t instance_id() const;
18051808

1809+
uint64_t new_subsystem() const;
1810+
void cancel_subsystem(uint64_t subsystem) const;
1811+
18061812
private:
18071813

18081814
RADOS();
@@ -1816,11 +1822,13 @@ class RADOS final
18161822

18171823
void execute_(Object o, IOContext ioc, ReadOp op,
18181824
ceph::buffer::list* bl, Op::Completion c,
1819-
uint64_t* objver, const blkin_trace_info* trace_info);
1825+
uint64_t* objver, const blkin_trace_info* trace_info,
1826+
uint64_t subsystem);
18201827

18211828
void execute_(Object o, IOContext ioc, WriteOp op,
18221829
Op::Completion c, uint64_t* objver,
1823-
const blkin_trace_info* trace_info);
1830+
const blkin_trace_info* trace_info,
1831+
uint64_t subsystem);
18241832

18251833
void lookup_pool_(std::string name, LookupPoolComp c);
18261834
void list_pools_(LSPoolsComp c);

src/neorados/RADOS.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,8 @@ asio::io_context& RADOS::get_io_context() {
938938
void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
939939
cb::list* bl,
940940
ReadOp::Completion c, version_t* objver,
941-
const blkin_trace_info *trace_info) {
941+
const blkin_trace_info *trace_info,
942+
std::uint64_t subsystem) {
942943
if (_op.size() == 0) {
943944
asio::dispatch(asio::append(std::move(c), bs::error_code{}));
944945
return;
@@ -957,14 +958,16 @@ void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
957958
trace.event("init");
958959
impl->objecter->read(
959960
*oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags,
960-
std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace);
961+
std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace,
962+
subsystem);
961963

962964
trace.event("submitted");
963965
}
964966

965967
void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
966968
WriteOp::Completion c, version_t* objver,
967-
const blkin_trace_info *trace_info) {
969+
const blkin_trace_info *trace_info,
970+
std::uint64_t subsystem) {
968971
if (_op.size() == 0) {
969972
asio::dispatch(asio::append(std::move(c), bs::error_code{}));
970973
return;
@@ -989,7 +992,7 @@ void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
989992
impl->objecter->mutate(
990993
*oid, ioc->oloc, std::move(op->op), ioc->snapc,
991994
mtime, flags,
992-
std::move(c), objver, osd_reqid_t{}, &trace);
995+
std::move(c), objver, osd_reqid_t{}, &trace, subsystem);
993996
trace.event("submitted");
994997
}
995998

@@ -1966,6 +1969,17 @@ uint64_t RADOS::instance_id() const {
19661969
return impl->get_instance_id();
19671970
}
19681971

1972+
uint64_t RADOS::new_subsystem() const
1973+
{
1974+
return impl->objecter->unique_subsystem_id();
1975+
}
1976+
1977+
void RADOS::cancel_subsystem(uint64_t subsystem) const
1978+
{
1979+
impl->objecter->subsystem_cancel(subsystem,
1980+
asio::error::operation_aborted);
1981+
}
1982+
19691983
#pragma GCC diagnostic push
19701984
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
19711985
#pragma clang diagnostic push

src/osdc/Objecter.cc

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2564,7 +2564,8 @@ void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_t
25642564
ldout(cct, 5) << num_in_flight << " in flight" << dendl;
25652565
}
25662566

2567-
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2567+
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r,
2568+
bs::error_code ec)
25682569
{
25692570
ceph_assert(initialized);
25702571

@@ -2590,7 +2591,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
25902591
Op *op = p->second;
25912592
if (op->has_completion()) {
25922593
num_in_flight--;
2593-
op->complete(osdcode(r), r, service.get_executor());
2594+
op->complete(ec, r, service.get_executor());
25942595
}
25952596
_op_cancel_map_check(op);
25962597
_finish_op(op, r);
@@ -2599,6 +2600,58 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
25992600
return 0;
26002601
}
26012602

2603+
void Objecter::subsystem_cancel(uint64_t subsystem, bs::error_code ec)
2604+
{
2605+
unique_lock wl(rwlock);
2606+
2607+
ldout(cct, 5) << __func__ << ": cancelling subsystem " << subsystem
2608+
<< " ec=" << ec.message() << dendl;
2609+
2610+
auto next_subsystem_op = [subsystem](const decltype(OSDSession::ops)& ops) {
2611+
auto i = std::find_if(
2612+
ops.begin(), ops.end(),
2613+
[subsystem](const auto& el) -> std::optional<ceph_tid_t> {
2614+
return el.second->subsystem == subsystem;
2615+
});
2616+
if (i != ops.cend()) {
2617+
return std::make_optional(i->second->tid);
2618+
}
2619+
return std::optional<ceph_tid_t>{std::nullopt};
2620+
};
2621+
2622+
// Since we only do this at shutdown, better to have it be slow than
2623+
// make it fast and introduce another cost per op.
2624+
2625+
start:
2626+
2627+
for (auto siter = osd_sessions.begin();
2628+
siter != osd_sessions.end(); ++siter) {
2629+
auto s = siter->second;
2630+
shared_lock sl(s->lock);
2631+
while (auto tid = next_subsystem_op(s->ops)) {
2632+
sl.unlock();
2633+
auto ret = op_cancel(s, *tid, ceph::from_error_code(ec), ec);
2634+
if (ret == -ENOENT) {
2635+
/* oh no! raced, maybe tid moved to another session, restarting */
2636+
goto start;
2637+
}
2638+
sl.lock();
2639+
}
2640+
}
2641+
2642+
// Handle case where the op is in homeless session
2643+
shared_lock sl(homeless_session->lock);
2644+
while (auto tid = next_subsystem_op(homeless_session->ops)) {
2645+
sl.unlock();
2646+
auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec);
2647+
if (ret == -ENOENT) {
2648+
/* oh no! raced, maybe tid moved to another session, restarting */
2649+
goto start;
2650+
}
2651+
}
2652+
sl.unlock();
2653+
}
2654+
26022655
int Objecter::op_cancel(ceph_tid_t tid, int r)
26032656
{
26042657
int ret = 0;
@@ -2634,7 +2687,7 @@ int Objecter::_op_cancel(ceph_tid_t tid, int r)
26342687
shared_lock sl(s->lock);
26352688
if (s->ops.find(tid) != s->ops.end()) {
26362689
sl.unlock();
2637-
ret = op_cancel(s, tid, r);
2690+
ret = op_cancel(s, tid, r, osdcode(r));
26382691
if (ret == -ENOENT) {
26392692
/* oh no! raced, maybe tid moved to another session, restarting */
26402693
goto start;
@@ -2650,7 +2703,7 @@ int Objecter::_op_cancel(ceph_tid_t tid, int r)
26502703
shared_lock sl(homeless_session->lock);
26512704
if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
26522705
sl.unlock();
2653-
ret = op_cancel(homeless_session, tid, r);
2706+
ret = op_cancel(homeless_session, tid, r, osdcode(r));
26542707
if (ret == -ENOENT) {
26552708
/* oh no! raced, maybe tid moved to another session, restarting */
26562709
goto start;
@@ -2689,7 +2742,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
26892742
sl.unlock();
26902743

26912744
for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
2692-
int cancel_result = op_cancel(s, *titer, r);
2745+
int cancel_result = op_cancel(s, *titer, r, osdcode(r));
26932746
// We hold rwlock across search and cancellation, so cancels
26942747
// should always succeed
26952748
ceph_assert(cancel_result == 0);

src/osdc/Objecter.h

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1713,6 +1713,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
17131713

17141714
private:
17151715
std::atomic<uint64_t> last_tid{0};
1716+
std::atomic<uint64_t> last_subsystem{0};
17161717
std::atomic<unsigned> inflight_ops{0};
17171718
std::atomic<int> client_inc{-1};
17181719
uint64_t max_linger_id{0};
@@ -1795,6 +1796,11 @@ class Objecter : public md_config_obs_t, public Dispatcher {
17951796
void maybe_request_map();
17961797

17971798
void enable_blocklist_events();
1799+
1800+
uint64_t unique_subsystem_id() {
1801+
return ++last_subsystem;
1802+
}
1803+
17981804
private:
17991805

18001806
void _maybe_request_map();
@@ -2034,6 +2040,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
20342040

20352041
osd_reqid_t reqid; // explicitly setting reqid
20362042
ZTracer::Trace trace;
2043+
std::uint64_t subsystem = 0;
20372044
const jspan_context* otel_trace = nullptr;
20382045

20392046
static bool has_completion(decltype(onfinish)& f) {
@@ -2065,7 +2072,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
20652072

20662073
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
20672074
int f, OpComp fin, version_t *ov, int *offset = nullptr,
2068-
ZTracer::Trace *parent_trace = nullptr) :
2075+
ZTracer::Trace *parent_trace = nullptr, uint64_t subsystem = 0) :
20692076
target(o, ol, f),
20702077
ops(std::move(_ops)),
20712078
out_bl(ops.size(), nullptr),
@@ -2074,7 +2081,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
20742081
out_ec(ops.size(), nullptr),
20752082
onfinish(std::move(fin)),
20762083
objver(ov),
2077-
data_offset(offset) {
2084+
data_offset(offset), subsystem(subsystem) {
20782085
if (target.base_oloc.key == o)
20792086
target.base_oloc.key.clear();
20802087
if (parent_trace && parent_trace->valid()) {
@@ -2085,7 +2092,8 @@ class Objecter : public md_config_obs_t, public Dispatcher {
20852092

20862093
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
20872094
int f, Context* fin, version_t *ov, int *offset = nullptr,
2088-
ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr) :
2095+
ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr,
2096+
uint64_t subsystem = 0) :
20892097
target(o, ol, f),
20902098
ops(std::move(_ops)),
20912099
out_bl(ops.size(), nullptr),
@@ -2095,6 +2103,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
20952103
onfinish(fin),
20962104
objver(ov),
20972105
data_offset(offset),
2106+
subsystem(subsystem),
20982107
otel_trace(otel_trace) {
20992108
if (target.base_oloc.key == o)
21002109
target.base_oloc.key.clear();
@@ -2932,7 +2941,8 @@ class Objecter : public md_config_obs_t, public Dispatcher {
29322941

29332942
/// cancel an in-progress request with the given return code
29342943
private:
2935-
int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
2944+
int op_cancel(OSDSession *s, ceph_tid_t tid, int r,
2945+
boost::system::error_code ec);
29362946
int _op_cancel(ceph_tid_t tid, int r);
29372947

29382948
int get_read_flags(int flags) {
@@ -2949,6 +2959,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
29492959

29502960
public:
29512961
int op_cancel(ceph_tid_t tid, int r);
2962+
void subsystem_cancel(uint64_t subsystem, boost::system::error_code e);
29522963
int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);
29532964

29542965
/**
@@ -3054,10 +3065,10 @@ class Objecter : public md_config_obs_t, public Dispatcher {
30543065
ceph::real_time mtime, int flags,
30553066
Op::OpComp oncommit,
30563067
version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
3057-
ZTracer::Trace *parent_trace = nullptr) {
3068+
ZTracer::Trace *parent_trace = nullptr, uint32_t subsystem = 0) {
30583069
Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
30593070
CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
3060-
nullptr, parent_trace);
3071+
nullptr, parent_trace, subsystem);
30613072
o->priority = op.priority;
30623073
o->mtime = mtime;
30633074
o->snapc = snapc;
@@ -3125,10 +3136,11 @@ class Objecter : public md_config_obs_t, public Dispatcher {
31253136
ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
31263137
int flags, Op::OpComp onack,
31273138
version_t *objver = nullptr, int *data_offset = nullptr,
3128-
uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
3139+
uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr,
3140+
uint64_t subsystem = 0) {
31293141
Op *o = new Op(oid, oloc, std::move(op.ops), get_read_flags(flags),
31303142
std::move(onack), objver,
3131-
data_offset, parent_trace);
3143+
data_offset, parent_trace, subsystem);
31323144
o->priority = op.priority;
31333145
o->snapid = snapid;
31343146
o->outbl = pbl;

src/test/librados_test_stub/NeoradosTestStub.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,8 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const {
584584

585585
void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
586586
ceph::buffer::list* bl, Op::Completion c,
587-
uint64_t* objver, const blkin_trace_info* trace_info) {
587+
uint64_t* objver, const blkin_trace_info* trace_info,
588+
uint64_t subsystem) {
588589
auto io_ctx = impl->get_io_ctx(ioc);
589590
if (io_ctx == nullptr) {
590591
asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
@@ -603,7 +604,8 @@ void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
603604

604605
void RADOS::execute_(Object o, IOContext ioc, WriteOp op,
605606
Op::Completion c, uint64_t* objver,
606-
const blkin_trace_info* trace_info) {
607+
const blkin_trace_info* trace_info,
608+
uint64_t subsystem) {
607609
auto io_ctx = impl->get_io_ctx(ioc);
608610
if (io_ctx == nullptr) {
609611
asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));

0 commit comments

Comments
 (0)