
Commit 38578de

Merge pull request ceph#63873 from aainscow/ec_fixpack_pr
osd: Multiple fixes to optimized EC and peering
2 parents: 0d76061 + 71c9154

31 files changed: +1216 / -551 lines

src/crimson/osd/pg.h

Lines changed: 6 additions & 2 deletions
@@ -478,9 +478,13 @@ class PG : public boost::intrusive_ref_counter<
     void trim(const pg_log_entry_t &entry) override {
       // TODO
     }
-    void partial_write(pg_info_t *info, const pg_log_entry_t &entry) override {
+    void partial_write(pg_info_t *info,
+                       eversion_t previous_version,
+                       const pg_log_entry_t &entry
+                       ) override {
       // TODO
-      ceph_assert(entry.written_shards.empty() && info->partial_writes_last_complete.empty());
+      ceph_assert(entry.written_shards.empty() &&
+                  info->partial_writes_last_complete.empty());
     }
   };
   PGLog::LogEntryHandlerRef get_log_handler(
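
The interface change above threads the version that preceded the log entry (previous_version) into the handler; the crimson stub keeps asserting that it only ever sees full writes. As a rough illustration of why the extra argument exists, the sketch below shows what a non-stub handler might do with it. It is an assumption, not the crimson or classic implementation: partial_writes_last_complete is treated here simply as a per-shard map of (from, to) version ranges, and written_shards as a set with contains().

// Illustrative sketch only -- the crimson handler above is still a TODO.
void partial_write(pg_info_t *info,
                   eversion_t previous_version,
                   const pg_log_entry_t &entry) override {
  if (entry.written_shards.empty()) {
    return;  // not a partial write: every shard persists this entry
  }
  for (auto &[shard, range] : info->partial_writes_last_complete) {
    if (!entry.written_shards.contains(shard)) {
      // A shard skipped by this partial write is still logically complete
      // through entry.version. (A real implementation would extend an
      // existing contiguous range rather than simply overwrite it.)
      range = std::make_pair(previous_version, entry.version);
    }
  }
}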

src/mon/OSDMonitor.cc

Lines changed: 3 additions & 1 deletion
@@ -1524,9 +1524,11 @@ void OSDMonitor::prime_pg_temp(
   {
     std::lock_guard l(prime_pg_temp_lock);
     // do not touch a mapping if a change is pending
+    std::vector<int> pg_temp = pool ? next.pgtemp_primaryfirst(*pool, acting) :
+                                      acting;
     pending_inc.new_pg_temp.emplace(
       pgid,
-      mempool::osdmap::vector<int>(acting.begin(), acting.end()));
+      mempool::osdmap::vector<int>(pg_temp.begin(), pg_temp.end()));
   }
 }

src/osd/ECBackend.cc

Lines changed: 101 additions & 67 deletions
@@ -200,7 +200,7 @@ void ECBackend::RecoveryBackend::handle_recovery_push(
   bool is_repair) {
   if (get_parent()->check_failsafe_full()) {
     dout(10) << __func__ << " Out of space (failsafe) processing push request."
-      << dendl;
+             << dendl;
     ceph_abort();
   }

@@ -242,7 +242,7 @@ void ECBackend::RecoveryBackend::handle_recovery_push(
   }
 
   if (op.before_progress.first) {
-    ceph_assert(op.attrset.count(string("_")));
+    ceph_assert(op.attrset.contains(OI_ATTR));
     m->t.setattrs(
       coll,
       tobj,
@@ -290,23 +290,23 @@ void ECBackend::RecoveryBackend::handle_recovery_push(
 }
 
 void ECBackend::RecoveryBackend::handle_recovery_push_reply(
-  const PushReplyOp &op,
-  pg_shard_t from,
-  RecoveryMessages *m) {
+    const PushReplyOp &op,
+    pg_shard_t from,
+    RecoveryMessages *m) {
   if (!recovery_ops.count(op.soid))
     return;
   RecoveryOp &rop = recovery_ops[op.soid];
-  ceph_assert(rop.waiting_on_pushes.count(from));
+  ceph_assert(rop.waiting_on_pushes.contains(from));
   rop.waiting_on_pushes.erase(from);
   continue_recovery_op(rop, m);
 }
 
 void ECBackend::RecoveryBackend::handle_recovery_read_complete(
-  const hobject_t &hoid,
-  ECUtil::shard_extent_map_t &&buffers_read,
-  std::optional<map<string, bufferlist, less<>>> attrs,
-  const ECUtil::shard_extent_set_t &want_to_read,
-  RecoveryMessages *m) {
+    const hobject_t &hoid,
+    ECUtil::shard_extent_map_t &&buffers_read,
+    std::optional<map<string, bufferlist, less<>>> attrs,
+    const ECUtil::shard_extent_set_t &want_to_read,
+    RecoveryMessages *m) {
   dout(10) << __func__ << ": returned " << hoid << " " << buffers_read << dendl;
   ceph_assert(recovery_ops.contains(hoid));
   RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
@@ -373,14 +373,10 @@ void ECBackend::RecoveryBackend::handle_recovery_read_complete(
     }
   }
 
-  uint64_t aligned_size = ECUtil::align_page_next(op.obc->obs.oi.size);
+  uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
 
   int r = op.returned_data->decode(ec_impl, shard_want_to_read, aligned_size);
   ceph_assert(r == 0);
-  // We are never appending here, so we never need hinfo.
-  op.returned_data->insert_parity_buffers();
-  r = op.returned_data->encode(ec_impl, NULL, 0);
-  ceph_assert(r==0);
 
   // Finally, we don't want to write any padding, so truncate the buffer
   // to remove it.
@@ -393,8 +389,10 @@ void ECBackend::RecoveryBackend::handle_recovery_read_complete(
     }
   }
 
-  dout(20) << __func__ << ": oid=" << op.hoid << " "
-    << op.returned_data->debug_string(2048, 8) << dendl;
+  dout(20) << __func__ << ": oid=" << op.hoid << dendl;
+  dout(30) << __func__ << "EC_DEBUG_BUFFERS: "
+           << op.returned_data->debug_string(2048, 8)
+           << dendl;
 
   continue_recovery_op(op, m);
 }
@@ -536,12 +534,30 @@ void ECBackend::RecoveryBackend::continue_recovery_op(
 
       op.state = RecoveryOp::READING;
 
-      // We always read the recovery chunk size (default 8MiB + parity). If that
-      // amount of data is not available, then the backend will truncate the
-      // response.
+      /* When beginning recovery, the OI may not be known. As such the object
+       * size is not known. For the first read, attempt to read the default
+       * size. If this is larger than the object sizes, then the OSD will
+       * return truncated reads. If the object size is known, then attempt
+       * correctly sized reads.
+       */
+      uint64_t read_size = get_recovery_chunk_size();
+      if (op.obc) {
+        uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) -
+          op.recovery_progress.data_recovered_to;
+
+        if (read_to_end < read_size) {
+          read_size = read_to_end;
+        }
+      }
       sinfo.ro_range_to_shard_extent_set_with_parity(
-        op.recovery_progress.data_recovered_to,
-        get_recovery_chunk_size(), want);
+        op.recovery_progress.data_recovered_to, read_size, want);
+
+      op.recovery_progress.data_recovered_to += read_size;
+
+      // We only need to recover shards that are missing.
+      for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) {
+        want.erase(shard);
+      }
 
       if (op.recovery_progress.first && op.obc) {
         op.xattrs = op.obc->attr_cache;
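
The comment block above is the heart of this hunk, so here is the sizing rule restated as a tiny stand-alone helper. The helper name and the std::optional parameter are illustrative and not part of the patch; ECUtil::align_next is the in-tree alignment helper the patch itself uses.

#include <cstdint>
#include <optional>

// Pick how much to read for one recovery pass. Without an OI (first pass)
// ask for the default chunk and let the backend truncate the reply; with an
// OI, clamp the read to the aligned end of the object.
uint64_t choose_recovery_read_size(uint64_t default_chunk,      // e.g. 8 MiB
                                   uint64_t data_recovered_to,
                                   std::optional<uint64_t> oi_size) {
  uint64_t read_size = default_chunk;
  if (oi_size) {
    uint64_t read_to_end = ECUtil::align_next(*oi_size) - data_recovered_to;
    if (read_to_end < read_size) {
      read_size = read_to_end;  // last pass: do not read past the object
    }
  }
  return read_size;
}

For example, with the 8 MiB default chunk and a 10 MiB object, the first pass reads 8 MiB and the second asks for only the remaining 2 MiB rather than another full chunk, so data_recovered_to does not overshoot the object.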
@@ -591,9 +607,15 @@ void ECBackend::RecoveryBackend::continue_recovery_op(
       }
       if (read_request.shard_reads.empty()) {
         ceph_assert(op.obc);
-        ceph_assert(0 == op.obc->obs.oi.size);
-        dout(10) << __func__ << "Zero size object recovery, skipping reads."
-                 << op << dendl;
+        /* This can happen for several reasons
+         * - A zero-sized object.
+         * - The missing shards have no data.
+         * - The previous recovery did not need the last data shard. In this
+         *   case, data_recovered_to may indicate that the last shard still
+         *   needs recovery, when it does not.
+         * We can just skip the read and fall through below.
+         */
+        dout(10) << __func__ << " No reads required " << op << dendl;
         // Create an empty read result and fall through.
         op.returned_data.emplace(&sinfo);
       } else {
@@ -612,7 +634,6 @@ void ECBackend::RecoveryBackend::continue_recovery_op(
       dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl;
       op.state = RecoveryOp::WRITING;
       ObjectRecoveryProgress after_progress = op.recovery_progress;
-      after_progress.data_recovered_to = op.returned_data->get_ro_end();
       after_progress.first = false;
       if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
         after_progress.data_complete = true;
@@ -621,7 +642,7 @@ void ECBackend::RecoveryBackend::continue_recovery_op(
         m->pushes[pg_shard].push_back(PushOp());
         PushOp &pop = m->pushes[pg_shard].back();
         pop.soid = op.hoid;
-        pop.version = op.v;
+        pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard);
         op.returned_data->get_shard_first_buffer(pg_shard.shard, pop.data);
         dout(10) << __func__ << ": pop shard=" << pg_shard
                  << ", oid=" << pop.soid
@@ -634,7 +655,26 @@ void ECBackend::RecoveryBackend::continue_recovery_op(
           op.returned_data->get_shard_first_offset(pg_shard.shard),
           pop.data.length());
         if (op.recovery_progress.first) {
-          pop.attrset = op.xattrs;
+          if (sinfo.is_nonprimary_shard(pg_shard.shard)) {
+            if (pop.version == op.recovery_info.oi.version) {
+              dout(10) << __func__ << ": copy OI attr only" << dendl;
+              pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR];
+            } else {
+              // We are recovering a partial write - make sure we push the correct
+              // version in the OI or a scrub error will occur.
+              object_info_t oi(op.recovery_info.oi);
+              oi.shard_versions.clear();
+              oi.version = pop.version;
+              dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl;
+              bufferlist bl;
+              oi.encode(bl, get_osdmap()->get_features(
+                CEPH_ENTITY_TYPE_OSD, nullptr));
+              pop.attrset[OI_ATTR] = bl;
+            }
+          } else {
+            dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl;
+            pop.attrset = op.xattrs;
+          }
         }
         pop.recovery_info = op.recovery_info;
         pop.before_progress = op.recovery_progress;
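
The attribute handling just above is the subtle part of the partial-write fix, so the same decision is restated below as a small helper. The helper and its parameters are illustrative (the patch does this inline); like the patch, it assumes get_version_for_shard() returns the head OI version unless the shard was skipped by later partial writes.

// Illustrative restatement of the per-shard OI selection for nonprimary
// shards; not part of the patch.
bufferlist oi_attr_for_shard(const object_info_t &head_oi,
                             eversion_t shard_version,      // get_version_for_shard()
                             uint64_t osd_features,
                             const bufferlist &cached_oi_attr) {
  if (shard_version == head_oi.version) {
    // Shard is fully up to date: the cached OI attribute is already correct.
    return cached_oi_attr;
  }
  // Shard missed one or more partial writes: push an OI stamped with the
  // shard's own version, otherwise a later scrub would flag a mismatch.
  object_info_t oi(head_oi);
  oi.shard_versions.clear();
  oi.version = shard_version;
  bufferlist bl;
  oi.encode(bl, osd_features);
  return bl;
}

Primary-eligible shards are unaffected: as the else branch above shows, they still receive the full attribute set.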
@@ -1050,8 +1090,7 @@ void ECBackend::handle_sub_read(
               << dendl;
     } else {
       get_parent()->clog_error() << "Error " << r
-                                 << " reading object "
-                                 << hoid;
+                                 << " reading object " << hoid;
       dout(5) << __func__ << ": Error " << r
               << " reading " << hoid << dendl;
     }
@@ -1085,8 +1124,7 @@ void ECBackend::handle_sub_read(
       if (!hinfo) {
         r = -EIO;
         get_parent()->clog_error() << "Corruption detected: object "
-                                   << hoid
-                                   << " is missing hash_info";
+                                   << hoid << " is missing hash_info";
         dout(5) << __func__ << ": No hinfo for " << hoid << dendl;
         goto error;
       }
@@ -1102,8 +1140,8 @@ void ECBackend::handle_sub_read(
           << hex << h.digest() << " expected 0x" << hinfo->
           get_chunk_hash(shard) << dec;
         dout(5) << __func__ << ": Bad hash for " << hoid << " digest 0x"
-          << hex << h.digest() << " expected 0x" << hinfo->
-          get_chunk_hash(shard) << dec << dendl;
+                << hex << h.digest() << " expected 0x"
+                << hinfo->get_chunk_hash(shard) << dec << dendl;
         r = -EIO;
         goto error;
       }
@@ -1172,9 +1210,9 @@ void ECBackend::handle_sub_write_reply(
 }
 
 void ECBackend::handle_sub_read_reply(
-  pg_shard_t from,
-  ECSubReadReply &op,
-  const ZTracer::Trace &trace) {
+    pg_shard_t from,
+    ECSubReadReply &op,
+    const ZTracer::Trace &trace) {
   trace.event("ec sub read reply");
   dout(10) << __func__ << ": reply " << op << dendl;
   map<ceph_tid_t, ReadOp>::iterator iter = read_pipeline.tid_to_read_map.
@@ -1227,26 +1265,19 @@ void ECBackend::handle_sub_read_reply(
       rop.complete.emplace(hoid, &sinfo);
     }
     auto &complete = rop.complete.at(hoid);
-    for (auto &&[shard, read]: std::as_const(req.shard_reads)) {
-      if (complete.errors.contains(read.pg_shard)) continue;
-
-      complete.processed_read_requests[shard].union_of(read.extents);
-
-      if (!rop.complete.contains(hoid) ||
-          !complete.buffers_read.contains(shard)) {
-        if (!read.extents.empty()) continue; // Complete the actual read first.
-
-        // If we are first here, populate the completion.
-        if (!rop.complete.contains(hoid)) {
-          rop.complete.emplace(hoid, read_result_t(&sinfo));
-        }
-      }
+    if (!req.shard_reads.contains(from.shard)) {
+      continue;
+    }
+    const shard_read_t &read = req.shard_reads.at(from.shard);
+    if (!complete.errors.contains(from)) {
+      dout(20) << __func__ <<" read:" << read << dendl;
+      complete.processed_read_requests[from.shard].union_of(read.extents);
     }
   }
   for (auto &&[hoid, attr]: op.attrs_read) {
     ceph_assert(!op.errors.count(hoid));
     // if read error better not have sent an attribute
-    if (!rop.to_read.count(hoid)) {
+    if (!rop.to_read.contains(hoid)) {
       // We canceled this read! @see filter_read_op
       dout(20) << __func__ << " to_read skipping" << dendl;
       continue;
@@ -1290,6 +1321,8 @@ void ECBackend::handle_sub_read_reply(
     rop.to_read.at(oid).shard_want_to_read.
       populate_shard_id_set(want_to_read);
 
+    dout(20) << __func__ << " read_result: " << read_result << dendl;
+
     int err = ec_impl->minimum_to_decode(want_to_read, have, dummy_minimum,
                                          nullptr);
     if (err) {
@@ -1305,7 +1338,7 @@ void ECBackend::handle_sub_read_reply(
       // We found that new reads are required to do a decode.
       need_resend = true;
       continue;
-      } else if (r > 0) {
+    } else if (r > 0) {
       // No new reads were requested. This means that some parity
       // shards can be assumed to be zeros.
       err = 0;
@@ -1340,7 +1373,8 @@ void ECBackend::handle_sub_read_reply(
         rop.complete.at(oid).errors.clear();
       }
     }
-    // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
+    // avoid re-read for completed object as we may send remaining reads for
+    // uncompleted objects
     rop.to_read.at(oid).shard_reads.clear();
     rop.to_read.at(oid).want_attrs = false;
     ++is_complete;
@@ -1599,28 +1633,28 @@ void ECBackend::submit_transaction(
 }
 
 int ECBackend::objects_read_sync(
-  const hobject_t &hoid,
-  uint64_t off,
-  uint64_t len,
-  uint32_t op_flags,
-  bufferlist *bl) {
+    const hobject_t &hoid,
+    uint64_t off,
+    uint64_t len,
+    uint32_t op_flags,
+    bufferlist *bl) {
   return -EOPNOTSUPP;
 }
 
 void ECBackend::objects_read_async(
-  const hobject_t &hoid,
-  uint64_t object_size,
-  const list<pair<ec_align_t,
-    pair<bufferlist*, Context*>>> &to_read,
-  Context *on_complete,
-  bool fast_read) {
+    const hobject_t &hoid,
+    uint64_t object_size,
+    const list<pair<ec_align_t,
+                    pair<bufferlist*, Context*>>> &to_read,
+    Context *on_complete,
+    bool fast_read) {
   map<hobject_t, std::list<ec_align_t>> reads;
 
   uint32_t flags = 0;
   extent_set es;
   for (const auto &[read, ctx]: to_read) {
     pair<uint64_t, uint64_t> tmp;
-    if (!cct->_conf->osd_ec_partial_reads || fast_read) {
+    if (!cct->_conf->osd_ec_partial_reads) {
       tmp = sinfo.ro_offset_len_to_stripe_ro_offset_len(read.offset, read.size);
     } else {
       tmp.first = read.offset;

src/osd/ECBackend.h

Lines changed: 1 addition & 1 deletion
@@ -252,7 +252,7 @@ class ECBackend : public ECCommon {
     hobject_t hoid;
     eversion_t v;
     std::set<pg_shard_t> missing_on;
-    std::set<shard_id_t> missing_on_shards;
+    shard_id_set missing_on_shards;
 
     ObjectRecoveryInfo recovery_info;
     ObjectRecoveryProgress recovery_progress;
