Skip to content

Commit 2bb6ae3

Browse files
authored
Merge pull request ceph#61536 from Matan-B/wip-matanb-backfill-scan
crimson/osd/recovery_backend: scan_for_backfill to use obc_manager Reviewed-by: Xuehan Xu <[email protected]>
2 parents bc6948d + 6dfc166 commit 2bb6ae3

File tree

4 files changed

+93
-91
lines changed

4 files changed

+93
-91
lines changed

src/crimson/osd/recovery_backend.cc

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -222,61 +222,45 @@ RecoveryBackend::handle_backfill_remove(
222222

223223
RecoveryBackend::interruptible_future<BackfillInterval>
224224
RecoveryBackend::scan_for_backfill(
225-
const hobject_t& start,
225+
const hobject_t start,
226226
[[maybe_unused]] const std::int64_t min,
227227
const std::int64_t max)
228228
{
229229
LOG_PREFIX(RecoveryBackend::scan_for_backfill);
230230
DEBUGDPP("starting from {}", pg, start);
231231
auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
232-
return backend->list_objects(start, max).then_interruptible(
233-
[FNAME, this, start, version_map] (auto&& ret) {
234-
auto&& [objects, next] = std::move(ret);
235-
return seastar::do_with(
236-
std::move(objects),
237-
[FNAME, this, version_map](auto &objects) {
238-
return interruptor::parallel_for_each(objects,
239-
[FNAME, this, version_map] (const hobject_t& object)
240-
-> interruptible_future<> {
241-
crimson::osd::ObjectContextRef obc;
242-
if (pg.is_primary()) {
243-
obc = pg.obc_registry.maybe_get_cached_obc(object);
244-
}
245-
if (obc) {
246-
if (obc->obs.exists) {
247-
DEBUGDPP("found (primary): {} {}",
248-
pg, object, obc->obs.oi.version);
249-
version_map->emplace(object, obc->obs.oi.version);
250-
} else {
251-
// if the object does not exist here, it must have been removed
252-
// between the collection_list_partial and here. This can happen
253-
// for the first item in the range, which is usually last_backfill.
254-
}
255-
return seastar::now();
256-
} else {
257-
return backend->load_metadata(object).safe_then_interruptible(
258-
[FNAME, this, version_map, object] (auto md) {
259-
if (md->os.exists) {
260-
DEBUGDPP("found: {} {}", pg,
261-
object, md->os.oi.version);
262-
version_map->emplace(object, md->os.oi.version);
263-
}
264-
return seastar::now();
265-
}, PGBackend::load_metadata_ertr::assert_all{});
266-
}
267-
});
268-
}).then_interruptible([FNAME, version_map, start=std::move(start),
269-
next=std::move(next), this] {
270-
BackfillInterval bi;
271-
bi.begin = std::move(start);
272-
bi.end = std::move(next);
273-
bi.objects = std::move(*version_map);
274-
DEBUGDPP("{} BackfillInterval filled, leaving, {}",
275-
"scan_for_backfill",
276-
pg, bi);
277-
return seastar::make_ready_future<BackfillInterval>(std::move(bi));
278-
});
279-
});
232+
auto&& [objects, next] = co_await backend->list_objects(start, max);
233+
co_await interruptor::parallel_for_each(objects, seastar::coroutine::lambda([FNAME, this, version_map]
234+
(const hobject_t& object) -> interruptible_future<> {
235+
DEBUGDPP("querying obj:{}", pg, object);
236+
auto obc_manager = pg.obc_loader.get_obc_manager(object);
237+
co_await pg.obc_loader.load_and_lock(
238+
obc_manager, RWState::RWREAD
239+
).handle_error_interruptible(
240+
crimson::ct_error::assert_all("unexpected error")
241+
);
242+
243+
if (obc_manager.get_obc()->obs.exists) {
244+
auto version = obc_manager.get_obc()->obs.oi.version;
245+
version_map->emplace(object, version);
246+
DEBUGDPP("found: {} {}", pg,
247+
object, version);
248+
co_return;
249+
} else {
250+
// if the object does not exist here, it must have been removed
251+
// between the collection_list_partial and here. This can happen
252+
// for the first item in the range, which is usually last_backfill.
253+
co_return;
254+
}
255+
}));
256+
BackfillInterval bi;
257+
bi.begin = std::move(start);
258+
bi.end = std::move(next);
259+
bi.objects = std::move(*version_map);
260+
DEBUGDPP("{} BackfillInterval filled, leaving, {}",
261+
"scan_for_backfill",
262+
pg, bi);
263+
co_return std::move(bi);
280264
}
281265

282266
RecoveryBackend::interruptible_future<>

src/crimson/osd/recovery_backend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class RecoveryBackend {
9090
eversion_t need) = 0;
9191

9292
interruptible_future<BackfillInterval> scan_for_backfill(
93-
const hobject_t& from,
93+
const hobject_t from,
9494
std::int64_t min,
9595
std::int64_t max);
9696

src/crimson/osd/replicated_recovery_backend.cc

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -773,10 +773,6 @@ ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
773773
{
774774
LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull);
775775
DEBUGDPP("{}", pg, *m);
776-
if (pg.can_discard_replica_op(*m)) {
777-
DEBUGDPP("discarding {}", pg, *m);
778-
return seastar::now();
779-
}
780776
return seastar::do_with(m->take_pulls(), [FNAME, this, from=m->from](auto& pulls) {
781777
return interruptor::parallel_for_each(
782778
pulls,
@@ -841,39 +837,42 @@ ReplicatedRecoveryBackend::_handle_pull_response(
841837
pull_info.recovery_info.version = push_op.version;
842838

843839
if (pull_info.recovery_progress.first) {
844-
auto fut = pg.obc_loader.with_obc<RWState::RWNONE>(
845-
pull_info.recovery_info.soid,
846-
[FNAME, this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) {
847-
pull_info.obc = obc;
848-
recovery_waiter.obc = obc;
849-
obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR),
850-
push_op.soid);
851-
auto ss_attr_iter = push_op.attrset.find(SS_ATTR);
852-
if (ss_attr_iter != push_op.attrset.end()) {
853-
if (!obc->ssc) {
854-
obc->ssc = new crimson::osd::SnapSetContext(
855-
push_op.soid.get_snapdir());
856-
}
857-
try {
858-
obc->ssc->snapset = SnapSet(ss_attr_iter->second);
859-
obc->ssc->exists = true;
860-
} catch (const buffer::error&) {
861-
WARNDPP("unable to decode SnapSet", pg);
862-
throw crimson::osd::invalid_argument();
863-
}
864-
assert(!pull_info.obc->ssc->exists ||
865-
obc->ssc->snapset.seq == pull_info.obc->ssc->snapset.seq);
866-
}
867-
pull_info.recovery_info.oi = obc->obs.oi;
868-
if (pull_info.recovery_info.soid.snap &&
869-
pull_info.recovery_info.soid.snap < CEPH_NOSNAP) {
870-
recalc_subsets(pull_info.recovery_info,
871-
pull_info.obc->ssc);
872-
}
873-
return crimson::osd::PG::load_obc_ertr::now();
874-
}, false).handle_error_interruptible(crimson::ct_error::assert_all{});
875-
co_await std::move(fut);
876-
};
840+
auto obc_manager = pg.obc_loader.get_obc_manager(pull_info.recovery_info.soid);
841+
co_await pg.obc_loader.load_and_lock(
842+
obc_manager, RWState::RWNONE
843+
).handle_error_interruptible(
844+
crimson::ct_error::assert_all("unexpected error")
845+
);
846+
847+
auto obc = obc_manager.get_obc();
848+
pull_info.obc = obc;
849+
recovery_waiter.obc = obc;
850+
// TODO: move to ObjectContextLoader once constructing obc from attrset is supported
851+
obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR),
852+
push_op.soid);
853+
auto ss_attr_iter = push_op.attrset.find(SS_ATTR);
854+
if (ss_attr_iter != push_op.attrset.end()) {
855+
if (!obc->ssc) {
856+
obc->ssc = new crimson::osd::SnapSetContext(
857+
push_op.soid.get_snapdir());
858+
}
859+
try {
860+
obc->ssc->snapset = SnapSet(ss_attr_iter->second);
861+
obc->ssc->exists = true;
862+
} catch (const buffer::error&) {
863+
WARNDPP("unable to decode SnapSet", pg);
864+
throw crimson::osd::invalid_argument();
865+
}
866+
assert(!pull_info.obc->ssc->exists ||
867+
obc->ssc->snapset.seq == pull_info.obc->ssc->snapset.seq);
868+
}
869+
pull_info.recovery_info.oi = obc->obs.oi;
870+
if (pull_info.recovery_info.soid.snap &&
871+
pull_info.recovery_info.soid.snap < CEPH_NOSNAP) {
872+
recalc_subsets(pull_info.recovery_info,
873+
pull_info.obc->ssc);
874+
}
875+
}
877876

