Skip to content

Commit 7d29ca7

Browse files
authored
Merge pull request ceph#61086 from athanatos/sjust/wip-rep-pipeline
crimson: allow replica side write commits to pipeline Reviewed-by: Xuehan Xu <[email protected]>
2 parents 3ec1e93 + 68612d1 commit 7d29ca7

File tree

6 files changed

+96
-30
lines changed

6 files changed

+96
-30
lines changed

src/crimson/osd/osd_operation.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ struct PGRepopPipeline {
6262
struct Process : OrderedExclusivePhaseT<Process> {
6363
static constexpr auto type_name = "PGRepopPipeline::process";
6464
} process;
65+
struct WaitCommit : OrderedConcurrentPhaseT<WaitCommit> {
66+
static constexpr auto type_name = "PGRepopPipeline::wait_repop";
67+
} wait_commit;
68+
struct SendReply : OrderedExclusivePhaseT<SendReply> {
69+
static constexpr auto type_name = "PGRepopPipeline::send_reply";
70+
} send_reply;
6571
};
6672

6773
struct CommonOBCPipeline {

src/crimson/osd/osd_operation_external_tracking.h

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ struct LttngBackend
3838
CommonOBCPipeline::WaitRepop::BlockingEvent::Backend,
3939
CommonOBCPipeline::WaitRepop::BlockingEvent::ExitBarrierEvent::Backend,
4040
CommonOBCPipeline::SendReply::BlockingEvent::Backend,
41-
PGRepopPipeline::Process::BlockingEvent::Backend
41+
PGRepopPipeline::Process::BlockingEvent::Backend,
42+
PGRepopPipeline::WaitCommit::BlockingEvent::Backend,
43+
PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent::Backend,
44+
PGRepopPipeline::SendReply::BlockingEvent::Backend
4245
{
4346
void handle(ClientRequest::StartEvent&,
4447
const Operation&) override {}
@@ -126,6 +129,20 @@ struct LttngBackend
126129
const PGRepopPipeline::Process& blocker) override {
127130
}
128131

132+
void handle(PGRepopPipeline::WaitCommit::BlockingEvent& ev,
133+
const Operation& op,
134+
const PGRepopPipeline::WaitCommit& blocker) override {
135+
}
136+
137+
void handle(PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent& ev,
138+
const Operation& op) override {
139+
}
140+
141+
void handle(PGRepopPipeline::SendReply::BlockingEvent& ev,
142+
const Operation& op,
143+
const PGRepopPipeline::SendReply& blocker) override {
144+
}
145+
129146
void handle(ClientRequest::CompletionEvent&,
130147
const Operation&) override {}
131148

@@ -150,7 +167,10 @@ struct HistoricBackend
150167
CommonOBCPipeline::WaitRepop::BlockingEvent::Backend,
151168
CommonOBCPipeline::WaitRepop::BlockingEvent::ExitBarrierEvent::Backend,
152169
CommonOBCPipeline::SendReply::BlockingEvent::Backend,
153-
PGRepopPipeline::Process::BlockingEvent::Backend
170+
PGRepopPipeline::Process::BlockingEvent::Backend,
171+
PGRepopPipeline::WaitCommit::BlockingEvent::Backend,
172+
PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent::Backend,
173+
PGRepopPipeline::SendReply::BlockingEvent::Backend
154174
{
155175
void handle(ClientRequest::StartEvent&,
156176
const Operation&) override {}
@@ -246,6 +266,21 @@ struct HistoricBackend
246266
const PGRepopPipeline::Process& blocker) override {
247267
}
248268

269+
void handle(PGRepopPipeline::WaitCommit::BlockingEvent& ev,
270+
const Operation& op,
271+
const PGRepopPipeline::WaitCommit& blocker) override {
272+
}
273+
274+
void handle(PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent& ev,
275+
const Operation& op) override {
276+
}
277+
278+
void handle(PGRepopPipeline::SendReply::BlockingEvent& ev,
279+
const Operation& op,
280+
const PGRepopPipeline::SendReply& blocker) override {
281+
}
282+
283+
249284
void handle(ClientRequest::CompletionEvent&, const Operation& op) override {
250285
if (crimson::common::local_conf()->osd_op_history_size) {
251286
to_client_request(op).put_historic();

src/crimson/osd/osd_operations/replicated_request.cc

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "common/Formatter.h"
77

8+
#include "crimson/common/coroutine.h"
89
#include "crimson/osd/osd.h"
910
#include "crimson/osd/osd_connection_priv.h"
1011
#include "crimson/osd/osd_operation_external_tracking.h"
@@ -63,34 +64,52 @@ PGRepopPipeline &RepRequest::repop_pipeline(PG &pg)
6364
return pg.repop_pipeline;
6465
}
6566

67+
RepRequest::interruptible_future<> RepRequest::with_pg_interruptible(
68+
Ref<PG> pg)
69+
{
70+
LOG_PREFIX(RepRequest::with_pg_interruptible);
71+
DEBUGI("{}", *this);
72+
co_await this->template enter_stage<interruptor>(repop_pipeline(*pg).process);
73+
co_await interruptor::make_interruptible(this->template with_blocking_event<
74+
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
75+
>([this, pg](auto &&trigger) {
76+
return pg->osdmap_gate.wait_for_map(
77+
std::move(trigger), req->min_epoch);
78+
}));
79+
80+
if (pg->can_discard_replica_op(*req)) {
81+
co_return;
82+
}
83+
84+
auto [commit_fut, reply] = co_await pg->handle_rep_op(req);
85+
86+
// Transitions from OrderedExclusive->OrderedConcurrent cannot block
87+
this->template enter_stage_sync(repop_pipeline(*pg).wait_commit);
88+
89+
co_await std::move(commit_fut);
90+
91+
co_await this->template enter_stage<interruptor>(
92+
repop_pipeline(*pg).send_reply);
93+
94+
co_await interruptor::make_interruptible(
95+
pg->shard_services.send_to_osd(
96+
req->from.osd, std::move(reply), pg->get_osdmap_epoch())
97+
);
98+
}
99+
66100
seastar::future<> RepRequest::with_pg(
67101
ShardServices &shard_services, Ref<PG> pg)
68102
{
69103
LOG_PREFIX(RepRequest::with_pg);
70-
DEBUGI("{}: RepRequest::with_pg", *this);
104+
DEBUGI("{}", *this);
71105
IRef ref = this;
72106
return interruptor::with_interruption([this, pg] {
73-
LOG_PREFIX(RepRequest::with_pg);
74-
DEBUGI("{}: pg present", *this);
75-
return this->template enter_stage<interruptor>(repop_pipeline(*pg).process
76-
).then_interruptible([this, pg] {
77-
return this->template with_blocking_event<
78-
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
79-
>([this, pg](auto &&trigger) {
80-
return pg->osdmap_gate.wait_for_map(
81-
std::move(trigger), req->min_epoch);
82-
});
83-
}).then_interruptible([this, pg] (auto) {
84-
return pg->handle_rep_op(req);
85-
}).then_interruptible([this] {
86-
logger().debug("{}: complete", *this);
87-
return handle.complete();
88-
});
107+
return with_pg_interruptible(pg);
89108
}, [](std::exception_ptr) {
90109
return seastar::now();
91110
}, pg, pg->get_osdmap_epoch()).finally([this, ref=std::move(ref)] {
92111
logger().debug("{}: exit", *this);
93-
handle.exit();
112+
return handle.complete();
94113
});
95114
}
96115

src/crimson/osd/osd_operations/replicated_request.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class RepRequest final : public PhasedOperationT<RepRequest> {
6868
r_conn = make_local_shared_foreign(std::move(conn));
6969
}
7070

71+
interruptible_future<> with_pg_interruptible(
72+
Ref<PG> pg);
73+
7174
seastar::future<> with_pg(
7275
ShardServices &shard_services, Ref<PG> pg);
7376

@@ -78,6 +81,8 @@ class RepRequest final : public PhasedOperationT<RepRequest> {
7881
ConnectionPipeline::GetPGMapping::BlockingEvent,
7982
PerShardPipeline::CreateOrWaitPG::BlockingEvent,
8083
PGRepopPipeline::Process::BlockingEvent,
84+
PGRepopPipeline::WaitCommit::BlockingEvent,
85+
PGRepopPipeline::SendReply::BlockingEvent,
8186
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
8287
PGMap::PGCreationBlockingEvent,
8388
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent

src/crimson/osd/pg.cc

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,13 +1215,10 @@ void PG::update_stats(const pg_stat_t &stat) {
12151215
);
12161216
}
12171217

1218-
PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
1218+
PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
12191219
{
12201220
LOG_PREFIX(PG::handle_rep_op);
12211221
DEBUGDPP("{}", *this, *req);
1222-
if (can_discard_replica_op(*req)) {
1223-
co_return;
1224-
}
12251222

12261223
ceph::os::Transaction txn;
12271224
auto encoded_txn = req->get_data().cbegin();
@@ -1243,7 +1240,8 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
12431240
txn,
12441241
false);
12451242
DEBUGDPP("{} do_transaction", *this, *req);
1246-
co_await interruptor::make_interruptible(
1243+
1244+
auto commit_fut = interruptor::make_interruptible(
12471245
shard_services.get_store().do_transaction(coll_ref, std::move(txn))
12481246
);
12491247

@@ -1254,10 +1252,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
12541252
req.get(), pg_whoami, 0,
12551253
map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
12561254
reply->set_last_complete_ondisk(lcod);
1257-
co_await interruptor::make_interruptible(
1258-
shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
1259-
);
1260-
co_return;
1255+
co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
12611256
}
12621257

12631258
PG::interruptible_future<> PG::update_snap_map(

src/crimson/osd/pg.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,13 @@ class PG : public boost::intrusive_ref_counter<
596596
using with_obc_func_t =
597597
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;
598598

599-
interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
599+
using handle_rep_op_ret = std::tuple<
600+
interruptible_future<>, // resolves upon commit
601+
MURef<MOSDRepOpReply> // reply message
602+
>;
603+
// outer future resolves upon submission
604+
using handle_rep_op_fut = interruptible_future<handle_rep_op_ret>;
605+
handle_rep_op_fut handle_rep_op(Ref<MOSDRepOp> m);
600606
void update_stats(const pg_stat_t &stat);
601607
interruptible_future<> update_snap_map(
602608
const std::vector<pg_log_entry_t> &log_entries,

0 commit comments

Comments
 (0)