Skip to content

Commit eb9e05b

Browse files
authored
Merge pull request ceph#58913 from cyx1231st/wip-seastore-unlock-ool
crimson/os/seastore: move ool writes from collection lock to concurrent DeviceSubmission phase Reviewed-by: Myoungwon Oh <[email protected]> Reviewed-by: Xuehan Xu <[email protected]>
2 parents 44fc9fa + 19dbe68 commit eb9e05b

15 files changed

+275
-187
lines changed

src/crimson/os/seastore/cache.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,7 +1322,7 @@ record_t Cache::prepare_record(
13221322
}
13231323
}
13241324

1325-
for (auto &i: t.written_ool_block_list) {
1325+
for (auto &i: t.ool_block_list) {
13261326
TRACET("fresh ool extent -- {}", t, *i);
13271327
ceph_assert(i->is_valid());
13281328
assert(!i->is_inline());
@@ -1340,7 +1340,7 @@ record_t Cache::prepare_record(
13401340
}
13411341
}
13421342

1343-
for (auto &i: t.written_inplace_ool_block_list) {
1343+
for (auto &i: t.inplace_ool_block_list) {
13441344
if (!i->is_valid()) {
13451345
continue;
13461346
}
@@ -1422,13 +1422,13 @@ record_t Cache::prepare_record(
14221422

14231423
ceph_assert(t.get_fresh_block_stats().num ==
14241424
t.inline_block_list.size() +
1425-
t.written_ool_block_list.size() +
1425+
t.ool_block_list.size() +
14261426
t.num_delayed_invalid_extents +
14271427
t.num_allocated_invalid_extents);
14281428

14291429
auto& ool_stats = t.get_ool_write_stats();
1430-
ceph_assert(ool_stats.extents.num == t.written_ool_block_list.size() +
1431-
t.written_inplace_ool_block_list.size());
1430+
ceph_assert(ool_stats.extents.num == t.ool_block_list.size() +
1431+
t.inplace_ool_block_list.size());
14321432

14331433
if (record.is_empty()) {
14341434
SUBINFOT(seastore_t,

src/crimson/os/seastore/extent_placement_manager.cc

Lines changed: 76 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ SegmentedOolWriter::SegmentedOolWriter(
2828
{
2929
}
3030

31-
SegmentedOolWriter::alloc_write_ertr::future<>
32-
SegmentedOolWriter::write_record(
31+
void SegmentedOolWriter::write_record(
3332
Transaction& t,
3433
record_t&& record,
3534
std::list<LogicalCachedExtentRef>&& extents,
@@ -47,24 +46,36 @@ SegmentedOolWriter::write_record(
4746
stats.md_bytes += record.size.get_raw_mdlength();
4847
stats.num_records += 1;
4948

50-
return record_submitter.submit(
49+
auto ret = record_submitter.submit(
5150
std::move(record),
52-
with_atomic_roll_segment
53-
).safe_then([this, FNAME, &t, extents=std::move(extents)
54-
](record_locator_t ret) mutable {
55-
DEBUGT("{} finish with {} and {} extents",
51+
with_atomic_roll_segment);
52+
DEBUGT("{} start at {} with {} extents ...",
53+
t, segment_allocator.get_name(),
54+
ret.record_base_regardless_md,
55+
extents.size());
56+
paddr_t extent_addr = ret.record_base_regardless_md.offset;
57+
for (auto& extent : extents) {
58+
TRACET("{} extent will be written at {} -- {}",
5659
t, segment_allocator.get_name(),
57-
ret, extents.size());
58-
paddr_t extent_addr = ret.record_block_base;
59-
for (auto& extent : extents) {
60-
TRACET("{} ool extent written at {} -- {}",
61-
t, segment_allocator.get_name(),
62-
extent_addr, *extent);
63-
t.update_delayed_ool_extent_addr(extent, extent_addr);
64-
extent_addr = extent_addr.as_seg_paddr().add_offset(
65-
extent->get_length());
66-
}
60+
extent_addr, *extent);
61+
t.update_delayed_ool_extent_addr(extent, extent_addr);
62+
extent_addr = extent_addr.as_seg_paddr().add_offset(
63+
extent->get_length());
64+
}
65+
// t might be destructed inside write_future
66+
auto write_future = seastar::with_gate(write_guard,
67+
[this, FNAME, tid=t.get_trans_id(),
68+
record_base=ret.record_base_regardless_md,
69+
submit_fut=std::move(ret.future)]() mutable {
70+
return std::move(submit_fut
71+
).safe_then([this, FNAME, tid, record_base](record_locator_t ret) {
72+
TRACE("trans.{} {} finish {}=={}",
73+
tid, segment_allocator.get_name(), ret, record_base);
74+
// ool won't write metadata, so the paddrs must be equal
75+
assert(ret.record_block_base == record_base.offset);
76+
});
6777
});
78+
t.get_handle().add_write_future(std::move(write_future));
6879
}
6980

7081
SegmentedOolWriter::alloc_write_iertr::future<>
@@ -101,19 +112,15 @@ SegmentedOolWriter::do_write(
101112
DEBUGT("{} extents={} submit {} extents and roll, unavailable ...",
102113
t, segment_allocator.get_name(),
103114
extents.size(), num_extents);
104-
auto fut_write = alloc_write_ertr::now();
105115
if (num_extents > 0) {
106116
assert(record_submitter.check_action(record.size) !=
107117
action_t::ROLL);
108-
fut_write = write_record(
118+
write_record(
109119
t, std::move(record), std::move(pending_extents),
110120
true/* with_atomic_roll_segment */);
111121
}
112122
return trans_intr::make_interruptible(
113-
record_submitter.roll_segment(
114-
).safe_then([fut_write=std::move(fut_write)]() mutable {
115-
return std::move(fut_write);
116-
})
123+
record_submitter.roll_segment()
117124
).si_then([this, &t, &extents] {
118125
return do_write(t, extents);
119126
});
@@ -144,15 +151,12 @@ SegmentedOolWriter::do_write(
144151
DEBUGT("{} extents={} submit {} extents ...",
145152
t, segment_allocator.get_name(),
146153
extents.size(), pending_extents.size());
147-
return trans_intr::make_interruptible(
148-
write_record(t, std::move(record), std::move(pending_extents))
149-
).si_then([this, &t, &extents] {
150-
if (!extents.empty()) {
151-
return do_write(t, extents);
152-
} else {
153-
return alloc_write_iertr::now();
154-
}
155-
});
154+
write_record(t, std::move(record), std::move(pending_extents));
155+
if (!extents.empty()) {
156+
return do_write(t, extents);
157+
} else {
158+
return alloc_write_iertr::now();
159+
}
156160
}
157161
// SUBMIT_NOT_FULL: evaluate the next extent
158162
}
@@ -162,8 +166,8 @@ SegmentedOolWriter::do_write(
162166
t, segment_allocator.get_name(),
163167
num_extents);
164168
assert(num_extents > 0);
165-
return trans_intr::make_interruptible(
166-
write_record(t, std::move(record), std::move(pending_extents)));
169+
write_record(t, std::move(record), std::move(pending_extents));
170+
return alloc_write_iertr::now();
167171
}
168172

169173
SegmentedOolWriter::alloc_write_iertr::future<>
@@ -498,6 +502,7 @@ ExtentPlacementManager::write_delayed_ool_extents(
498502
assert(extent->is_valid());
499503
});
500504
#endif
505+
assert(writer->get_type() == backend_type_t::SEGMENTED);
501506
return writer->alloc_write_ool_extents(t, extents);
502507
});
503508
}
@@ -524,6 +529,7 @@ ExtentPlacementManager::write_preallocated_ool_extents(
524529
return trans_intr::do_for_each(alloc_map, [&t](auto& p) {
525530
auto writer = p.first;
526531
auto& extents = p.second;
532+
assert(writer->get_type() == backend_type_t::RANDOM_BLOCK);
527533
return writer->alloc_write_ool_extents(t, extents);
528534
});
529535
});
@@ -985,27 +991,27 @@ RandomBlockOolWriter::alloc_write_ool_extents(
985991
if (extents.empty()) {
986992
return alloc_write_iertr::now();
987993
}
988-
return seastar::with_gate(write_guard, [this, &t, &extents] {
989-
return do_write(t, extents);
990-
});
994+
do_write(t, extents);
995+
return alloc_write_iertr::now();
991996
}
992997

993-
RandomBlockOolWriter::alloc_write_iertr::future<>
994-
RandomBlockOolWriter::do_write(
998+
void RandomBlockOolWriter::do_write(
995999
Transaction& t,
9961000
std::list<CachedExtentRef>& extents)
9971001
{
9981002
LOG_PREFIX(RandomBlockOolWriter::do_write);
9991003
assert(!extents.empty());
10001004
DEBUGT("start with {} allocated extents",
10011005
t, extents.size());
1002-
return trans_intr::do_for_each(extents,
1003-
[this, &t, FNAME](auto& ex) {
1006+
std::vector<write_info_t> writes;
1007+
writes.reserve(extents.size());
1008+
for (auto& ex : extents) {
10041009
auto paddr = ex->get_paddr();
10051010
assert(paddr.is_absolute());
10061011
RandomBlockManager * rbm = rb_cleaner->get_rbm(paddr);
10071012
assert(rbm);
1008-
TRACE("extent {}, allocated addr {}", fmt::ptr(ex.get()), paddr);
1013+
TRACE("write extent {}, paddr {} ...",
1014+
fmt::ptr(ex.get()), paddr);
10091015
auto& stats = t.get_ool_write_stats();
10101016
stats.extents.num += 1;
10111017
stats.extents.bytes += ex->get_length();
@@ -1029,29 +1035,36 @@ RandomBlockOolWriter::do_write(
10291035
trans_stats.data_bytes += ex->get_length();
10301036
w_stats.data_bytes += ex->get_length();
10311037
}
1032-
return trans_intr::make_interruptible(
1033-
rbm->write(paddr + offset,
1034-
bp
1035-
).handle_error(
1036-
alloc_write_iertr::pass_further{},
1037-
crimson::ct_error::assert_all{
1038-
"Invalid error when writing record"}
1039-
)
1040-
).si_then([this, &t, &ex, paddr, FNAME] {
1041-
TRACET("ool extent written at {} -- {}",
1042-
t, paddr, *ex);
1043-
if (ex->is_initial_pending()) {
1044-
t.mark_allocated_extent_ool(ex);
1045-
} else if (can_inplace_rewrite(t, ex)) {
1046-
assert(ex->is_logical());
1047-
t.mark_inplace_rewrite_extent_ool(
1048-
ex->template cast<LogicalCachedExtent>());
1049-
} else {
1050-
ceph_assert("impossible");
1051-
}
1052-
return alloc_write_iertr::now();
1038+
writes.push_back(write_info_t{paddr + offset, std::move(bp), rbm});
1039+
1040+
if (ex->is_initial_pending()) {
1041+
t.mark_allocated_extent_ool(ex);
1042+
} else if (can_inplace_rewrite(t, ex)) {
1043+
assert(ex->is_logical());
1044+
t.mark_inplace_rewrite_extent_ool(
1045+
ex->template cast<LogicalCachedExtent>());
1046+
} else {
1047+
ceph_assert("impossible");
1048+
}
1049+
}
1050+
1051+
// t might be destructed inside write_future
1052+
auto write_future = seastar::with_gate(write_guard,
1053+
[writes=std::move(writes)]() mutable {
1054+
return seastar::do_with(std::move(writes),
1055+
[](auto& writes) {
1056+
return crimson::do_for_each(writes,
1057+
[](auto& info) {
1058+
return info.rbm->write(info.offset, info.bp
1059+
).handle_error(
1060+
alloc_write_ertr::pass_further{},
1061+
crimson::ct_error::assert_all{
1062+
"Invalid error when writing record"}
1063+
);
1064+
});
10531065
});
10541066
});
1067+
t.get_handle().add_write_future(std::move(write_future));
10551068
}
10561069

10571070
}

src/crimson/os/seastore/extent_placement_manager.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class SegmentedOolWriter : public ExtentOolWriter {
115115
Transaction& t,
116116
std::list<CachedExtentRef> &extent);
117117

118-
alloc_write_ertr::future<> write_record(
118+
void write_record(
119119
Transaction& t,
120120
record_t&& record,
121121
std::list<LogicalCachedExtentRef> &&extents,
@@ -190,7 +190,12 @@ class RandomBlockOolWriter : public ExtentOolWriter {
190190
}
191191
#endif
192192
private:
193-
alloc_write_iertr::future<> do_write(
193+
struct write_info_t {
194+
paddr_t offset;
195+
ceph::bufferptr bp;
196+
RandomBlockManager* rbm;
197+
};
198+
void do_write(
194199
Transaction& t,
195200
std::list<CachedExtentRef> &extent);
196201

src/crimson/os/seastore/journal/circular_bounded_journal.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ CircularBoundedJournal::do_submit_record(
9494
(void*)&handle,
9595
action == RecordSubmitter::action_t::SUBMIT_FULL ?
9696
"FULL" : "NOT_FULL");
97-
auto submit_fut = record_submitter.submit(std::move(record));
97+
auto submit_ret = record_submitter.submit(std::move(record));
98+
// submit_ret.record_base_regardless_md is wrong for journaling
9899
return handle.enter(write_pipeline->device_submission
99-
).then([submit_fut=std::move(submit_fut)]() mutable {
100+
).then([&handle] {
101+
return handle.take_write_future();
102+
}).safe_then([submit_fut=std::move(submit_ret.future)]() mutable {
100103
return std::move(submit_fut);
101104
}).safe_then([FNAME, this, &handle](record_locator_t result) {
102105
return handle.enter(write_pipeline->finalize

src/crimson/os/seastore/journal/circular_journal_space.cc

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ CircularJournalSpace::roll_ertr::future<> CircularJournalSpace::roll() {
4949
return roll_ertr::now();
5050
}
5151

52-
CircularJournalSpace::write_ret
52+
CircularJournalSpace::write_ertr::future<>
5353
CircularJournalSpace::write(ceph::bufferlist&& to_write) {
5454
LOG_PREFIX(CircularJournalSpace::write);
5555
assert(get_written_to().segment_seq != NULL_SEG_SEQ);
@@ -60,7 +60,6 @@ CircularJournalSpace::write(ceph::bufferlist&& to_write) {
6060
assert(encoded_size + get_rbm_addr(get_written_to())
6161
< get_journal_end());
6262

63-
journal_seq_t j_seq = get_written_to();
6463
auto target = get_rbm_addr(get_written_to());
6564
auto new_written_to = target + encoded_size;
6665
assert(new_written_to < get_journal_end());
@@ -69,22 +68,12 @@ CircularJournalSpace::write(ceph::bufferlist&& to_write) {
6968
get_device_id());
7069
set_written_to(
7170
journal_seq_t{get_written_to().segment_seq, paddr});
72-
DEBUG("{}, target {}", to_write.length(), target);
71+
DEBUG("length {}, commit target {}, used_size {}",
72+
encoded_size, target, get_records_used_size());
7373

74-
auto write_result = write_result_t{
75-
j_seq,
76-
encoded_size
77-
};
7874
return device_write_bl(target, to_write
79-
).safe_then([this, target,
80-
length=encoded_size,
81-
write_result,
82-
FNAME] {
83-
DEBUG("commit target {} used_size {} written length {}",
84-
target, get_records_used_size(), length);
85-
return write_result;
86-
}).handle_error(
87-
base_ertr::pass_further{},
75+
).handle_error(
76+
write_ertr::pass_further{},
8877
crimson::ct_error::assert_all{ "Invalid error" }
8978
);
9079
}
@@ -167,7 +156,8 @@ ceph::bufferlist CircularJournalSpace::encode_header()
167156
return bl;
168157
}
169158

170-
CircularJournalSpace::write_ertr::future<> CircularJournalSpace::device_write_bl(
159+
CircularJournalSpace::submit_ertr::future<>
160+
CircularJournalSpace::device_write_bl(
171161
rbm_abs_addr offset, bufferlist &bl)
172162
{
173163
LOG_PREFIX(CircularJournalSpace::device_write_bl);
@@ -181,7 +171,7 @@ CircularJournalSpace::write_ertr::future<> CircularJournalSpace::device_write_bl
181171
length);
182172
return device->writev(offset, bl
183173
).handle_error(
184-
write_ertr::pass_further{},
174+
submit_ertr::pass_further{},
185175
crimson::ct_error::assert_all{ "Invalid error device->write" }
186176
);
187177
}
@@ -229,7 +219,7 @@ CircularJournalSpace::read_header()
229219
});
230220
}
231221

232-
CircularJournalSpace::write_ertr::future<>
222+
CircularJournalSpace::submit_ertr::future<>
233223
CircularJournalSpace::write_header()
234224
{
235225
LOG_PREFIX(CircularJournalSpace::write_header);
@@ -245,7 +235,7 @@ CircularJournalSpace::write_header()
245235
iter.copy(bl.length(), bp.c_str());
246236
return device->write(device->get_shard_journal_start(), std::move(bp)
247237
).handle_error(
248-
write_ertr::pass_further{},
238+
submit_ertr::pass_further{},
249239
crimson::ct_error::assert_all{ "Invalid error device->write" }
250240
);
251241
}

0 commit comments

Comments
 (0)