88#include " crimson/common/exception.h"
99#include " crimson/common/log.h"
1010#include " crimson/os/futurized_store.h"
11+ #include " crimson/osd/pg.h"
1112#include " crimson/osd/shard_services.h"
1213#include " osd/PeeringState.h"
1314
1415SET_SUBSYS (osd);
1516
1617ReplicatedBackend::ReplicatedBackend (pg_t pgid,
1718 pg_shard_t whoami,
19+ crimson::osd::PG& pg,
1820 ReplicatedBackend::CollectionRef coll,
1921 crimson::osd::ShardServices& shard_services,
2022 DoutPrefixProvider &dpp)
2123 : PGBackend{whoami.shard , coll, shard_services, dpp},
2224 pgid{pgid},
23- whoami{whoami}
25+ whoami{whoami},
26+ pg (pg)
2427{}
2528
2629ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist>
@@ -41,36 +44,14 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
4144 std::vector<pg_log_entry_t >&& log_entries)
4245{
4346 LOG_PREFIX (ReplicatedBackend::_submit_transaction);
47+ DEBUGDPP (" object {}, {}" , dpp, hoid);
4448
4549 const ceph_tid_t tid = shard_services.get_tid ();
4650 auto pending_txn =
4751 pending_trans.try_emplace (tid, pg_shards.size (), osd_op_p.at_version ).first ;
4852 bufferlist encoded_txn;
4953 encode (txn, encoded_txn);
5054
51- DEBUGDPP (" object {}" , dpp, hoid);
52- auto all_completed = interruptor::make_interruptible (
53- shard_services.get_store ().do_transaction (coll, std::move (txn))
54- ).then_interruptible ([FNAME, this ,
55- peers=pending_txn->second .weak_from_this ()] {
56- if (!peers) {
57- // for now, only actingset_changed can cause peers
58- // to be nullptr
59- ERRORDPP (" peers is null, this should be impossible" , dpp);
60- assert (0 == " impossible" );
61- }
62- if (--peers->pending == 0 ) {
63- peers->all_committed .set_value ();
64- peers->all_committed = {};
65- return seastar::now ();
66- }
67- return peers->all_committed .get_shared_future ();
68- }).then_interruptible ([pending_txn, this ] {
69- auto acked_peers = std::move (pending_txn->second .acked_peers );
70- pending_trans.erase (pending_txn);
71- return seastar::make_ready_future<crimson::osd::acked_peers_t >(std::move (acked_peers));
72- });
73-
7455 auto sends = std::make_unique<std::vector<seastar::future<>>>();
7556 for (auto pg_shard : pg_shards) {
7657 if (pg_shard != whoami) {
@@ -91,9 +72,43 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
9172 m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk ;
9273 m->set_rollback_to (osd_op_p.at_version );
9374 // TODO: set more stuff. e.g., pg_states
94- sends->emplace_back (shard_services.send_to_osd (pg_shard.osd , std::move (m), map_epoch));
75+ sends->emplace_back (
76+ shard_services.send_to_osd (
77+ pg_shard.osd , std::move (m), map_epoch));
9578 }
9679 }
80+
81+ pg.log_operation (
82+ std::move (log_entries),
83+ osd_op_p.pg_trim_to ,
84+ osd_op_p.at_version ,
85+ osd_op_p.min_last_complete_ondisk ,
86+ true ,
87+ txn,
88+ false );
89+
90+ auto all_completed = interruptor::make_interruptible (
91+ shard_services.get_store ().do_transaction (coll, std::move (txn))
92+ ).then_interruptible ([FNAME, this ,
93+ peers=pending_txn->second .weak_from_this ()] {
94+ if (!peers) {
95+ // for now, only actingset_changed can cause peers
96+ // to be nullptr
97+ ERRORDPP (" peers is null, this should be impossible" , dpp);
98+ assert (0 == " impossible" );
99+ }
100+ if (--peers->pending == 0 ) {
101+ peers->all_committed .set_value ();
102+ peers->all_committed = {};
103+ return seastar::now ();
104+ }
105+ return peers->all_committed .get_shared_future ();
106+ }).then_interruptible ([pending_txn, this ] {
107+ auto acked_peers = std::move (pending_txn->second .acked_peers );
108+ pending_trans.erase (pending_txn);
109+ return seastar::make_ready_future<crimson::osd::acked_peers_t >(std::move (acked_peers));
110+ });
111+
97112 auto sends_complete = seastar::when_all_succeed (
98113 sends->begin (), sends->end ()
99114 ).finally ([sends=std::move (sends)] {});
0 commit comments