Skip to content

Commit cd82bfc

Browse files
authored
Merge pull request ceph#62619 from athanatos/sjust/wip-replica-read-crimson-mosdpct
crimson: add MOSDPGPCT support Reviewed-by: Matan Breizman <[email protected]>
2 parents 60e7cb1 + 666cafc commit cd82bfc

File tree

12 files changed

+334
-3
lines changed

12 files changed

+334
-3
lines changed

src/crimson/osd/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ add_executable(crimson-osd
2828
osd_operations/logmissing_request_reply.cc
2929
osd_operations/background_recovery.cc
3030
osd_operations/recovery_subrequest.cc
31+
osd_operations/pgpct_request.cc
3132
osd_operations/snaptrim_event.cc
3233
osd_operations/scrub_events.cc
3334
pg_recovery.cc

src/crimson/osd/osd.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "messages/MOSDPGUpdateLogMissingReply.h"
2929
#include "messages/MOSDRepOpReply.h"
3030
#include "messages/MOSDScrub2.h"
31+
#include "messages/MOSDPGPCT.h"
3132
#include "messages/MPGStats.h"
3233

3334
#include "os/Transaction.h"
@@ -53,6 +54,7 @@
5354
#include "crimson/osd/pg_meta.h"
5455
#include "crimson/osd/osd_operations/client_request.h"
5556
#include "crimson/osd/osd_operations/peering_event.h"
57+
#include "crimson/osd/osd_operations/pgpct_request.h"
5658
#include "crimson/osd/osd_operations/pg_advance_map.h"
5759
#include "crimson/osd/osd_operations/recovery_subrequest.h"
5860
#include "crimson/osd/osd_operations/replicated_request.h"
@@ -994,6 +996,9 @@ OSD::do_ms_dispatch(
994996
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
995997
return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
996998
MOSDPGUpdateLogMissingReply>(m));
999+
case MSG_OSD_PG_PCT:
1000+
return handle_pg_pct(conn, boost::static_pointer_cast<
1001+
MOSDPGPCT>(m));
9971002
default:
9981003
return std::nullopt;
9991004
}
@@ -1413,6 +1418,15 @@ seastar::future<> OSD::handle_update_log_missing_reply(
14131418
std::move(m)).second;
14141419
}
14151420

1421+
seastar::future<> OSD::handle_pg_pct(
1422+
crimson::net::ConnectionRef conn,
1423+
Ref<MOSDPGPCT> m)
1424+
{
1425+
return pg_shard_manager.start_pg_operation<PGPCTRequest>(
1426+
std::move(conn),
1427+
std::move(m)).second;
1428+
}
1429+
14161430
seastar::future<> OSD::handle_rep_op(
14171431
crimson::net::ConnectionRef conn,
14181432
Ref<MOSDRepOp> m)

src/crimson/osd/osd.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
class MCommand;
3636
class MOSDMap;
37+
class MOSDPGPCT;
3738
class MOSDRepOpReply;
3839
class MOSDRepOp;
3940
class MOSDScrub2;
@@ -233,6 +234,9 @@ class OSD final : public crimson::net::Dispatcher,
233234
seastar::future<> handle_update_log_missing_reply(
234235
crimson::net::ConnectionRef conn,
235236
Ref<MOSDPGUpdateLogMissingReply> m);
237+
seastar::future<> handle_pg_pct(
238+
crimson::net::ConnectionRef conn,
239+
Ref<MOSDPGPCT> m);
236240

237241
std::vector<DaemonHealthMetric> get_health_metrics();
238242

src/crimson/osd/osd_operation.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ enum class OperationTypeCode {
103103
scrub_find_range,
104104
scrub_reserve_range,
105105
scrub_scan,
106+
pgpct_request,
106107
last_op
107108
};
108109

@@ -126,6 +127,7 @@ static constexpr const char* const OP_NAMES[] = {
126127
"scrub_find_range",
127128
"scrub_reserve_range",
128129
"scrub_scan",
130+
"pgpct_request",
129131
};
130132

131133
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:

src/crimson/osd/osd_operation_external_tracking.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "crimson/osd/osd_operations/background_recovery.h"
99
#include "crimson/osd/osd_operations/client_request.h"
1010
#include "crimson/osd/osd_operations/peering_event.h"
11+
#include "crimson/osd/osd_operations/pgpct_request.h"
1112
#include "crimson/osd/osd_operations/pg_advance_map.h"
1213
#include "crimson/osd/osd_operations/recovery_subrequest.h"
1314
#include "crimson/osd/osd_operations/replicated_request.h"
@@ -335,6 +336,14 @@ struct EventBackendRegistry<osd::LogMissingRequestReply> {
335336
}
336337
};
337338

