Commit ec6b9d1

Merge pull request ceph#57978 from athanatos/sjust/wip-63647-snaptrim-pipeline
crimson: simplify snaptrim operation pipeline usage

Reviewed-by: Matan Breizman <[email protected]>

2 parents 48f9fc6 + f0446b2

File tree: 5 files changed (+97, -166 lines)

src/crimson/osd/osd_operations/client_request.h

Lines changed: 0 additions & 3 deletions
@@ -53,9 +53,6 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
   struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
     static constexpr auto type_name = "ClientRequest::PGPipeline::await_map";
   } await_map;
-  struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
-    static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop";
-  } wait_repop;
   struct SendReply : OrderedExclusivePhaseT<SendReply> {
     static constexpr auto type_name = "ClientRequest::PGPipeline::send_reply";
   } send_reply;

src/crimson/osd/osd_operations/common/pg_pipeline.h

Lines changed: 3 additions & 0 deletions
@@ -29,6 +29,9 @@ class CommonPGPipeline {
   struct Process : OrderedExclusivePhaseT<Process> {
     static constexpr auto type_name = "CommonPGPipeline::process";
   } process;
+  struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
+    static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop";
+  } wait_repop;
 };

 } // namespace crimson::osd
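
The two header changes above move WaitRepop from ClientRequest's private pipeline into CommonPGPipeline, so client writes and snap-trim sub-events can queue through one shared, tracked stage. For orientation, a minimal self-contained sketch of the phase-declaration pattern (toy stand-ins; not the real crimson OrderedConcurrentPhaseT, which adds stage ordering, blocking-event tracking, and seastar integration):

#include <string_view>

// Toy stand-in for crimson's phase CRTP template.
template <typename T>
struct OrderedConcurrentPhase {
  static std::string_view name() { return T::type_name; }
};

struct ExamplePipeline {
  // One shared phase object per pipeline: every operation that replicates
  // a transaction enters wait_repop while waiting for replica acks.
  struct WaitRepop : OrderedConcurrentPhase<WaitRepop> {
    static constexpr auto type_name = "ExamplePipeline::wait_repop";
  } wait_repop;
};

Note the moved phase keeps its old "ClientRequest::PGPipeline::wait_repop" type_name string, presumably so existing op-tracker output stays stable.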

src/crimson/osd/osd_operations/snaptrim_event.cc

Lines changed: 90 additions & 133 deletions
@@ -1,10 +1,12 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab

+#include "crimson/common/coroutine.h"
 #include "crimson/osd/osd_operations/snaptrim_event.h"
 #include "crimson/osd/ops_executer.h"
 #include "crimson/osd/pg.h"
 #include <seastar/core/sleep.hh>
+#include <seastar/util/defer.hh>

 namespace {
 seastar::logger& logger() {
@@ -63,30 +65,18 @@ CommonPGPipeline& SnapTrimEvent::client_pp()
 SnapTrimEvent::snap_trim_event_ret_t
 SnapTrimEvent::start()
 {
+  ceph_assert(pg->is_active_clean());
+
+  /* TODO: add a way to expose progress via the optracker without misusing
+   * pipeline stages. https://tracker.ceph.com/issues/66473 */
   ShardServices &shard_services = pg->get_shard_services();
-  return enter_stage<interruptor>(
-    client_pp().wait_for_active
-  ).then_interruptible([this] {
-    return with_blocking_event<PGActivationBlocker::BlockingEvent,
-                               interruptor>([this] (auto&& trigger) {
-      return pg->wait_for_active_blocker.wait(std::move(trigger));
+  {
+    co_await pg->background_process_lock.lock_with_op(*this);
+    auto unlocker = seastar::defer([this] {
+      pg->background_process_lock.unlock();
     });
-  }).then_interruptible([this] {
-    return enter_stage<interruptor>(
-      client_pp().recover_missing);
-  }).then_interruptible([] {
-    //return do_recover_missing(pg, get_target_oid());
-    return seastar::now();
-  }).then_interruptible([this] {
-    return enter_stage<interruptor>(
-      client_pp().get_obc);
-  }).then_interruptible([this] {
-    return pg->background_process_lock.lock_with_op(*this);
-  }).then_interruptible([this] {
-    return enter_stage<interruptor>(
-      client_pp().process);
-  }).then_interruptible([&shard_services, this] {
-    return interruptor::async([this] {
+
+    auto to_trim_fut = interruptor::async([this] {
       using crimson::common::local_conf;
       const auto max =
         local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
@@ -100,65 +90,42 @@ SnapTrimEvent::start()
       }
       logger().debug("{}: async almost done line {}", *this, __LINE__);
       return std::move(*to_trim);
-    }).then_interruptible([&shard_services, this] (const auto& to_trim) {
-      if (to_trim.empty()) {
-        // the legit ENOENT -> done
-        logger().debug("{}: to_trim is empty! Stopping iteration", *this);
-        pg->background_process_lock.unlock();
-        return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
-          seastar::stop_iteration::yes);
-      }
-      return [&shard_services, this](const auto &to_trim) {
-        for (const auto& object : to_trim) {
-          logger().debug("{}: trimming {}", *this, object);
-          subop_blocker.emplace_back(
-            shard_services.start_operation_may_interrupt<
-              interruptor, SnapTrimObjSubEvent>(
-              pg,
-              object,
-              snapid));
-        }
-        return interruptor::now();
-      }(to_trim).then_interruptible([this] {
-        return enter_stage<interruptor>(wait_subop);
-      }).then_interruptible([this] {
-        logger().debug("{}: awaiting completion", *this);
-        return subop_blocker.interruptible_wait_completion();
-      }).finally([this] {
-        pg->background_process_lock.unlock();
-      }).si_then([this] {
-        if (!needs_pause) {
-          return interruptor::now();
-        }
-        // let's know operators we're waiting
-        return enter_stage<interruptor>(
-          wait_trim_timer
-        ).then_interruptible([this] {
-          using crimson::common::local_conf;
-          const auto time_to_sleep =
-            local_conf().template get_val<double>("osd_snap_trim_sleep");
-          logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
-          // TODO: this logic should be more sophisticated and distinguish
-          // between SSDs, HDDs and the hybrid case
-          return seastar::sleep(
-            std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
-        });
-      }).si_then([this] {
-        logger().debug("{}: all completed", *this);
-        return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
-          seastar::stop_iteration::no);
-      });
-    }).si_then([this](auto stop) {
-      return handle.complete().then([stop] {
-        return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(stop);
-      });
     });
-  }).finally([this] {
-    // This SnapTrimEvent op lifetime is maintained within
-    // PerShardState::start_operation() implementation.
-    logger().debug("{}: exit", *this);
-    handle.exit();
-  });
+    auto to_trim = co_await std::move(to_trim_fut);
+
+    if (to_trim.empty()) {
+      // the legit ENOENT -> done
+      logger().debug("{}: to_trim is empty! Stopping iteration", *this);
+      co_return seastar::stop_iteration::yes;
+    }
+    for (const auto& object : to_trim) {
+      logger().debug("{}: trimming {}", *this, object);
+      subop_blocker.emplace_back(
+        shard_services.start_operation_may_interrupt<
+          interruptor, SnapTrimObjSubEvent>(
+          pg,
+          object,
+          snapid));
+    }
+
+    logger().debug("{}: awaiting completion", *this);
+    co_await subop_blocker.interruptible_wait_completion();
+  }
+
+  if (needs_pause) {
+    using crimson::common::local_conf;
+    const auto time_to_sleep =
+      local_conf().template get_val<double>("osd_snap_trim_sleep");
+    logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
+    // TODO: this logic should be more sophisticated and distinguish
+    // between SSDs, HDDs and the hybrid case
+    co_await interruptor::make_interruptible(
      seastar::sleep(
+        std::chrono::milliseconds(std::lround(time_to_sleep * 1000))));
+  }
+
+  logger().debug("{}: all completed", *this);
+  co_return seastar::stop_iteration::no;
 }

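The rewritten SnapTrimEvent::start() above is a C++20 coroutine: the then_interruptible() chain becomes straight-line co_await code, and the unlock() calls scattered across continuations collapse into one seastar::defer() scope guard. A minimal sketch of that guard idiom in isolation (Lock is a hypothetical stand-in for PG::background_process_lock; seastar::defer is the utility the new include pulls in):

#include <seastar/util/defer.hh>

struct Lock {                   // hypothetical stand-in for
  bool held = false;            // PG::background_process_lock
  void lock() { held = true; }
  void unlock() { held = false; }
};

void critical_section(Lock& l) {
  l.lock();
  // Runs the lambda when `unlocker` leaves scope, so the lock is released
  // on normal return, early co_return, or exception alike.
  auto unlocker = seastar::defer([&l] { l.unlock(); });
  // ... trimming work with multiple exit paths ...
}
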
@@ -418,65 +385,55 @@ SnapTrimObjSubEvent::remove_or_update(
 SnapTrimObjSubEvent::snap_trim_obj_subevent_ret_t
 SnapTrimObjSubEvent::start()
 {
-  return enter_stage<interruptor>(
-    client_pp().wait_for_active
-  ).then_interruptible([this] {
-    return with_blocking_event<PGActivationBlocker::BlockingEvent,
-                               interruptor>([this] (auto&& trigger) {
-      return pg->wait_for_active_blocker.wait(std::move(trigger));
-    });
-  }).then_interruptible([this] {
-    return enter_stage<interruptor>(
-      client_pp().recover_missing);
-  }).then_interruptible([] {
-    //return do_recover_missing(pg, get_target_oid());
-    return seastar::now();
-  }).then_interruptible([this] {
-    return enter_stage<interruptor>(
-      client_pp().get_obc);
-  }).then_interruptible([this] {
-    logger().debug("{}: getting obc for {}", *this, coid);
-    // end of commonality
-    // lock both clone's and head's obcs
-    return pg->obc_loader.with_obc<RWState::RWWRITE>(
-      coid,
-      [this](auto head_obc, auto clone_obc) {
+  ceph_assert(pg->is_active_clean());
+
+  auto exit_handle = seastar::defer([this] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
+  });
+
+  co_await enter_stage<interruptor>(
+    client_pp().get_obc);
+
+  logger().debug("{}: getting obc for {}", *this, coid);
+  // end of commonality
+  // lock both clone's and head's obcs
+  co_await pg->obc_loader.with_obc<RWState::RWWRITE>(
+    coid,
+    [this](auto head_obc, auto clone_obc) {
       logger().debug("{}: got clone_obc={}", *this, clone_obc->get_oid());
       return enter_stage<interruptor>(
         client_pp().process
       ).then_interruptible(
         [this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable {
-        logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
-        return remove_or_update(
-          clone_obc, head_obc
-        ).safe_then_interruptible([clone_obc, this](auto&& txn) mutable {
-          auto [submitted, all_completed] = pg->submit_transaction(
-            std::move(clone_obc),
-            std::move(txn),
-            std::move(osd_op_p),
-            std::move(log_entries));
-          return submitted.then_interruptible(
-            [all_completed=std::move(all_completed), this] () mutable {
-            return enter_stage<interruptor>(
-              wait_repop
-            ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
-              return std::move(all_completed);
-            });
-          });
-        });
-      });
+        logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
+        return remove_or_update(
+          clone_obc, head_obc
+        ).safe_then_interruptible([clone_obc, this](auto&& txn) mutable {
+          auto [submitted, all_completed] = pg->submit_transaction(
+            std::move(clone_obc),
+            std::move(txn),
+            std::move(osd_op_p),
+            std::move(log_entries));
+          return submitted.then_interruptible(
+            [this, all_completed=std::move(all_completed)]() mutable {
+            return enter_stage<interruptor>(
+              client_pp().wait_repop
+            ).then_interruptible([all_completed=std::move(all_completed)]() mutable{
+              return std::move(all_completed);
+            });
+          });
+        });
+      });
     },
-    false).si_then([this] {
-      logger().debug("{}: completed", *this);
-      return handle.complete();
-    }).handle_error_interruptible(
-      remove_or_update_iertr::pass_further{},
-      crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"}
-    );
-  }).finally([this] {
-    logger().debug("{}: exit", *this);
-    handle.exit();
-  });
+    false
+  ).handle_error_interruptible(
+    remove_or_update_iertr::pass_further{},
+    crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"}
+  );
+
+  logger().debug("{}: completed", *this);
+  co_await interruptor::make_interruptible(handle.complete());
 }

 void SnapTrimObjSubEvent::print(std::ostream &lhs) const
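
Both before and after the change, SnapTrimEvent::start() resolves to a seastar::stop_iteration so its caller can drive trimming as a repeat-until-done loop; the coroutine now simply co_returns the verdict instead of threading it through si_then(). A self-contained sketch of such a driver loop in plain seastar (hypothetical trim logic; the crimson scheduler may drive the operation differently):

#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

seastar::future<> trim_until_done() {
  // Hypothetical step: pretend three passes of work remain.
  return seastar::do_with(3, [](int& passes) {
    // seastar::repeat re-invokes the lambda until it yields stop_iteration::yes.
    return seastar::repeat([&passes] {
      return seastar::make_ready_future<seastar::stop_iteration>(
        --passes == 0 ? seastar::stop_iteration::yes
                      : seastar::stop_iteration::no);
    });
  });
}
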

src/crimson/osd/osd_operations/snaptrim_event.h

Lines changed: 1 addition & 30 deletions
@@ -61,19 +61,6 @@ class SnapTrimEvent final : public PhasedOperationT<SnapTrimEvent> {

   SubOpBlocker<snap_trim_obj_subevent_ret_t> subop_blocker;

-  // we don't need to synchronize with other instances of SnapTrimEvent;
-  // it's here for the sake of op tracking.
-  struct WaitSubop : OrderedConcurrentPhaseT<WaitSubop> {
-    static constexpr auto type_name = "SnapTrimEvent::wait_subop";
-  } wait_subop;
-
-  // an instantiator can instruct us to go over this stage and then
-  // wait for the future to implement throttling. It is implemented
-  // that way to for the sake of tracking ops.
-  struct WaitTrimTimer : OrderedExclusivePhaseT<WaitTrimTimer> {
-    static constexpr auto type_name = "SnapTrimEvent::wait_trim_timer";
-  } wait_trim_timer;
-
   Ref<PG> pg;
   PipelineHandle handle;
   SnapMapper& snap_mapper;
@@ -85,14 +72,7 @@ class SnapTrimEvent final : public PhasedOperationT<SnapTrimEvent> {

   std::tuple<
     StartEvent,
-    CommonPGPipeline::WaitForActive::BlockingEvent,
-    PGActivationBlocker::BlockingEvent,
-    CommonPGPipeline::RecoverMissing::BlockingEvent,
-    CommonPGPipeline::GetOBC::BlockingEvent,
-    CommonPGPipeline::Process::BlockingEvent,
-    WaitSubop::BlockingEvent,
     PG::BackgroundProcessLock::Wait::BlockingEvent,
-    WaitTrimTimer::BlockingEvent,
     CompletionEvent
   > tracking_events;

@@ -154,12 +134,6 @@ class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {
   remove_or_update_iertr::future<ceph::os::Transaction>
   remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc);

-  // we don't need to synchronize with other instances started by
-  // SnapTrimEvent; it's here for the sake of op tracking.
-  struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
-    static constexpr auto type_name = "SnapTrimObjSubEvent::wait_repop";
-  } wait_repop;
-
   void add_log_entry(
     int _op,
     const hobject_t& _soid,
@@ -192,12 +166,9 @@ class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {

   std::tuple<
     StartEvent,
-    CommonPGPipeline::WaitForActive::BlockingEvent,
-    PGActivationBlocker::BlockingEvent,
-    CommonPGPipeline::RecoverMissing::BlockingEvent,
     CommonPGPipeline::GetOBC::BlockingEvent,
     CommonPGPipeline::Process::BlockingEvent,
-    WaitRepop::BlockingEvent,
+    CommonPGPipeline::WaitRepop::BlockingEvent,
     CompletionEvent
   > tracking_events;
 };
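
Removing a stage also means removing its BlockingEvent from the operation's tracking_events tuple, the compile-time list of every event the op tracker can record for that operation; the two declarations must be kept in sync by hand. A toy illustration of that bookkeeping (plain std::tuple with made-up marker types, not the crimson event machinery):

#include <tuple>

// Marker types standing in for crimson's tracked events.
struct StartEvent {};
struct GetOBC    { struct BlockingEvent {}; };
struct Process   { struct BlockingEvent {}; };
struct WaitRepop { struct BlockingEvent {}; };
struct CompletionEvent {};

// Mirrors SnapTrimObjSubEvent's post-change list: one entry per stage the
// operation can actually block in, bracketed by start/completion.
using tracking_events_t = std::tuple<
  StartEvent,
  GetOBC::BlockingEvent,
  Process::BlockingEvent,
  WaitRepop::BlockingEvent,
  CompletionEvent>;

static_assert(std::tuple_size_v<tracking_events_t> == 5,
              "five trackable events after the cleanup");
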

src/crimson/osd/pg.h

Lines changed: 3 additions & 0 deletions
@@ -439,6 +439,9 @@ class PG : public boost::intrusive_ref_counter<


   // Utility
+  bool is_active_clean() const {
+    return peering_state.is_active() && peering_state.is_clean();
+  }
   bool is_primary() const final {
     return peering_state.is_primary();
   }
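
The new PG::is_active_clean() backs the ceph_assert calls added to both start() bodies: snap trim only runs on an active+clean PG, which is what makes the wait_for_active and recover_missing stages removable. A self-contained toy of gating a background task on such a precondition (assert stands in for ceph_assert; PeeringState/PG here are stand-ins, though is_active_clean's body matches the diff):

#include <cassert>

struct PeeringState {            // toy stand-in for the crimson type
  bool active = true;
  bool clean = true;
  bool is_active() const { return active; }
  bool is_clean() const { return clean; }
};

struct PG {
  PeeringState peering_state;
  bool is_active_clean() const {
    return peering_state.is_active() && peering_state.is_clean();
  }
};

void start_snap_trim(const PG& pg) {
  // The scheduler only queues trimming on an active+clean PG, so the
  // operation asserts the precondition instead of waiting for it.
  assert(pg.is_active_clean());
  // ... trim ...
}
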
