Skip to content

Commit 93cbc92

Browse files
authored
Merge pull request ceph#60202 from athanatos/sjust/wip-crimson-io
crimson: replace do_osd_ops* with simpler, more general mechanism Reviewed-by: Yingxin Cheng <[email protected]> Reviewed-by: Matan Breizman <[email protected]>
2 parents 2966f22 + 2b562b6 commit 93cbc92

16 files changed

+404
-523
lines changed

src/crimson/common/log.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ static inline seastar::log_level to_log_level(int level) {
9090
#define SUBLOGDPP(subname_, level_, MSG, dpp, ...) \
9191
LOGGER(subname_).log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__)
9292
#define SUBLOGDPPI(subname_, level_, MSG, dpp, ...) \
93-
LOGGER(subname_).log(level_, "{} {}: " MSG, \
93+
LOGGER(subname_).log(level_, "{} {} {}: " MSG, \
9494
interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__)
9595
#define SUBTRACEDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::trace, __VA_ARGS__)
9696
#define SUBTRACEDPPI(subname_, ...) SUBLOGDPPI(subname_, seastar::log_level::trace, __VA_ARGS__)
@@ -106,7 +106,7 @@ static inline seastar::log_level to_log_level(int level) {
106106
#define LOGDPP(level_, MSG, dpp, ...) \
107107
LOCAL_LOGGER.log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__)
108108
#define LOGDPPI(level_, MSG, dpp, ...) \
109-
LOCAL_LOGGER.log(level_, "{} {}: " MSG, \
109+
LOCAL_LOGGER.log(level_, "{} {} {}: " MSG, \
110110
interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__)
111111
#define TRACEDPP(...) LOGDPP(seastar::log_level::trace, __VA_ARGS__)
112112
#define TRACEDPPI(...) LOGDPPI(seastar::log_level::trace, __VA_ARGS__)

src/crimson/osd/ops_executer.cc

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
504504
auto p = ss.clone_snaps.find(clone);
505505
if (p == ss.clone_snaps.end()) {
506506
logger().error(
507-
"OpsExecutor::do_list_snaps: {} has inconsistent "
507+
"OpsExecuter::do_list_snaps: {} has inconsistent "
508508
"clone_snaps, missing clone {}",
509509
os.oi.soid,
510510
clone);
@@ -518,7 +518,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
518518
auto p = ss.clone_overlap.find(clone);
519519
if (p == ss.clone_overlap.end()) {
520520
logger().error(
521-
"OpsExecutor::do_list_snaps: {} has inconsistent "
521+
"OpsExecuter::do_list_snaps: {} has inconsistent "
522522
"clone_overlap, missing clone {}",
523523
os.oi.soid,
524524
clone);
@@ -532,7 +532,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
532532
auto p = ss.clone_size.find(clone);
533533
if (p == ss.clone_size.end()) {
534534
logger().error(
535-
"OpsExecutor::do_list_snaps: {} has inconsistent "
535+
"OpsExecuter::do_list_snaps: {} has inconsistent "
536536
"clone_size, missing clone {}",
537537
os.oi.soid,
538538
clone);
@@ -551,7 +551,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
551551
}
552552
resp.seq = ss.seq;
553553
logger().error(
554-
"OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}",
554+
"OpsExecuter::do_list_snaps: {}, resp.clones.size(): {}",
555555
os.oi.soid,
556556
resp.clones.size());
557557
resp.encode(osd_op.outdata);
@@ -678,16 +678,32 @@ OpsExecuter::do_execute_op(OSDOp& osd_op)
678678
whiteout = true;
679679
}
680680
return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) {
681-
int num_bytes = 0;
682-
// Calculate num_bytes to be removed
683-
if (obc->obs.oi.soid.is_snap()) {
684-
ceph_assert(obc->ssc->snapset.clone_overlap.count(obc->obs.oi.soid.snap));
685-
num_bytes = obc->ssc->snapset.get_clone_bytes(obc->obs.oi.soid.snap);
686-
} else {
687-
num_bytes = obc->obs.oi.size;
688-
}
689-
return backend.remove(os, txn, *osd_op_params,
690-
delta_stats, whiteout, num_bytes);
681+
struct emptyctx_t {};
682+
return with_effect_on_obc(
683+
emptyctx_t{},
684+
[&](auto &ctx) {
685+
int num_bytes = 0;
686+
// Calculate num_bytes to be removed
687+
if (obc->obs.oi.soid.is_snap()) {
688+
ceph_assert(obc->ssc->snapset.clone_overlap.count(
689+
obc->obs.oi.soid.snap));
690+
num_bytes = obc->ssc->snapset.get_clone_bytes(
691+
obc->obs.oi.soid.snap);
692+
} else {
693+
num_bytes = obc->obs.oi.size;
694+
}
695+
return backend.remove(os, txn, *osd_op_params,
696+
delta_stats, whiteout, num_bytes);
697+
},
698+
[](auto &&ctx, ObjectContextRef obc, Ref<PG>) {
699+
return seastar::do_for_each(
700+
obc->watchers,
701+
[](auto &p) { return p.second->remove(); }
702+
).then([obc] {
703+
obc->watchers.clear();
704+
return seastar::now();
705+
});
706+
});
691707
});
692708
}
693709
case CEPH_OSD_OP_CALL:
@@ -957,15 +973,14 @@ void OpsExecuter::CloningContext::apply_to(
957973
processed_obc.ssc->snapset = std::move(new_snapset);
958974
}
959975