878877
const bool first = pull_info.recovery_progress.first;
879878
pull_info.recovery_progress = push_op.after_progress;
@@ -943,10 +942,6 @@ ReplicatedRecoveryBackend::handle_pull_response(
943942
Ref<MOSDPGPush> m)
944943
{
945944
LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response);
946-
if (pg.can_discard_replica_op(*m)) {
947-
DEBUGDPP("discarding {}", pg, *m);
948-
co_return;
949-
}
950945
PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
951946
if (push_op.version == eversion_t()) {
952947
// replica doesn't have it!

src/test/crimson/test_crimson_coroutine.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <numeric>
66

77
#include "seastar/core/sleep.hh"
8+
#include "seastar/core/loop.hh"
89

910
#include "crimson/common/coroutine.h"
1011
#include "crimson/common/errorator.h"
@@ -103,6 +104,28 @@ TEST_F(coroutine_test_t, test_coroutine)
103104
});
104105
}
105106

107+
TEST_F(coroutine_test_t, test_coroutine_loops)
108+
{
109+
run_scl([]() -> seastar::future<> {
110+
int CHECK = 0;
111+
std::vector<int> v = {1,2,3};
112+
co_await seastar::parallel_for_each(v,
113+
[&CHECK] (auto i) -> seastar::future<> {
114+
CHECK++;
115+
co_return;
116+
});
117+
EXPECT_EQ(CHECK, v.size());
118+
119+
co_await seastar::do_until(
120+
[&CHECK] { return CHECK == 0; },
121+
[&CHECK] () -> seastar::future<> {
122+
CHECK--;
123+
co_return;
124+
});
125+
EXPECT_EQ(CHECK, 0);
126+
});
127+
}
128+
106129
TEST_F(coroutine_test_t, test_ertr_coroutine_basic)
107130
{
108131
run_ertr_scl([]() -> ertr::future<> {

0 commit comments

Comments
 (0)