Skip to content

Commit 8b8b252

Browse files
authored
Merge pull request ceph#62146 from Matan-B/wip-matanb-crimson-wip-69439-v2
crimson: Handle peer replies handling once received Reviewed-by: Samuel Just <[email protected]> Reviewed-by: Xuehan Xu <[email protected]>
2 parents 56c25c2 + dc0c6ab commit 8b8b252

File tree

7 files changed

+42
-25
lines changed

7 files changed

+42
-25
lines changed

src/crimson/osd/ec_backend.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,5 @@ ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
3333
std::vector<pg_log_entry_t>&& log_entries)
3434
{
3535
// todo
36-
return make_ready_future<rep_op_ret_t>(seastar::now(),
37-
seastar::make_ready_future<crimson::osd::acked_peers_t>());
36+
return make_ready_future<rep_op_ret_t>(seastar::now(), seastar::now());
3837
}

src/crimson/osd/osd_operations/client_request.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
107107
CommonOBCPipeline::Process::BlockingEvent,
108108
scrub::PGScrubber::BlockingEvent,
109109
CommonOBCPipeline::WaitRepop::BlockingEvent,
110-
CommonOBCPipeline::SendReply::BlockingEvent,
111-
CompletionEvent
110+
CommonOBCPipeline::SendReply::BlockingEvent
112111
> pg_tracking_events;
113112

114113
template <class BlockingEventT>

src/crimson/osd/pg.cc

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -890,9 +890,7 @@ void PG::enqueue_delete_for_backfill(
890890
backfill_state->enqueue_standalone_delete(obj, v, peers);
891891
}
892892

893-
PG::interruptible_future<
894-
std::tuple<PG::interruptible_future<>,
895-
PG::interruptible_future<>>>
893+
PG::interruptible_future<PG::rep_op_fut_t>
896894
PG::submit_transaction(
897895
ObjectContextRef&& obc,
898896
ObjectContextRef&& new_clone,
@@ -932,16 +930,7 @@ PG::submit_transaction(
932930
std::move(log_entries));
933931
co_return std::make_tuple(
934932
std::move(submitted),
935-
all_completed.then_interruptible(
936-
[this, last_complete=peering_state.get_info().last_complete, at_version]
937-
(auto acked) {
938-
for (const auto& peer : acked) {
939-
peering_state.update_peer_last_complete_ondisk(
940-
peer.shard, peer.last_complete_ondisk);
941-
}
942-
peering_state.complete_write(at_version, last_complete);
943-
return seastar::now();
944-
})
933+
std::move(all_completed)
945934
);
946935
}
947936

src/crimson/osd/pg.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,20 @@ class PG : public boost::intrusive_ref_counter<
560560
return peering_state.get_primary();
561561
}
562562

563+
eversion_t get_last_complete() const {
564+
return peering_state.get_info().last_complete;
565+
}
566+
567+
void complete_write(eversion_t v, eversion_t lc) {
568+
peering_state.complete_write(v, lc);
569+
}
570+
571+
void update_peer_last_complete_ondisk(
572+
pg_shard_t fromosd,
573+
eversion_t lcod) {
574+
peering_state.update_peer_last_complete_ondisk(fromosd, lcod);
575+
}
576+
563577
/// initialize created PG
564578
seastar::future<> init(
565579
int role,
@@ -686,8 +700,9 @@ class PG : public boost::intrusive_ref_counter<
686700
interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
687701

688702
public:
689-
interruptible_future<
690-
std::tuple<interruptible_future<>, interruptible_future<>>>
703+
using rep_op_fut_t = std::tuple<interruptible_future<>,
704+
interruptible_future<>>;
705+
interruptible_future<rep_op_fut_t>
691706
submit_transaction(
692707
ObjectContextRef&& obc,
693708
ObjectContextRef&& new_clone,

src/crimson/osd/pg_backend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class PGBackend
6262
::crimson::osd::IOInterruptCondition, T>;
6363
using rep_op_ret_t =
6464
std::tuple<interruptible_future<>,
65-
interruptible_future<crimson::osd::acked_peers_t>>;
65+
interruptible_future<>>;
6666
using rep_op_fut_t = interruptible_future<rep_op_ret_t>;
6767
PGBackend(shard_id_t shard, CollectionRef coll,
6868
crimson::osd::ShardServices &shard_services,

src/crimson/osd/replicated_backend.cc

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ ReplicatedBackend::submit_transaction(
9292

9393
const ceph_tid_t tid = shard_services.get_tid();
9494
auto pending_txn =
95-
pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first;
95+
pending_trans.try_emplace(
96+
tid,
97+
pg_shards.size(),
98+
osd_op_p.at_version,
99+
pg.get_last_complete()).first;
96100
bufferlist encoded_txn;
97101
encode(txn, encoded_txn);
98102

@@ -162,10 +166,13 @@ ReplicatedBackend::submit_transaction(
162166
assert(0 == "impossible");
163167
}
164168
if (--peers->pending == 0) {
169+
// no peers other than me, replication size is 1
170+
pg.complete_write(peers->at_version, peers->last_complete);
165171
peers->all_committed.set_value();
166172
peers->all_committed = {};
167173
return seastar::now();
168174
}
175+
// wait for all peers to ack (ReplicatedBackend::got_rep_op_reply)
169176
return peers->all_committed.get_shared_future();
170177
}).then_interruptible([pending_txn, this, _new_clone, &hoid,
171178
to_push_delete=std::move(to_push_delete),
@@ -181,8 +188,7 @@ ReplicatedBackend::submit_transaction(
181188
if (!to_push_delete.empty()) {
182189
pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete);
183190
}
184-
return seastar::make_ready_future<
185-
crimson::osd::acked_peers_t>(std::move(acked_peers));
191+
return seastar::now();
186192
});
187193

188194
auto sends_complete = seastar::when_all_succeed(
@@ -212,7 +218,10 @@ void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
212218
for (auto& peer : peers.acked_peers) {
213219
if (peer.shard == reply.from) {
214220
peer.last_complete_ondisk = reply.get_last_complete_ondisk();
221+
pg.update_peer_last_complete_ondisk(
222+
peer.shard, peer.last_complete_ondisk);
215223
if (--peers.pending == 0) {
224+
pg.complete_write(peers.at_version, peers.last_complete);
216225
peers.all_committed.set_value();
217226
peers.all_committed = {};
218227
}

src/crimson/osd/replicated_backend.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,13 @@ class ReplicatedBackend : public PGBackend
4444
const pg_shard_t whoami;
4545
class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
4646
public:
47-
pending_on_t(size_t pending, const eversion_t& at_version)
48-
: pending{static_cast<unsigned>(pending)}, at_version(at_version)
47+
pending_on_t(
48+
size_t pending,
49+
const eversion_t& at_version,
50+
const eversion_t& last_complete)
51+
: pending{static_cast<unsigned>(pending)},
52+
at_version(at_version),
53+
last_complete(last_complete)
4954
{}
5055
unsigned pending;
5156
// The order of pending_txns' at_version must be the same as their
@@ -54,6 +59,7 @@ class ReplicatedBackend : public PGBackend
5459
// way, client requests at_version must be updated synchorously/simultaneously
5560
// with ceph_tid_t.
5661
const eversion_t at_version;
62+
const eversion_t last_complete;
5763
crimson::osd::acked_peers_t acked_peers;
5864
seastar::shared_promise<> all_committed;
5965
};

0 commit comments

Comments
 (0)