339+
template <>
340+
struct EventBackendRegistry<osd::PGPCTRequest> {
341+
static std::tuple<> get_backends() {
342+
return {/* no extenral backends */};
343+
}
344+
};
345+
346+
338347
template <>
339348
struct EventBackendRegistry<osd::RecoverySubRequest> {
340349
static std::tuple<> get_backends() {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
4+
#include "logmissing_request.h"
5+
6+
#include "common/Formatter.h"
7+
8+
#include "crimson/osd/osd.h"
9+
#include "crimson/osd/osd_connection_priv.h"
10+
#include "crimson/osd/osd_operation_external_tracking.h"
11+
#include "crimson/osd/pg.h"
12+
#include "crimson/osd/replicated_backend.h"
13+
14+
SET_SUBSYS(osd);
15+
16+
namespace crimson::osd {
17+
18+
PGPCTRequest::PGPCTRequest(crimson::net::ConnectionRef&& conn,
19+
Ref<MOSDPGPCT> &&req)
20+
: l_conn{std::move(conn)},
21+
req{std::move(req)}
22+
{}
23+
24+
void PGPCTRequest::print(std::ostream& os) const
25+
{
26+
os << "PGPCTRequest("
27+
<< " req=" << *req
28+
<< ")";
29+
}
30+
31+
void PGPCTRequest::dump_detail(Formatter *f) const
32+
{
33+
f->open_object_section("PGPCTRequest");
34+
f->dump_stream("pgid") << req->get_spg();
35+
f->dump_unsigned("map_epoch", req->get_map_epoch());
36+
f->dump_unsigned("min_epoch", req->get_min_epoch());
37+
f->dump_stream("pg_committed_to") << req->pg_committed_to;
38+
f->close_section();
39+
}
40+
41+
ConnectionPipeline &PGPCTRequest::get_connection_pipeline()
42+
{
43+
return get_osd_priv(
44+
&get_local_connection()
45+
).client_request_conn_pipeline;
46+
}
47+
48+
PerShardPipeline &PGPCTRequest::get_pershard_pipeline(
49+
ShardServices &shard_services)
50+
{
51+
return shard_services.get_replicated_request_pipeline();
52+
}
53+
54+
PGPCTRequest::interruptible_future<> PGPCTRequest::with_pg_interruptible(
55+
PG &pg)
56+
{
57+
LOG_PREFIX(PGPCTRequest::with_pg_interruptible);
58+
DEBUGDPP("{}", pg, *this);
59+
co_await this->template enter_stage<interruptor>(pg.repop_pipeline.process);
60+
61+
{
62+
auto fut = this->template with_blocking_event<
63+
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
64+
>([this, &pg](auto &&trigger) {
65+
return pg.osdmap_gate.wait_for_map(
66+
std::move(trigger), req->min_epoch);
67+
});
68+
co_await interruptor::make_interruptible(std::move(fut));
69+
}
70+
71+
// This *must* be a replicated backend, ec doesn't have pct messages
72+
static_cast<ReplicatedBackend&>(*(pg.backend)).do_pct(*req);
73+
}
74+
75+
seastar::future<> PGPCTRequest::with_pg(
76+
ShardServices &shard_services, Ref<PG> pgref)
77+
{
78+
LOG_PREFIX(PGPCTRequest::with_pg);
79+
DEBUGDPP("{}", *pgref, *this);
80+
81+
PG &pg = *pgref;
82+
IRef ref = this;
83+
return interruptor::with_interruption([this, &pg] {
84+
return with_pg_interruptible(pg);
85+
}, [](std::exception_ptr) {
86+
return seastar::now();
87+
}, pgref, pgref->get_osdmap_epoch()).finally(
88+
[FNAME, this, ref=std::move(ref), pgref=std::move(pgref)]() mutable {
89+
DEBUGDPP("exit", *pgref, *this);
90+
return handle.complete(
91+
).then([ref=std::move(ref), pgref=std::move(pgref)] {});
92+
});
93+
}
94+
95+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
4+
#pragma once
5+
6+
#include "crimson/net/Connection.h"
7+
#include "crimson/osd/osdmap_gate.h"
8+
#include "crimson/osd/osd_operation.h"
9+
#include "crimson/osd/osd_operations/client_request.h"
10+
#include "crimson/osd/pg_map.h"
11+
#include "crimson/common/type_helpers.h"
12+
#include "messages/MOSDPGPCT.h"
13+
14+
namespace ceph {
15+
class Formatter;
16+
}
17+
18+
namespace crimson::osd {
19+
20+
class ShardServices;
21+
22+
class OSD;
23+
class PG;
24+
25+
class PGPCTRequest final : public PhasedOperationT<PGPCTRequest> {
26+
public:
27+
static constexpr OperationTypeCode type = OperationTypeCode::pgpct_request;
28+
PGPCTRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGPCT>&&);
29+
30+
void print(std::ostream &) const final;
31+
void dump_detail(ceph::Formatter* f) const final;
32+
33+
static constexpr bool can_create() { return false; }
34+
spg_t get_pgid() const {
35+
return req->get_spg();
36+
}
37+
PipelineHandle &get_handle() { return handle; }
38+
epoch_t get_epoch() const { return req->get_min_epoch(); }
39+
epoch_t get_epoch_sent_at() const { return req->get_map_epoch(); }
40+
41+
ConnectionPipeline &get_connection_pipeline();
42+
43+
PerShardPipeline &get_pershard_pipeline(ShardServices &);
44+
45+
crimson::net::Connection &get_local_connection() {
46+
assert(l_conn);
47+
assert(!r_conn);
48+
return *l_conn;
49+
};
50+
51+
crimson::net::Connection &get_foreign_connection() {
52+
assert(r_conn);
53+
assert(!l_conn);
54+
return *r_conn;
55+
};
56+
57+
crimson::net::ConnectionFFRef prepare_remote_submission() {
58+
assert(l_conn);
59+
assert(!r_conn);
60+
auto ret = seastar::make_foreign(std::move(l_conn));
61+
l_conn.reset();
62+
return ret;
63+
}
64+
65+
void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
66+
assert(conn);
67+
assert(!l_conn);
68+
assert(!r_conn);
69+
r_conn = make_local_shared_foreign(std::move(conn));
70+
}
71+
72+
seastar::future<> with_pg(
73+
ShardServices &shard_services, Ref<PG> pg);
74+
75+
interruptible_future<> with_pg_interruptible(
76+
PG& pg);
77+
78+
std::tuple<
79+
StartEvent,
80+
ConnectionPipeline::AwaitActive::BlockingEvent,
81+
ConnectionPipeline::AwaitMap::BlockingEvent,
82+
ConnectionPipeline::GetPGMapping::BlockingEvent,
83+
PerShardPipeline::CreateOrWaitPG::BlockingEvent,
84+
PGRepopPipeline::Process::BlockingEvent,
85+
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
86+
PGMap::PGCreationBlockingEvent,
87+
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
88+
> tracking_events;
89+
90+
private:
91+
crimson::net::ConnectionRef l_conn;
92+
crimson::net::ConnectionXcoreRef r_conn;
93+
94+
// must be after `conn` to ensure the ConnectionPipeline is alive
95+
PipelineHandle handle;
96+
Ref<MOSDPGPCT> req;
97+
};
98+
99+
}
100+
101+
#if FMT_VERSION >= 90000
102+
template <> struct fmt::formatter<crimson::osd::PGPCTRequest> : fmt::ostream_formatter {};
103+
#endif

src/crimson/osd/pg.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,7 @@ seastar::future<> PG::stop()
14601460
cancel_remote_recovery_reservation();
14611461
check_readable_timer.cancel();
14621462
renew_lease_timer.cancel();
1463+
backend->on_actingset_changed(false);
14631464
return osdmap_gate.stop().then([this] {
14641465
return wait_for_active_blocker.stop();
14651466
}).then([this] {

src/crimson/osd/pg.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "common/ostream_temp.h"
1414
#include "include/interval_set.h"
1515
#include "crimson/net/Fwd.h"
16+
#include "messages/MOSDPGPCT.h"
1617
#include "messages/MOSDRepOpReply.h"
1718
#include "messages/MOSDOpReply.h"
1819
#include "os/Transaction.h"
@@ -747,10 +748,11 @@ class PG : public boost::intrusive_ref_counter<
747748
std::unique_ptr<PGRecovery> recovery_handler;
748749
C_PG_FinishRecovery *recovery_finisher;
749750

750-
PeeringState peering_state;
751751
eversion_t projected_last_update;
752752

753753
public:
754+
PeeringState peering_state;
755+
754756
// scrub state
755757

756758
friend class ScrubScan;
@@ -924,6 +926,7 @@ class PG : public boost::intrusive_ref_counter<
924926
friend class RepRequest;
925927
friend class LogMissingRequest;
926928
friend class LogMissingRequestReply;
929+
friend class PGPCTRequest;
927930
friend struct PGFacade;
928931
friend class InternalClientRequest;
929932
friend class WatchTimeoutRequest;

0 commit comments

Comments
 (0)