Skip to content

Commit 5816f0d

Browse files
authored
Merge pull request ceph#63061 from Matan-B/wip-71137-tentacle
tentacle: crimson: osd_operation cleanups and fix for MOSDRepOpReply ordering Reviewed-by: Aishwarya Mathuria <[email protected]>
2 parents 48861ec + d4ce540 commit 5816f0d

28 files changed

+335
-400
lines changed

src/crimson/osd/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ add_executable(crimson-osd
1919
ops_executer.cc
2020
osd_operation.cc
2121
osd_operations/client_request.cc
22-
osd_operations/client_request_common.cc
2322
osd_operations/internal_client_request.cc
2423
osd_operations/peering_event.cc
2524
osd_operations/pg_advance_map.cc
2625
osd_operations/replicated_request.cc
26+
osd_operations/replicated_request_reply.cc
2727
osd_operations/logmissing_request.cc
2828
osd_operations/logmissing_request_reply.cc
2929
osd_operations/background_recovery.cc

src/crimson/osd/osd.cc

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include "crimson/osd/osd_operations/pg_advance_map.h"
5959
#include "crimson/osd/osd_operations/recovery_subrequest.h"
6060
#include "crimson/osd/osd_operations/replicated_request.h"
61+
#include "crimson/osd/osd_operations/replicated_request_reply.h"
6162
#include "crimson/osd/osd_operations/scrub_events.h"
6263
#include "crimson/osd/osd_operation_external_tracking.h"
6364
#include "crimson/crush/CrushLocation.h"
@@ -1401,56 +1402,45 @@ seastar::future<> OSD::handle_update_log_missing(
14011402
crimson::net::ConnectionRef conn,
14021403
Ref<MOSDPGUpdateLogMissing> m)
14031404
{
1404-
return pg_shard_manager.start_pg_operation<LogMissingRequest>(
1405+
return pg_shard_manager.start_pg_operation_active<LogMissingRequest>(
14051406
std::move(conn),
1406-
std::move(m)).second;
1407+
std::move(m));
14071408
}
14081409

14091410
seastar::future<> OSD::handle_update_log_missing_reply(
14101411
crimson::net::ConnectionRef conn,
14111412
Ref<MOSDPGUpdateLogMissingReply> m)
14121413
{
1413-
return pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
1414+
return pg_shard_manager.start_pg_operation_active<LogMissingRequestReply>(
14141415
std::move(conn),
1415-
std::move(m)).second;
1416+
std::move(m));
14161417
}
14171418

14181419
seastar::future<> OSD::handle_pg_pct(
14191420
crimson::net::ConnectionRef conn,
14201421
Ref<MOSDPGPCT> m)
14211422
{
1422-
return pg_shard_manager.start_pg_operation<PGPCTRequest>(
1423+
return pg_shard_manager.start_pg_operation_active<PGPCTRequest>(
14231424
std::move(conn),
1424-
std::move(m)).second;
1425+
std::move(m));
14251426
}
14261427

14271428
seastar::future<> OSD::handle_rep_op(
14281429
crimson::net::ConnectionRef conn,
14291430
Ref<MOSDRepOp> m)
14301431
{
1431-
m->finish_decode();
1432-
return pg_shard_manager.start_pg_operation<RepRequest>(
1432+
return pg_shard_manager.start_pg_operation_active<RepRequest>(
14331433
std::move(conn),
1434-
std::move(m)).second;
1434+
std::move(m));
14351435
}
14361436

14371437
seastar::future<> OSD::handle_rep_op_reply(
14381438
crimson::net::ConnectionRef conn,
14391439
Ref<MOSDRepOpReply> m)
14401440
{
1441-
LOG_PREFIX(OSD::handle_rep_op_reply);
1442-
spg_t pgid = m->get_spg();
1443-
return pg_shard_manager.with_pg(
1444-
pgid,
1445-
[FNAME, m=std::move(m)](auto &&pg) {
1446-
if (pg) {
1447-
m->finish_decode();
1448-
pg->handle_rep_op_reply(*m);
1449-
} else {
1450-
WARN("stale reply: {}", *m);
1451-
}
1452-
return seastar::now();
1453-
});
1441+
return pg_shard_manager.start_pg_operation_active<ReplicatedRequestReply>(
1442+
std::move(conn),
1443+
std::move(m));
14541444
}
14551445

