Skip to content

Commit 60c4b64

Browse files
authored
Merge pull request ceph#54525 from myoungwon/wip-delta-overwrite-inplace-replay
crimson/os/seastore: introduce inplace rewrite for RBM Reviewed-by: Yingxin Cheng <[email protected]>
2 parents 930ed08 + 72c9d6d commit 60c4b64

20 files changed

+314
-55
lines changed

src/crimson/os/seastore/async_cleaner.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ JournalTrimmerImpl::config_t::get_test(
388388
max_journal_bytes = 4 * roll_size;
389389
} else {
390390
assert(type == journal_type_t::RANDOM_BLOCK);
391-
target_dirty_bytes = roll_size / 4;
391+
target_dirty_bytes = roll_size / 36;
392392
target_alloc_bytes = roll_size / 4;
393393
max_journal_bytes = roll_size / 2;
394394
}

src/crimson/os/seastore/cache.cc

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,20 @@ record_t Cache::prepare_record(
10961096
if (!i->is_exist_mutation_pending()) {
10971097
DEBUGT("commit replace extent ... -- {}, prior={}",
10981098
t, *i, *i->prior_instance);
1099+
// If inplace rewrite occurs during mutation, prev->version will
1100+
// be zero. Although this results in the version mismatch here, we can
1101+
// correct this by changing version to 1. This is because the inplace rewrite
1102+
// does not introduce any actual modification that could negatively
1103+
// impact system reliability
1104+
if (i->prior_instance->version == 0 && i->version > 1) {
1105+
assert(can_inplace_rewrite(i->get_type()));
1106+
assert(can_inplace_rewrite(i->prior_instance->get_type()));
1107+
assert(i->prior_instance->dirty_from_or_retired_at == JOURNAL_SEQ_MIN);
1108+
assert(i->prior_instance->state == CachedExtent::extent_state_t::CLEAN);
1109+
assert(i->prior_instance->get_paddr().get_addr_type() ==
1110+
paddr_types_t::RANDOM_BLOCK);
1111+
i->version = 1;
1112+
}
10991113
// extent with EXIST_MUTATION_PENDING doesn't have
11001114
// prior_instance field so skip these extents.
11011115
// the existing extents should be added into Cache
@@ -1261,6 +1275,24 @@ record_t Cache::prepare_record(
12611275
}
12621276
}
12631277

1278+
for (auto &i: t.written_inplace_ool_block_list) {
1279+
if (!i->is_valid()) {
1280+
continue;
1281+
}
1282+
assert(i->state == CachedExtent::extent_state_t::DIRTY);
1283+
assert(i->version > 0);
1284+
remove_from_dirty(i);
1285+
// set the version to zero because the extent state is now clean
1286+
// in order to handle this transparently
1287+
i->version = 0;
1288+
i->dirty_from_or_retired_at = JOURNAL_SEQ_MIN;
1289+
i->state = CachedExtent::extent_state_t::CLEAN;
1290+
assert(i->is_logical());
1291+
i->clear_modified_region();
1292+
touch_extent(*i);
1293+
DEBUGT("inplace rewrite ool block is commmitted -- {}", t, *i);
1294+
}
1295+
12641296
for (auto &i: t.existing_block_list) {
12651297
if (i->is_valid()) {
12661298
alloc_delta.alloc_blk_ranges.emplace_back(
@@ -1330,7 +1362,8 @@ record_t Cache::prepare_record(
13301362
t.num_allocated_invalid_extents);
13311363

13321364
auto& ool_stats = t.get_ool_write_stats();
1333-
ceph_assert(ool_stats.extents.num == t.written_ool_block_list.size());
1365+
ceph_assert(ool_stats.extents.num == t.written_ool_block_list.size() +
1366+
t.written_inplace_ool_block_list.size());
13341367

13351368
if (record.is_empty()) {
13361369
SUBINFOT(seastore_t,
@@ -1699,22 +1732,25 @@ Cache::replay_delta(
16991732
segment_seq_printer_t{delta_paddr_segment_seq},
17001733
delta_paddr_segment_type,
17011734
delta);
1702-
return replay_delta_ertr::make_ready_future<bool>(false);
1735+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1736+
std::make_pair(false, nullptr));
17031737
}
17041738
}
17051739
}
17061740

17071741
if (delta.type == extent_types_t::JOURNAL_TAIL) {
17081742
// this delta should have been dealt with during segment cleaner mounting
1709-
return replay_delta_ertr::make_ready_future<bool>(false);
1743+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1744+
std::make_pair(false, nullptr));
17101745
}
17111746

17121747
// replay alloc
17131748
if (delta.type == extent_types_t::ALLOC_INFO) {
17141749
if (journal_seq < alloc_tail) {
17151750
DEBUG("journal_seq {} < alloc_tail {}, don't replay {}",
17161751
journal_seq, alloc_tail, delta);
1717-
return replay_delta_ertr::make_ready_future<bool>(false);
1752+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1753+
std::make_pair(false, nullptr));
17181754
}
17191755

17201756
alloc_delta_t alloc_delta;
@@ -1738,14 +1774,16 @@ Cache::replay_delta(
17381774
if (!backref_list.empty()) {
17391775
backref_batch_update(std::move(backref_list), journal_seq);
17401776
}
1741-
return replay_delta_ertr::make_ready_future<bool>(true);
1777+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1778+
std::make_pair(true, nullptr));
17421779
}
17431780

