@@ -36,6 +36,42 @@ ReplicatedBackend::_read(const hobject_t& hoid,
3636 return store->read (coll, ghobject_t {hoid}, off, len, flags);
3737}
3838
39+ MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg (
40+ const pg_shard_t &pg_shard,
41+ const hobject_t &hoid,
42+ const bufferlist &encoded_txn,
43+ const osd_op_params_t &osd_op_p,
44+ epoch_t min_epoch,
45+ epoch_t map_epoch,
46+ const std::vector<pg_log_entry_t > &log_entries,
47+ ceph_tid_t tid)
48+ {
49+ ceph_assert (pg_shard != whoami);
50+ auto m = crimson::make_message<MOSDRepOp>(
51+ osd_op_p.req_id ,
52+ whoami,
53+ spg_t {pgid, pg_shard.shard },
54+ hoid,
55+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
56+ map_epoch,
57+ min_epoch,
58+ tid,
59+ osd_op_p.at_version );
60+ if (pg.should_send_op (pg_shard, hoid)) {
61+ m->set_data (encoded_txn);
62+ } else {
63+ ceph::os::Transaction t;
64+ bufferlist bl;
65+ encode (t, bl);
66+ m->set_data (bl);
67+ }
68+ encode (log_entries, m->logbl );
69+ m->pg_trim_to = osd_op_p.pg_trim_to ;
70+ m->pg_committed_to = osd_op_p.pg_committed_to ;
71+ m->pg_stats = pg.get_info ().stats ;
72+ return m;
73+ }
74+
3975ReplicatedBackend::rep_op_fut_t
4076ReplicatedBackend::submit_transaction (const std::set<pg_shard_t >& pg_shards,
4177 const hobject_t & hoid,
@@ -63,29 +99,10 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
6399 auto sends = std::make_unique<std::vector<seastar::future<>>>();
64100 for (auto pg_shard : pg_shards) {
65101 if (pg_shard != whoami) {
66- auto m = crimson::make_message<MOSDRepOp>(
67- osd_op_p.req_id ,
68- whoami,
69- spg_t {pgid, pg_shard.shard },
70- hoid,
71- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
72- map_epoch,
73- min_epoch,
74- tid,
75- osd_op_p.at_version );
76- if (pg.should_send_op (pg_shard, hoid)) {
77- m->set_data (encoded_txn);
78- } else {
79- ceph::os::Transaction t;
80- bufferlist bl;
81- encode (t, bl);
82- m->set_data (bl);
83- }
102+ auto m = new_repop_msg (
103+ pg_shard, hoid, encoded_txn, osd_op_p,
104+ min_epoch, map_epoch, log_entries, tid);
84105 pending_txn->second .acked_peers .push_back ({pg_shard, eversion_t {}});
85- encode (log_entries, m->logbl );
86- m->pg_trim_to = osd_op_p.pg_trim_to ;
87- m->pg_committed_to = osd_op_p.pg_committed_to ;
88- m->pg_stats = pg.get_info ().stats ;
89106 // TODO: set more stuff. e.g., pg_states
90107 sends->emplace_back (
91108 shard_services.send_to_osd (
0 commit comments