14561446
seastar::future<> OSD::handle_scrub_command(
@@ -1474,9 +1464,9 @@ seastar::future<> OSD::handle_scrub_message(
14741464
crimson::net::ConnectionRef conn,
14751465
Ref<MOSDFastDispatchOp> m)
14761466
{
1477-
return pg_shard_manager.start_pg_operation<
1467+
return pg_shard_manager.start_pg_operation_active<
14781468
crimson::osd::ScrubMessage
1479-
>(m, conn, m->get_min_epoch(), m->get_spg()).second;
1469+
>(m, conn, m->get_min_epoch(), m->get_spg());
14801470
}
14811471

14821472
seastar::future<> OSD::handle_mark_me_down(
@@ -1494,8 +1484,8 @@ seastar::future<> OSD::handle_recovery_subreq(
14941484
crimson::net::ConnectionRef conn,
14951485
Ref<MOSDFastDispatchOp> m)
14961486
{
1497-
return pg_shard_manager.start_pg_operation<RecoverySubRequest>(
1498-
conn, std::move(m)).second;
1487+
return pg_shard_manager.start_pg_operation_active<RecoverySubRequest>(
1488+
conn, std::move(m));
14991489
}
15001490

15011491
vector<DaemonHealthMetric> OSD::get_health_metrics()

src/crimson/osd/osd_operation.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#pragma once
55

66
#include "crimson/common/operation.h"
7+
#include "crimson/net/Connection.h"
78
#include "crimson/osd/pg_interval_interrupt_condition.h"
89
#include "crimson/osd/scheduler/scheduler.h"
910
#include "osd/osd_types.h"
@@ -89,6 +90,7 @@ enum class OperationTypeCode {
8990
pg_advance_map,
9091
pg_creation,
9192
replicated_request,
93+
replicated_request_reply,
9294
background_recovery,
9395
background_recovery_sub,
9496
internal_client_request,
@@ -113,6 +115,7 @@ static constexpr const char* const OP_NAMES[] = {
113115
"pg_advance_map",
114116
"pg_creation",
115117
"replicated_request",
118+
"replicated_request_reply",
116119
"background_recovery",
117120
"background_recovery_sub",
118121
"internal_client_request",
@@ -165,6 +168,61 @@ struct OperationT : InterruptibleOperation {
165168
virtual void dump_detail(ceph::Formatter *f) const = 0;
166169
};
167170

171+
class RemoteOperation {
172+
crimson::net::ConnectionRef l_conn;
173+
crimson::net::ConnectionXcoreRef r_conn;
174+
175+
public:
176+
RemoteOperation(crimson::net::ConnectionRef &&conn)
177+
: l_conn(std::move(conn)) {}
178+
179+
crimson::net::Connection &get_local_connection() {
180+
assert(l_conn);
181+
assert(!r_conn);
182+
return *l_conn;
183+
};
184+
185+
crimson::net::Connection &get_foreign_connection() {
186+
assert(r_conn);
187+
assert(!l_conn);
188+
return *r_conn;
189+
};
190+
191+
crimson::net::ConnectionFFRef prepare_remote_submission() {
192+
assert(l_conn);
193+
assert(!r_conn);
194+
auto ret = seastar::make_foreign(std::move(l_conn));
195+
l_conn.reset();
196+
return ret;
197+
}
198+
199+
void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
200+
assert(conn);
201+
assert(!l_conn);
202+
assert(!r_conn);
203+
r_conn = make_local_shared_foreign(std::move(conn));
204+
}
205+
206+
crimson::net::Connection &get_connection() const {
207+
if (l_conn) {
208+
return *l_conn;
209+
} else {
210+
assert(r_conn);
211+
return *r_conn;
212+
}
213+
}
214+
215+
/**
216+
* get_remote_connection
217+
*
218+
* Return a reference to the remote connection to allow caller to
219+
* perform a copy only as needed.
220+
*/
221+
crimson::net::ConnectionXcoreRef &get_remote_connection() {
222+
return r_conn;
223+
}
224+
};
225+
168226
template <class T>
169227
class TrackableOperationT : public OperationT<T> {
170228
T* that() {

src/crimson/osd/osd_operations/client_request.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ void ClientRequest::complete_request(PG &pg)
6060
ClientRequest::ClientRequest(
6161
ShardServices &_shard_services, crimson::net::ConnectionRef conn,
6262
Ref<MOSDOp> &&m)
63-
: shard_services(&_shard_services),
64-
l_conn(std::move(conn)),
63+
: RemoteOperation(std::move(conn)),
64+
shard_services(&_shard_services),
6565
m(std::move(m)),
6666
begin_time(std::chrono::steady_clock::now()),
6767
instance_handle(new instance_handle_t)
@@ -321,7 +321,7 @@ ClientRequest::recover_missing_snaps(
321321
co_await std::move(resolve_oids);
322322

323323
for (auto &oid : ret) {
324-
auto unfound = co_await do_recover_missing(pg, oid, m->get_reqid());
324+
auto unfound = co_await pg->do_recover_missing(oid, m->get_reqid());
325325
if (unfound) {
326326
DEBUGDPP("{} unfound, hang it for now", *pg, oid);
327327
co_await interruptor::make_interruptible(
@@ -347,8 +347,8 @@ ClientRequest::process_op(
347347
"Skipping recover_missings on non primary pg for soid {}",
348348
*pg, m->get_hobj());
349349
} else {
350-
auto unfound = co_await do_recover_missing(
351-
pg, m->get_hobj().get_head(), m->get_reqid());
350+
auto unfound = co_await pg->do_recover_missing(
351+
m->get_hobj().get_head(), m->get_reqid());
352352
if (unfound) {
353353
DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head());
354354
co_await interruptor::make_interruptible(
@@ -486,7 +486,7 @@ ClientRequest::do_process(
486486
co_return;
487487
}
488488

489-
OpsExecuter ox(pg, obc, op_info, *m, r_conn, snapc);
489+
OpsExecuter ox(pg, obc, op_info, *m, get_remote_connection(), snapc);
490490
auto ret = co_await pg->run_executer(
491491
ox, obc, op_info, m->ops
492492
).si_then([]() -> std::optional<std::error_code> {

src/crimson/osd/osd_operations/client_request.h

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#include "crimson/osd/object_context_loader.h"
1616
#include "crimson/osd/osdmap_gate.h"
1717
#include "crimson/osd/osd_operation.h"
18-
#include "crimson/osd/osd_operations/client_request_common.h"
1918
#include "crimson/osd/pg_activation_blocker.h"
2019
#include "crimson/osd/pg_map.h"
2120
#include "crimson/osd/scrub/pg_scrubber.h"
@@ -28,14 +27,13 @@ class PG;
2827
class OSD;
2928
class ShardServices;
3029

31-
class ClientRequest final : public PhasedOperationT<ClientRequest>,
32-
private CommonClientRequest {
30+
class ClientRequest final
31+
: public PhasedOperationT<ClientRequest>,
32+
public RemoteOperation
33+
{
3334
// Initially set to primary core, updated to pg core after with_pg()
3435
ShardServices *shard_services = nullptr;
3536

36-
crimson::net::ConnectionRef l_conn;
37-
crimson::net::ConnectionXcoreRef r_conn;
38-
3937
// must be after conn due to ConnectionPipeline's life-time
4038
Ref<MOSDOp> m;
4139
OpInfo op_info;
@@ -56,9 +54,6 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
5654
static_assert(std::is_same_v<T, MOSDOp>);
5755
return m.get();
5856
}
59-
const crimson::net::ConnectionRef &get_connection() const {
60-
return l_conn;
61-
}
6257

6358
/**
6459
* instance_handle_t
@@ -233,33 +228,6 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
233228

234229
PerShardPipeline &get_pershard_pipeline(ShardServices &);
235230

236-
crimson::net::Connection &get_local_connection() {
237-
assert(l_conn);
238-
assert(!r_conn);
239-
return *l_conn;
240-
};
241-
242-
crimson::net::Connection &get_foreign_connection() {
243-
assert(r_conn);
244-
assert(!l_conn);
245-
return *r_conn;
246-
};
247-
248-
crimson::net::ConnectionFFRef prepare_remote_submission() {
249-
assert(l_conn);
250-
assert(!r_conn);
251-
auto ret = seastar::make_foreign(std::move(l_conn));
252-
l_conn.reset();
253-
return ret;
254-
}
255-
256-
void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
257-
assert(conn);
258-
assert(!l_conn);
259-
assert(!r_conn);
260-
r_conn = make_local_shared_foreign(std::move(conn));
261-
}
262-
263231
interruptible_future<> with_pg_process_interruptible(
264232
Ref<PG> pgref, const unsigned instance_id, instance_handle_t &ihref);
265233

src/crimson/osd/osd_operations/client_request_common.cc

Lines changed: 0 additions & 74 deletions
This file was deleted.

0 commit comments

Comments
 (0)