17441781
// replay dirty
17451782
if (journal_seq < dirty_tail) {
17461783
DEBUG("journal_seq {} < dirty_tail {}, don't replay {}",
17471784
journal_seq, dirty_tail, delta);
1748-
return replay_delta_ertr::make_ready_future<bool>(false);
1785+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1786+
std::make_pair(false, nullptr));
17491787
}
17501788

17511789
if (delta.type == extent_types_t::ROOT) {
@@ -1759,7 +1797,8 @@ Cache::replay_delta(
17591797
journal_seq, record_base, delta, *root);
17601798
root->set_modify_time(modify_time);
17611799
add_extent(root);
1762-
return replay_delta_ertr::make_ready_future<bool>(true);
1800+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1801+
std::make_pair(true, root));
17631802
} else {
17641803
auto _get_extent_if_cached = [this](paddr_t addr)
17651804
-> get_extent_ertr::future<CachedExtentRef> {
@@ -1799,17 +1838,26 @@ Cache::replay_delta(
17991838
DEBUG("replay extent is not present, so delta is obsolete at {} {} -- {}",
18001839
journal_seq, record_base, delta);
18011840
assert(delta.pversion > 0);
1802-
return replay_delta_ertr::make_ready_future<bool>(true);
1841+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1842+
std::make_pair(false, nullptr));
18031843
}
18041844

18051845
DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}",
18061846
journal_seq, record_base, delta, *extent);
18071847

1808-
assert(extent->last_committed_crc == delta.prev_crc);
1809-
assert(extent->version == delta.pversion);
1810-
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
1811-
extent->set_modify_time(modify_time);
1812-
assert(extent->last_committed_crc == delta.final_crc);
1848+
if (delta.paddr.get_addr_type() == paddr_types_t::SEGMENT ||
1849+
!can_inplace_rewrite(delta.type)) {
1850+
ceph_assert_always(extent->last_committed_crc == delta.prev_crc);
1851+
assert(extent->version == delta.pversion);
1852+
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
1853+
extent->set_modify_time(modify_time);
1854+
ceph_assert_always(extent->last_committed_crc == delta.final_crc);
1855+
} else {
1856+
assert(delta.paddr.get_addr_type() == paddr_types_t::RANDOM_BLOCK);
1857+
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
1858+
extent->set_modify_time(modify_time);
1859+
// crc will be checked after journal replay is done
1860+
}
18131861

18141862
extent->version++;
18151863
if (extent->version == 1) {
@@ -1821,7 +1869,8 @@ Cache::replay_delta(
18211869
journal_seq, record_base, delta, *extent);
18221870
}
18231871
mark_dirty(extent);
1824-
return replay_delta_ertr::make_ready_future<bool>(true);
1872+
return replay_delta_ertr::make_ready_future<std::pair<bool, CachedExtentRef>>(
1873+
std::make_pair(true, extent));
18251874
});
18261875
}
18271876
}

src/crimson/os/seastore/cache.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,8 @@ class Cache {
10391039
*/
10401040
using replay_delta_ertr = crimson::errorator<
10411041
crimson::ct_error::input_output_error>;
1042-
using replay_delta_ret = replay_delta_ertr::future<bool>;
1042+
using replay_delta_ret = replay_delta_ertr::future<
1043+
std::pair<bool, CachedExtentRef>>;
10431044
replay_delta_ret replay_delta(
10441045
journal_seq_t seq,
10451046
paddr_t record_block_base,

src/crimson/os/seastore/cached_extent.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,11 @@ class CachedExtent
587587
rewrite_generation = gen;
588588
}
589589

590+
void set_inplace_rewrite_generation() {
591+
user_hint = placement_hint_t::REWRITE;
592+
rewrite_generation = OOL_GENERATION;
593+
}
594+
590595
bool is_inline() const {
591596
return poffset.is_relative();
592597
}
@@ -606,6 +611,10 @@ class CachedExtent
606611
return prior_instance;
607612
}
608613