960-
OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
976+
std::vector<pg_log_entry_t>
961977
OpsExecuter::flush_clone_metadata(
962978
std::vector<pg_log_entry_t>&& log_entries,
963979
SnapMapper& snap_mapper,
964980
OSDriver& osdriver,
965981
ceph::os::Transaction& txn)
966982
{
967983
assert(!txn.empty());
968-
auto maybe_snap_mapped = interruptor::now();
969984
update_clone_overlap();
970985
if (cloning_ctx) {
971986
std::move(*cloning_ctx).apply_to(log_entries, *obc);
@@ -977,12 +992,7 @@ OpsExecuter::flush_clone_metadata(
977992
}
978993
logger().debug("{} done, initial snapset={}, new snapset={}",
979994
__func__, obc->obs.oi.soid, obc->ssc->snapset);
980-
return std::move(
981-
maybe_snap_mapped
982-
).then_interruptible([log_entries=std::move(log_entries)]() mutable {
983-
return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
984-
std::move(log_entries));
985-
});
995+
return std::move(log_entries);
986996
}
987997

988998
ObjectContextRef OpsExecuter::prepare_clone(

src/crimson/osd/ops_executer.h

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace crimson::osd {
4040
class PG;
4141

4242
// OpsExecuter -- a class for executing ops targeting a certain object.
43-
class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
43+
class OpsExecuter {
4444
friend class SnapTrimObjSubEvent;
4545

4646
using call_errorator = crimson::errorator<
@@ -170,16 +170,12 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
170170

171171
object_stat_sum_t delta_stats;
172172
private:
173-
// an operation can be divided into two stages: main and effect-exposing
174-
// one. The former is performed immediately on call to `do_osd_op()` while
175-
// the later on `submit_changes()` – after successfully processing main
176-
// stages of all involved operations. When any stage fails, none of all
177-
// scheduled effect-exposing stages will be executed.
178-
// when operation requires this division, some variant of `with_effect()`
179-
// should be used.
173+
// with_effect can be used to schedule operations to be performed
174+
// at commit time. effects will be discarded if the operation does
175+
// not commit.
180176
struct effect_t {
181177
// an effect can affect PG, i.e. create a watch timeout
182-
virtual osd_op_errorator::future<> execute(Ref<PG> pg) = 0;
178+
virtual seastar::future<> execute(Ref<PG> pg) = 0;
183179
virtual ~effect_t() = default;
184180
};
185181

@@ -213,10 +209,10 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
213209
* execute_clone
214210
*
215211
* If snapc contains a snap which occurred logically after the last write
216-
* seen by this object (see OpsExecutor::should_clone()), we first need
212+
* seen by this object (see OpsExecuter::should_clone()), we first need
217213
* make a clone of the object at its current state. execute_clone primes
218214
* txn with that clone operation and returns an
219-
* OpsExecutor::CloningContext which will allow us to fill in the corresponding
215+
* OpsExecuter::CloningContext which will allow us to fill in the corresponding
220216
* metadata and log_entries once the operations have been processed.
221217
*
222218
* Note that this strategy differs from classic, which instead performs this
@@ -267,7 +263,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
267263
*/
268264
void update_clone_overlap();
269265

270-
interruptible_future<std::vector<pg_log_entry_t>> flush_clone_metadata(
266+
std::vector<pg_log_entry_t> flush_clone_metadata(
271267
std::vector<pg_log_entry_t>&& log_entries,
272268
SnapMapper& snap_mapper,
273269
OSDriver& osdriver,
@@ -400,7 +396,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
400396
execute_op(OSDOp& osd_op);
401397

402398
using rep_op_fut_tuple =
403-
std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
399+
std::tuple<interruptible_future<>, interruptible_future<>>;
404400
using rep_op_fut_t =
405401
interruptible_future<rep_op_fut_tuple>;
406402
template <typename MutFunc>
@@ -475,7 +471,7 @@ auto OpsExecuter::with_effect_on_obc(
475471
effect_func(std::move(effect_func)),
476472
obc(std::move(obc)) {
477473
}
478-
osd_op_errorator::future<> execute(Ref<PG> pg) final {
474+
seastar::future<> execute(Ref<PG> pg) final {
479475
return std::move(effect_func)(std::move(ctx),
480476
std::move(obc),
481477
std::move(pg));
@@ -502,15 +498,14 @@ OpsExecuter::flush_changes_n_do_ops_effects(
502498
assert(obc);
503499

504500
auto submitted = interruptor::now();
505-
auto all_completed =
506-
interruptor::make_interruptible(osd_op_errorator::now());
501+
auto all_completed = interruptor::now();
507502

508503
if (cloning_ctx) {
509504
ceph_assert(want_mutate);
510505
}
511506

512507
if (want_mutate) {
513-
auto log_entries = co_await flush_clone_metadata(
508+
auto log_entries = flush_clone_metadata(
514509
prepare_transaction(ops),
515510
snap_mapper,
516511
osdriver,
@@ -536,7 +531,7 @@ OpsExecuter::flush_changes_n_do_ops_effects(
536531
// need extra ref pg due to apply_stats() which can be executed after
537532
// informing snap mapper
538533
all_completed =
539-
std::move(all_completed).safe_then_interruptible([this, pg=this->pg] {
534+
std::move(all_completed).then_interruptible([this, pg=this->pg] {
540535
// let's do the cleaning of `op_effects` in destructor
541536
return interruptor::do_for_each(op_effects,
542537
[pg=std::move(pg)](auto& op_effect) {
@@ -552,21 +547,19 @@ OpsExecuter::flush_changes_n_do_ops_effects(
552547

553548
template <class Func>
554549
struct OpsExecuter::RollbackHelper {
555-
void rollback_obc_if_modified(const std::error_code& e);
556-
seastar::lw_shared_ptr<OpsExecuter> ox;
550+
void rollback_obc_if_modified();
551+
OpsExecuter *ox;
557552
Func func;
558553
};
559554

560555
template <class Func>
561556
inline OpsExecuter::RollbackHelper<Func>
562557
OpsExecuter::create_rollbacker(Func&& func) {
563-
return {shared_from_this(), std::forward<Func>(func)};
558+
return {this, std::forward<Func>(func)};
564559
}
565560

566-
567561
template <class Func>
568-
void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
569-
const std::error_code& e)
562+
void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified()
570563
{
571564
// Oops, an operation had failed. do_osd_ops() altogether with
572565
// OpsExecuter already dropped the ObjectStore::Transaction if
@@ -584,10 +577,9 @@ void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
584577
assert(ox);
585578
const auto need_rollback = ox->has_seen_write();
586579
crimson::get_logger(ceph_subsys_osd).debug(
587-
"{}: object {} got error {}, need_rollback={}",
580+
"{}: object {} got error, need_rollback={}",
588581
__func__,
589582
ox->obc->get_oid(),
590-
e,
591583
need_rollback);
592584
if (need_rollback) {
593585
func(ox->obc);

src/crimson/osd/osd_operation.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,37 @@ struct PerShardPipeline {
4040
} create_or_wait_pg;
4141
};
4242

43+
struct PGPeeringPipeline {
44+
struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
45+
static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map";
46+
} await_map;
47+
struct Process : OrderedExclusivePhaseT<Process> {
48+
static constexpr auto type_name = "PeeringEvent::PGPipeline::process";
49+
} process;
50+
};
51+
52+
struct CommonPGPipeline {
53+
struct WaitForActive : OrderedExclusivePhaseT<WaitForActive> {
54+
static constexpr auto type_name = "CommonPGPipeline:::wait_for_active";
55+
} wait_for_active;
56+
struct RecoverMissing : OrderedConcurrentPhaseT<RecoverMissing> {
57+
static constexpr auto type_name = "CommonPGPipeline::recover_missing";
58+
} recover_missing;
59+
struct CheckAlreadyCompleteGetObc : OrderedExclusivePhaseT<CheckAlreadyCompleteGetObc> {
60+
static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc";
61+
} check_already_complete_get_obc;
62+
struct LockOBC : OrderedConcurrentPhaseT<LockOBC> {
63+
static constexpr auto type_name = "CommonPGPipeline::lock_obc";
64+
} lock_obc;
65+
struct Process : OrderedExclusivePhaseT<Process> {
66+
static constexpr auto type_name = "CommonPGPipeline::process";
67+
} process;
68+
struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
69+
static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop";
70+
} wait_repop;
71+
};
72+
73+
4374
enum class OperationTypeCode {
4475
client_request = 0,
4576
peering_event,

src/crimson/osd/osd_operation_external_tracking.h

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ struct LttngBackend
3636
ClientRequest::PGPipeline::RecoverMissing::
3737
BlockingEvent::ExitBarrierEvent::Backend,
3838
ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend,
39-
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
4039
ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend,
4140
ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend,
4241
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@@ -117,10 +116,6 @@ struct LttngBackend
117116
const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override {
118117
}
119118

120-
void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev,
121-
const Operation& op,
122-
const ClientRequest::PGPipeline::GetOBC& blocker) override {
123-
}
124119

125120
void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev,
126121
const Operation& op,
@@ -171,7 +166,6 @@ struct HistoricBackend
171166
ClientRequest::PGPipeline::RecoverMissing::
172167
BlockingEvent::ExitBarrierEvent::Backend,
173168
ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend,
174-
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
175169
ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend,
176170
ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend,
177171
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
@@ -252,11 +246,6 @@ struct HistoricBackend
252246
const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override {
253247
}
254248

255-
void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev,
256-
const Operation& op,
257-
const ClientRequest::PGPipeline::GetOBC& blocker) override {
258-
}
259-
260249
void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev,
261250
const Operation& op,
262251
const ClientRequest::PGPipeline::LockOBC& blocker) override {

0 commit comments

Comments
 (0)