Skip to content

Commit 0ad6038

Browse files
authored
Merge pull request ceph#49286 from rzarzynski/wip-crimson-snaptrimmer
crimson/osd, osd: bring snap trimming to crimson Reviewed-by: Samuel Just <[email protected]>
2 parents cca84e6 + 4dbe8f5 commit 0ad6038

24 files changed

+1371
-139
lines changed

src/common/ceph_mutex.h

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@
1515
// naming the mutex for the purposes of the lockdep debug variant.
1616

1717
#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
18+
#include <seastar/core/condition-variable.hh>
19+
20+
#include "crimson/common/log.h"
21+
#include "include/ceph_assert.h"
22+
23+
// FUT_DEBUG: trace-level logging helper. It forwards the format string and
// its arguments to crimson::get_logger(ceph_subsys_).trace() unless NDEBUG
// is defined.
#ifndef NDEBUG
24+
#define FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__)
25+
#else
26+
// With NDEBUG defined the macro expands to nothing, so its arguments are
// never evaluated.
#define FUT_DEBUG(FMT_MSG, ...)
27+
#endif
1828

1929
namespace ceph {
2030
// an empty class satisfying the mutex concept
@@ -33,11 +43,30 @@ namespace ceph {
3343
void unlock_shared() {}
3444
};
3545

46+
// this implementation assumes running within a seastar::thread
47+
struct green_condition_variable : private seastar::condition_variable {
48+
template <class LockT>
49+
void wait(LockT&&) {
50+
FUT_DEBUG("green_condition_variable::{}: before blocking", __func__);
51+
seastar::condition_variable::wait().get();
52+
FUT_DEBUG("green_condition_variable::{}: after blocking", __func__);
53+
}
54+
55+
void notify_one() noexcept {
56+
FUT_DEBUG("green_condition_variable::{}", __func__);
57+
signal();
58+
}
59+
60+
void notify_all() noexcept {
61+
FUT_DEBUG("green_condition_variable::{}", __func__);
62+
broadcast();
63+
}
64+
};
65+
3666
using mutex = dummy_mutex;
3767
using recursive_mutex = dummy_mutex;
3868
using shared_mutex = dummy_shared_mutex;
39-
// in seastar, we should use a difference interface for enforcing the
40-
// semantics of condition_variable
69+
using condition_variable = green_condition_variable;
4170

4271
template <typename ...Args>
4372
dummy_mutex make_mutex(Args&& ...args) {

src/common/map_cacher.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ class StoreDriver {
6060
std::pair<K, V> *next ///< [out] first key after key
6161
) = 0; ///< @return 0 on success, -ENOENT if there is no next
6262

63+
/// Fetch the entry stored at \p key or, if there is none, the first entry
/// after it.
virtual int get_next_or_current(
64+
const K &key, ///< [in] key at-which-or-after to get
65+
std::pair<K, V> *next_or_current ///< [out] first entry at or after key
66+
) = 0; ///< @return 0 on success, -ENOENT if there is no entry at or after key
67+
6368
virtual ~StoreDriver() {}
6469
};
6570

src/crimson/common/errorator.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,13 @@ class maybe_handle_error_t {
335335
// to throwing an exception by the handler.
336336
std::invoke(std::forward<ErrorVisitorT>(errfunc),
337337
ErrorT::error_t::from_exception_ptr(std::move(ep)));
338+
} else if constexpr (seastar::Future<decltype(result)>) {
339+
// result is seastar::future but return_t is e.g. int. If so,
340+
// the else clause cannot be used as seastar::future lacks
341+
// errorator_type member.
342+
result = seastar::make_ready_future<return_t>(
343+
std::invoke(std::forward<ErrorVisitorT>(errfunc),
344+
ErrorT::error_t::from_exception_ptr(std::move(ep))));
338345
} else {
339346
result = FuturatorT::type::errorator_type::template make_ready_future<return_t>(
340347
std::invoke(std::forward<ErrorVisitorT>(errfunc),

src/crimson/common/interruptible_future.h

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ struct interrupt_cond_t {
9696
uint64_t ref_count = 0;
9797
void set(
9898
InterruptCondRef<InterruptCond>& ic) {
99+
INTR_FUT_DEBUG(
100+
"{}: going to set interrupt_cond: {}, ic: {}",
101+
__func__,
102+
(void*)interrupt_cond.get(),
103+
(void*)ic.get());
99104
if (!interrupt_cond) {
100105
interrupt_cond = ic;
101106
}
@@ -110,13 +115,15 @@ struct interrupt_cond_t {
110115
void reset() {
111116
if (--ref_count == 0) {
112117
INTR_FUT_DEBUG(
113-
"call_with_interruption_impl clearing interrupt_cond: {},{}",
118+
"{}: clearing interrupt_cond: {},{}",
119+
__func__,
114120
(void*)interrupt_cond.get(),
115121
typeid(InterruptCond).name());
116122
interrupt_cond.release();
117123
} else {
118124
INTR_FUT_DEBUG(
119-
"call_with_interruption_impl end without clearing interrupt_cond: {},{}, ref_count: {}",
125+
"{}: end without clearing interrupt_cond: {},{}, ref_count: {}",
126+
__func__,
120127
(void*)interrupt_cond.get(),
121128
typeid(InterruptCond).name(),
122129
ref_count);
@@ -1386,12 +1393,46 @@ struct interruptor
13861393
// Start func via seastar::async, wrapped in
// non_futurized_call_with_interruption so that it runs with the caller's
// interrupt condition.
//
// The current interrupt condition is copied first; interrupt_cond is then
// reset() before seastar::async is called and set() again, from the saved
// copy, once seastar::async has returned. The future produced by
// seastar::async is returned unchanged.
// NOTE(review): the reset()/set() bracket presumably exists because
// seastar::async may switch to the new thread right away -- confirm.
template <typename Func,
13871394
typename Result = futurize_t<std::invoke_result_t<Func>>>
13881395
static inline Result async(Func&& func) {
1389-
return seastar::async([func=std::forward<Func>(func),
1390-
interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
1391-
() mutable {
1396+
auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
1397+
INTR_FUT_DEBUG(
1398+
"interruptible_future_detail::async() yielding out, "
1399+
"interrupt_cond {},{} cleared",
1400+
(void*)interruption_condition.get(),
1401+
typeid(InterruptCond).name());
1402+
interrupt_cond<InterruptCond>.reset();
1403+
auto ret = seastar::async([func=std::forward<Func>(func),
1404+
interruption_condition] () mutable {
13921405
return non_futurized_call_with_interruption(
1393-
interrupt_condition, std::forward<Func>(func));
1406+
interruption_condition, std::forward<Func>(func));
13941407
});
1408+
interrupt_cond<InterruptCond>.set(interruption_condition);
1409+
INTR_FUT_DEBUG(
1410+
"interruptible_future_detail::async() yield back, interrupt_cond: {},{}",
1411+
(void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1412+
typeid(InterruptCond).name());
1413+
return ret;
1414+
}
1415+
1416+
template <class FutureT>
1417+
static decltype(auto) green_get(FutureT&& fut) {
1418+
if (fut.available()) {
1419+
return fut.get();
1420+
} else {
1421+
// destined to wait!
1422+
auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
1423+
INTR_FUT_DEBUG(
1424+
"green_get() waiting, interrupt_cond: {},{}",
1425+
(void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1426+
typeid(InterruptCond).name());
1427+
interrupt_cond<InterruptCond>.reset();
1428+
auto&& value = fut.get();
1429+
interrupt_cond<InterruptCond>.set(interruption_condition);
1430+
INTR_FUT_DEBUG(
1431+
"green_get() got, interrupt_cond: {},{}",
1432+
(void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
1433+
typeid(InterruptCond).name());
1434+
return std::move(value);
1435+
}
13951436
}
13961437

13971438
static void yield() {

src/crimson/osd/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ add_executable(crimson-osd
2626
osd_operations/logmissing_request_reply.cc
2727
osd_operations/background_recovery.cc
2828
osd_operations/recovery_subrequest.cc
29+
osd_operations/snaptrim_event.cc
2930
pg_recovery.cc
3031
recovery_backend.cc
3132
replicated_recovery_backend.cc
@@ -45,6 +46,7 @@ add_executable(crimson-osd
4546
${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
4647
${PROJECT_SOURCE_DIR}/src/osd/MissingLoc.cc
4748
${PROJECT_SOURCE_DIR}/src/osd/PGLog.cc
49+
${PROJECT_SOURCE_DIR}/src/osd/SnapMapper.cc
4850
${PROJECT_SOURCE_DIR}/src/osd/recovery_types.cc
4951
${PROJECT_SOURCE_DIR}/src/osd/osd_perf_counters.cc
5052
watch.cc

src/crimson/osd/ops_executer.cc

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "crimson/osd/pg.h"
2020
#include "crimson/osd/watch.h"
2121
#include "osd/ClassHandler.h"
22+
#include "osd/SnapMapper.h"
2223

2324
namespace {
2425
seastar::logger& logger() {
@@ -806,6 +807,9 @@ void OpsExecuter::fill_op_params_bump_pg_version()
806807
std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
807808
const std::vector<OSDOp>& ops)
808809
{
810+
// let's ensure we don't need to inform SnapMapper about this particular
811+
// entry.
812+
assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP);
809813
std::vector<pg_log_entry_t> log_entries;
810814
log_entries.emplace_back(
811815
obc->obs.exists ?
@@ -827,6 +831,57 @@ std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
827831
return log_entries;
828832
}
829833

834+
/// Remove the SnapMapper entry of an object.
///
/// The work is handed to interruptor::async; the lambda calls
/// SnapMapper::remove_oid() with a transaction wrapper obtained from
/// osdriver.get_transaction(&txn).
///
/// @param soid        object whose mapping is removed (copied into the lambda)
/// @param snap_mapper mapper to update; captured by reference, so it has to
///                    stay alive until the returned future resolves
/// @param osdriver    source of the transaction wrapper passed to remove_oid()
/// @param txn         transaction the wrapper is created for
/// @return future resolving once the lambda has run
OpsExecuter::interruptible_future<> OpsExecuter::snap_map_remove(
  const hobject_t& soid,
  SnapMapper& snap_mapper,
  OSDriver& osdriver,
  ceph::os::Transaction& txn)
{
  logger().debug("{}: soid {}", __func__, soid);
  // __func__ evaluated inside the lambda names the closure's call operator
  // ("operator()" with GCC/Clang), not this method, so capture it here for
  // the error message below.
  return interruptor::async([soid, &snap_mapper, caller=__func__,
                             _t=osdriver.get_transaction(&txn)]() mutable {
    const auto r = snap_mapper.remove_oid(soid, &_t);
    if (r) {
      logger().error("{}: remove_oid {} failed with {}",
                     caller, soid, r);
    }
    // On removal, tolerate a missing key (possible corruption); any other
    // error trips the assertion in debug builds.
    assert(r == 0 || r == -ENOENT);
  });
}
852+
853+
/// Update the snaps recorded in the SnapMapper for an object.
///
/// The lambda handed to interruptor::async calls
/// SnapMapper::update_snaps(soid, snaps, 0, ...) with a transaction wrapper
/// obtained from osdriver.get_transaction(&txn). Both an empty snap set and
/// a non-zero return code are treated as programming errors (assert).
///
/// @param soid        object to update (copied into the lambda)
/// @param snaps       new, non-empty set of snaps (copied into the lambda)
/// @param snap_mapper mapper to update; captured by reference
/// @param osdriver    source of the transaction wrapper
/// @param txn         transaction the wrapper is created for
OpsExecuter::interruptible_future<> OpsExecuter::snap_map_modify(
  const hobject_t& soid,
  const std::set<snapid_t>& snaps,
  SnapMapper& snap_mapper,
  OSDriver& osdriver,
  ceph::os::Transaction& txn)
{
  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
  auto update_mapping = [soid, snaps, &snap_mapper,
                         _t=osdriver.get_transaction(&txn)]() mutable {
    assert(!snaps.empty());
    [[maybe_unused]] const auto rc =
      snap_mapper.update_snaps(soid, snaps, 0, &_t);
    assert(rc == 0);
  };
  return interruptor::async(std::move(update_mapping));
}
869+
870+
/// Register a clone object and its snaps with the SnapMapper.
///
/// The lambda handed to interruptor::async calls SnapMapper::add_oid() with
/// a transaction wrapper obtained from osdriver.get_transaction(&txn). An
/// empty snap set is treated as a programming error (assert).
///
/// @param soid        clone object to add (copied into the lambda)
/// @param snaps       non-empty set of snaps (copied into the lambda)
/// @param snap_mapper mapper to update; captured by reference
/// @param osdriver    source of the transaction wrapper
/// @param txn         transaction the wrapper is created for
OpsExecuter::interruptible_future<> OpsExecuter::snap_map_clone(
  const hobject_t& soid,
  const std::set<snapid_t>& snaps,
  SnapMapper& snap_mapper,
  OSDriver& osdriver,
  ceph::os::Transaction& txn)
{
  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
  auto add_mapping = [soid, snaps, &snap_mapper,
                      _t=osdriver.get_transaction(&txn)]() mutable {
    assert(!snaps.empty());
    snap_mapper.add_oid(soid, snaps, &_t);
  };
  return interruptor::async(std::move(add_mapping));
}
884+
830885
// Defined here because there is a circular dependency between OpsExecuter and PG
831886
uint32_t OpsExecuter::get_pool_stripe_width() const {
832887
return pg->get_pgpool().info.get_stripe_width();
@@ -920,15 +975,29 @@ void OpsExecuter::CloningContext::apply_to(
920975
processed_obc.ssc->snapset = std::move(new_snapset);
921976
}
922977

923-
void OpsExecuter::flush_clone_metadata(
924-
std::vector<pg_log_entry_t>& log_entries)
978+
OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
979+
OpsExecuter::flush_clone_metadata(
980+
std::vector<pg_log_entry_t>&& log_entries,
981+
SnapMapper& snap_mapper,
982+
OSDriver& osdriver,
983+
ceph::os::Transaction& txn)
925984
{
926985
assert(!txn.empty());
986+
auto maybe_snap_mapped = interruptor::now();
927987
if (cloning_ctx) {
928988
osd_op_params->at_version = pg->next_version();
929-
std::move(*cloning_ctx).apply_to(osd_op_params->at_version,
930-
log_entries,
931-
*obc);
989+
std::move(*cloning_ctx).apply_to(
990+
osd_op_params->at_version,
991+
log_entries,
992+
*obc);
993+
const auto& coid = log_entries.back().soid;
994+
const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
995+
maybe_snap_mapped = snap_map_clone(
996+
coid,
997+
std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
998+
snap_mapper,
999+
osdriver,
1000+
txn);
9321001
}
9331002
if (snapc.seq > obc->ssc->snapset.seq) {
9341003
// update snapset with latest snap context
@@ -937,6 +1006,12 @@ void OpsExecuter::flush_clone_metadata(
9371006
}
9381007
logger().debug("{} done, initial snapset={}, new snapset={}",
9391008
__func__, obc->obs.oi.soid, obc->ssc->snapset);
1009+
return std::move(
1010+
maybe_snap_mapped
1011+
).then_interruptible([log_entries=std::move(log_entries)]() mutable {
1012+
return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
1013+
std::move(log_entries));
1014+
});
9401015
}
9411016

9421017
// TODO: make this static

0 commit comments

Comments
 (0)