614+
uint32_t get_last_committed_crc() const {
615+
return last_committed_crc;
616+
}
617+
609618
private:
610619
template <typename T>
611620
friend class read_set_item_t;
@@ -1237,6 +1246,16 @@ class LogicalCachedExtent : public ChildableCachedExtent {
12371246

12381247
void on_replace_prior(Transaction &t) final;
12391248

1249+
struct modified_region_t {
1250+
extent_len_t offset;
1251+
extent_len_t len;
1252+
};
1253+
virtual std::optional<modified_region_t> get_modified_region() {
1254+
return std::nullopt;
1255+
}
1256+
1257+
virtual void clear_modified_region() {}
1258+
12401259
virtual ~LogicalCachedExtent();
12411260
protected:
12421261

src/crimson/os/seastore/extent_placement_manager.cc

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -790,16 +790,36 @@ RandomBlockOolWriter::do_write(
790790
stats.num_records += 1;
791791

792792
ex->prepare_write();
793-
return rbm->write(paddr,
794-
ex->get_bptr()
795-
).handle_error(
796-
alloc_write_iertr::pass_further{},
797-
crimson::ct_error::assert_all{
798-
"Invalid error when writing record"}
799-
).safe_then([&t, &ex, paddr, FNAME]() {
793+
extent_len_t offset = 0;
794+
bufferptr bp;
795+
if (can_inplace_rewrite(t, ex)) {
796+
auto r = ex->get_modified_region();
797+
ceph_assert(r.has_value());
798+
offset = p2align(r->offset, rbm->get_block_size());
799+
extent_len_t len =
800+
p2roundup(r->offset + r->len, rbm->get_block_size()) - offset;
801+
bp = ceph::bufferptr(ex->get_bptr(), offset, len);
802+
} else {
803+
bp = ex->get_bptr();
804+
}
805+
return trans_intr::make_interruptible(
806+
rbm->write(paddr + offset,
807+
bp
808+
).handle_error(
809+
alloc_write_iertr::pass_further{},
810+
crimson::ct_error::assert_all{
811+
"Invalid error when writing record"}
812+
)
813+
).si_then([this, &t, &ex, paddr, FNAME] {
800814
TRACET("ool extent written at {} -- {}",
801815
t, paddr, *ex);
802-
t.mark_allocated_extent_ool(ex);
816+
if (ex->is_initial_pending()) {
817+
t.mark_allocated_extent_ool(ex);
818+
} else if (can_inplace_rewrite(t, ex)) {
819+
t.mark_inplace_rewrite_extent_ool(ex);
820+
} else {
821+
ceph_assert("impossible");
822+
}
803823
return alloc_write_iertr::now();
804824
});
805825
});

src/crimson/os/seastore/extent_placement_manager.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ class ExtentOolWriter {
4343

4444
using close_ertr = base_ertr;
4545
virtual close_ertr::future<> close() = 0;
46+
47+
virtual bool can_inplace_rewrite(Transaction& t,
48+
CachedExtentRef extent) = 0;
4649
};
4750
using ExtentOolWriterRef = std::unique_ptr<ExtentOolWriter>;
4851

@@ -79,6 +82,11 @@ class SegmentedOolWriter : public ExtentOolWriter {
7982
return make_delayed_temp_paddr(0);
8083
}
8184

85+
bool can_inplace_rewrite(Transaction& t,
86+
CachedExtentRef extent) final {
87+
return false;
88+
}
89+
8290
private:
8391
alloc_write_iertr::future<> do_write(
8492
Transaction& t,
@@ -122,6 +130,17 @@ class RandomBlockOolWriter : public ExtentOolWriter {
122130
return rb_cleaner->alloc_paddr(length);
123131
}
124132

133+
bool can_inplace_rewrite(Transaction& t,
134+
CachedExtentRef extent) final {
135+
if (!extent->is_dirty()) {
136+
return false;
137+
}
138+
assert(t.get_src() == transaction_type_t::TRIM_DIRTY);
139+
ceph_assert_always(extent->get_type() == extent_types_t::ROOT ||
140+
extent->get_paddr().is_absolute());
141+
return crimson::os::seastore::can_inplace_rewrite(extent->get_type());
142+
}
143+
125144
private:
126145
alloc_write_iertr::future<> do_write(
127146
Transaction& t,
@@ -199,6 +218,14 @@ class ExtentPlacementManager {
199218
background_process.set_extent_callback(cb);
200219
}
201220

221+
bool can_inplace_rewrite(Transaction& t, CachedExtentRef extent) {
222+
auto writer = get_writer(placement_hint_t::REWRITE,
223+
get_extent_category(extent->get_type()),
224+
OOL_GENERATION);
225+
ceph_assert(writer);
226+
return writer->can_inplace_rewrite(t, extent);
227+
}
228+
202229
journal_type_t get_journal_type() const {
203230
return background_process.get_journal_type();
204231
}

src/crimson/os/seastore/journal.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "crimson/os/seastore/ordering_handle.h"
99
#include "crimson/os/seastore/seastore_types.h"
1010
#include "crimson/os/seastore/segment_seq_allocator.h"
11+
#include "crimson/os/seastore/cached_extent.h"
1112

1213
namespace crimson::os::seastore {
1314

@@ -88,7 +89,7 @@ class Journal {
8889
crimson::ct_error::erange>;
8990
using replay_ret = replay_ertr::future<>;
9091
using delta_handler_t = std::function<
91-
replay_ertr::future<bool>(
92+
replay_ertr::future<std::pair<bool, CachedExtentRef>>(
9293
const record_locator_t&,
9394
const delta_info_t&,
9495
const journal_seq_t&, // dirty_tail

src/crimson/os/seastore/journal/circular_bounded_journal.cc

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,8 @@ Journal::replay_ret CircularBoundedJournal::replay(
316316
return seastar::do_with(
317317
std::move(delta_handler),
318318
std::map<paddr_t, journal_seq_t>(),
319-
[this](auto &d_handler, auto &map) {
319+
std::map<paddr_t, std::pair<CachedExtentRef, uint32_t>>(),
320+
[this](auto &d_handler, auto &map, auto &crc_info) {
320321
auto build_paddr_seq_map = [&map](
321322
const auto &offsets,
322323
const auto &e,
@@ -339,8 +340,8 @@ Journal::replay_ret CircularBoundedJournal::replay(
339340
// The first pass to build the paddr->journal_seq_t map
340341
// from extent allocations
341342
return scan_valid_record_delta(std::move(build_paddr_seq_map), tail
342-
).safe_then([this, &map, &d_handler, tail]() {
343-
auto call_d_handler_if_valid = [this, &map, &d_handler](
343+
).safe_then([this, &map, &d_handler, tail, &crc_info]() {
344+
auto call_d_handler_if_valid = [this, &map, &d_handler, &crc_info](
344345
const auto &offsets,
345346
const auto &e,
346347
sea_time_point modify_time)
@@ -353,12 +354,27 @@ Journal::replay_ret CircularBoundedJournal::replay(
353354
get_dirty_tail(),
354355
get_alloc_tail(),
355356
modify_time
356-
);
357+
).safe_then([&e, &crc_info](auto ret) {
358+
auto [applied, ext] = ret;
359+
if (applied && ext && can_inplace_rewrite(
360+
ext->get_type())) {
361+
crc_info[ext->get_paddr()] =
362+
std::make_pair(ext, e.final_crc);
363+
}
364+
return replay_ertr::make_ready_future<bool>(applied);
365+
});
357366
}
358367
return replay_ertr::make_ready_future<bool>(true);
359368
};
360369
// The second pass to replay deltas
361-
return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
370+
return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail
371+
).safe_then([&crc_info]() {
372+
for (auto p : crc_info) {
373+
ceph_assert_always(p.second.first->get_last_committed_crc() == p.second.second);
374+
}
375+
crc_info.clear();
376+
return replay_ertr::now();
377+
});
362378
});
363379
}).safe_then([this]() {
364380
// make sure that committed_to is JOURNAL_SEQ_NULL if jounal is the initial state

src/crimson/os/seastore/journal/segmented_journal.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,8 @@ SegmentedJournal::replay_segment(
291291
trimmer.get_dirty_tail(),
292292
trimmer.get_alloc_tail(),
293293
modify_time
294-
).safe_then([&stats, delta_type=delta.type](bool is_applied) {
294+
).safe_then([&stats, delta_type=delta.type](auto ret) {
295+
auto [is_applied, ext] = ret;
295296
if (is_applied) {
296297
// see Cache::replay_delta()
297298
assert(delta_type != extent_types_t::JOURNAL_TAIL);

0 commit comments

Comments
 (0)