Skip to content

Commit 249a57d

Browse files
authored
Merge pull request ceph#45794 from Matan-B/wip-matanb-c-submit_txn
crimson/osd: submit_transaction() refactoring Reviewed-by: Samuel Just <[email protected]>
2 parents 514eb89 + 48bf0be commit 249a57d

File tree

4 files changed

+56
-53
lines changed

4 files changed

+56
-53
lines changed

src/crimson/osd/ops_executer.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,39 @@ OpsExecuter::do_execute_op(OSDOp& osd_op)
679679
}
680680
}
681681

682+
void OpsExecuter::fill_op_params_bump_pg_version()
683+
{
684+
osd_op_params->req_id = msg->get_reqid();
685+
osd_op_params->mtime = msg->get_mtime();
686+
osd_op_params->at_version = pg->next_version();
687+
osd_op_params->pg_trim_to = pg->get_pg_trim_to();
688+
osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
689+
osd_op_params->last_complete = pg->get_info().last_complete;
690+
if (user_modify) {
691+
osd_op_params->user_at_version = osd_op_params->at_version.version;
692+
}
693+
}
694+
695+
std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
696+
const std::vector<OSDOp>& ops)
697+
{
698+
std::vector<pg_log_entry_t> log_entries;
699+
log_entries.emplace_back(obc->obs.exists ?
700+
pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
701+
obc->obs.oi.soid, osd_op_params->at_version, obc->obs.oi.version,
702+
osd_op_params->user_modify ? osd_op_params->at_version.version : 0,
703+
osd_op_params->req_id, osd_op_params->mtime,
704+
op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
705+
if (op_info.allows_returnvec()) {
706+
// also the per-op values are recorded in the pg log
707+
log_entries.back().set_op_returns(ops);
708+
logger().debug("{} op_returns: {}",
709+
__func__, log_entries.back().op_returns);
710+
}
711+
log_entries.back().clean_regions = std::move(osd_op_params->clean_regions);
712+
return log_entries;
713+
}
714+
682715
// Defined here because there is a circular dependency between OpsExecuter and PG
683716
uint32_t OpsExecuter::get_pool_stripe_width() const {
684717
return pg->get_pool().info.get_stripe_width();

src/crimson/osd/ops_executer.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,11 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
260260
using rep_op_fut_t =
261261
interruptible_future<rep_op_fut_tuple>;
262262
template <typename MutFunc>
263-
rep_op_fut_t flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&;
263+
rep_op_fut_t flush_changes_n_do_ops_effects(const std::vector<OSDOp>& ops,
264+
MutFunc&& mut_func) &&;
265+
std::vector<pg_log_entry_t> prepare_transaction(
266+
const std::vector<OSDOp>& ops);
267+
void fill_op_params_bump_pg_version();
264268

265269
const hobject_t &get_target() const {
266270
return obc->obs.oi.soid;
@@ -326,7 +330,9 @@ auto OpsExecuter::with_effect_on_obc(
326330

327331
template <typename MutFunc>
328332
OpsExecuter::rep_op_fut_t
329-
OpsExecuter::flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&
333+
OpsExecuter::flush_changes_n_do_ops_effects(
334+
const std::vector<OSDOp>& ops,
335+
MutFunc&& mut_func) &&
330336
{
331337
const bool want_mutate = !txn.empty();
332338
// osd_op_params are instantiated by every wr-like operation.
@@ -337,12 +343,12 @@ OpsExecuter::flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&
337343
seastar::now(),
338344
interruptor::make_interruptible(osd_op_errorator::now()));
339345
if (want_mutate) {
340-
osd_op_params->req_id = msg->get_reqid();
341-
osd_op_params->mtime = msg->get_mtime();
346+
fill_op_params_bump_pg_version();
347+
auto log_entries = prepare_transaction(ops);
342348
auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
343349
std::move(obc),
344350
std::move(*osd_op_params),
345-
user_modify);
351+
std::move(log_entries));
346352
maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
347353
std::move(submitted),
348354
osd_op_ierrorator::future<>(std::move(all_completed)));

src/crimson/osd/pg.cc

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -571,11 +571,10 @@ seastar::future<> PG::WaitForActiveBlocker::stop()
571571
std::tuple<PG::interruptible_future<>,
572572
PG::interruptible_future<>>
573573
PG::submit_transaction(
574-
const OpInfo& op_info,
575-
const std::vector<OSDOp>& ops,
576574
ObjectContextRef&& obc,
577575
ceph::os::Transaction&& txn,
578-
osd_op_params_t&& osd_op_p)
576+
osd_op_params_t&& osd_op_p,
577+
std::vector<pg_log_entry_t>&& log_entries)
579578
{
580579
if (__builtin_expect(stopping, false)) {
581580
return {seastar::make_exception_future<>(
@@ -589,21 +588,6 @@ PG::submit_transaction(
589588
throw crimson::common::actingset_changed(is_primary());
590589
}
591590

592-
std::vector<pg_log_entry_t> log_entries;
593-
log_entries.emplace_back(obc->obs.exists ?
594-
pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
595-
obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
596-
osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
597-
osd_op_p.req_id, osd_op_p.mtime,
598-
op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
599-
// TODO: refactor the submit_transaction
600-
if (op_info.allows_returnvec()) {
601-
// also the per-op values are recorded in the pg log
602-
log_entries.back().set_op_returns(ops);
603-
logger().debug("{} op_returns: {}",
604-
__func__, log_entries.back().op_returns);
605-
}
606-
log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
607591
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
608592
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
609593
txn, true, false);
@@ -628,19 +612,6 @@ PG::submit_transaction(
628612
}));
629613
}
630614

631-
void PG::fill_op_params_bump_pg_version(
632-
osd_op_params_t& osd_op_p,
633-
const bool user_modify)
634-
{
635-
osd_op_p.at_version = next_version();
636-
osd_op_p.pg_trim_to = get_pg_trim_to();
637-
osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
638-
osd_op_p.last_complete = get_info().last_complete;
639-
if (user_modify) {
640-
osd_op_p.user_at_version = osd_op_p.at_version.version;
641-
}
642-
}
643-
644615
PG::interruptible_future<> PG::repair_object(
645616
const hobject_t& oid,
646617
eversion_t& v)
@@ -661,7 +632,6 @@ PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
661632
PG::do_osd_ops_execute(
662633
seastar::lw_shared_ptr<OpsExecuter> ox,
663634
std::vector<OSDOp>& ops,
664-
const OpInfo &op_info,
665635
SuccessFunc&& success_func,
666636
FailureFunc&& failure_func)
667637
{
@@ -677,26 +647,24 @@ PG::do_osd_ops_execute(
677647
ox->get_target(),
678648
ceph_osd_op_name(osd_op.op.op));
679649
return ox->execute_op(osd_op);
680-
}).safe_then_interruptible([this, ox, &op_info, &ops] {
650+
}).safe_then_interruptible([this, ox, &ops] {
681651
logger().debug(
682652
"do_osd_ops_execute: object {} all operations successful",
683653
ox->get_target());
684654
peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
685-
return std::move(*ox).flush_changes_n_do_ops_effects(
686-
[this, &op_info, &ops] (auto&& txn,
687-
auto&& obc,
688-
auto&& osd_op_p,
689-
bool user_modify) {
655+
return std::move(*ox).flush_changes_n_do_ops_effects(ops,
656+
[this] (auto&& txn,
657+
auto&& obc,
658+
auto&& osd_op_p,
659+
auto&& log_entries) {
690660
logger().debug(
691661
"do_osd_ops_execute: object {} submitting txn",
692662
obc->get_oid());
693-
fill_op_params_bump_pg_version(osd_op_p, user_modify);
694663
return submit_transaction(
695-
op_info,
696-
ops,
697664
std::move(obc),
698665
std::move(txn),
699-
std::move(osd_op_p));
666+
std::move(osd_op_p),
667+
std::move(log_entries));
700668
});
701669
}).safe_then_unpack_interruptible(
702670
[success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
@@ -752,7 +720,6 @@ PG::do_osd_ops(
752720
seastar::make_lw_shared<OpsExecuter>(
753721
Ref<PG>{this}, obc, op_info, *m),
754722
m->ops,
755-
op_info,
756723
[this, m, obc, may_write = op_info.may_write(),
757724
may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
758725
// TODO: should stop at the first op which returns a negative retval,
@@ -820,7 +787,6 @@ PG::do_osd_ops(
820787
seastar::make_lw_shared<OpsExecuter>(
821788
Ref<PG>{this}, std::move(obc), op_info, msg_params),
822789
ops,
823-
std::as_const(op_info),
824790
std::move(success_func),
825791
std::move(failure_func));
826792
});

src/crimson/osd/pg.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -599,17 +599,15 @@ class PG : public boost::intrusive_ref_counter<
599599
do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
600600
seastar::lw_shared_ptr<OpsExecuter> ox,
601601
std::vector<OSDOp>& ops,
602-
const OpInfo &op_info,
603602
SuccessFunc&& success_func,
604603
FailureFunc&& failure_func);
605604
interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
606605
std::tuple<interruptible_future<>, interruptible_future<>>
607606
submit_transaction(
608-
const OpInfo& op_info,
609-
const std::vector<OSDOp>& ops,
610607
ObjectContextRef&& obc,
611608
ceph::os::Transaction&& txn,
612-
osd_op_params_t&& oop);
609+
osd_op_params_t&& oop,
610+
std::vector<pg_log_entry_t>&& log_entries);
613611
interruptible_future<> repair_object(
614612
const hobject_t& oid,
615613
eversion_t& v);

0 commit comments

Comments
 (0)