Skip to content

Commit 205fd33

Browse files
committed
mds/quiesce: declare QuiesceDbPeerListing and QuiesceDbPeerAck
With these dedicated structs we can fully defer to QuiesceDbEncoding when encoding/decoding quiesce db messages Signed-off-by: Leonid Usov <[email protected]>
1 parent 7599257 commit 205fd33

File tree

9 files changed

+110
-56
lines changed

9 files changed

+110
-56
lines changed

src/include/cephfs/types.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ struct std::hash<mds_gid_t> {
5656
}
5757
};
5858

59+
inline void encode(const mds_gid_t &v, bufferlist& bl, uint64_t features = 0) {
60+
uint64_t vv = v;
61+
encode_raw(vv, bl);
62+
}
63+
64+
inline void decode(mds_gid_t &v, bufferlist::const_iterator& p) {
65+
uint64_t vv;
66+
decode_raw(vv, p);
67+
v = vv;
68+
}
69+
5970
typedef int32_t fs_cluster_id_t;
6071
constexpr fs_cluster_id_t FS_CLUSTER_ID_NONE = -1;
6172

src/mds/MDSRankQuiesce.cc

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ void MDSRank::quiesce_cluster_update() {
260260
membership.send_ack = [=, this](QuiesceMap&& ack) {
261261
if (me == membership.leader) {
262262
// loopback
263-
quiesce_db_manager->submit_ack_from(me, std::move(ack));
263+
quiesce_db_manager->submit_peer_ack({me, std::move(ack)});
264264
return 0;
265265
} else {
266266
std::lock_guard guard(mds_lock);
@@ -273,7 +273,7 @@ void MDSRank::quiesce_cluster_update() {
273273

274274
auto ack_msg = make_message<MMDSQuiesceDbAck>();
275275
dout(10) << "sending ack " << ack << " to the leader " << membership.leader << dendl;
276-
ack_msg->encode_payload_from(me, ack);
276+
ack_msg->encode_payload_from({me, std::move(ack)});
277277
return send_message_mds(ack_msg, addrs);
278278
}
279279
};
@@ -287,7 +287,7 @@ void MDSRank::quiesce_cluster_update() {
287287
auto addrs = mdsmap->get_info_gid(to).addrs;
288288
auto listing_msg = make_message<MMDSQuiesceDbListing>();
289289
dout(10) << "sending listing " << db << " to the peer " << to << dendl;
290-
listing_msg->encode_payload_from(me, db);
290+
listing_msg->encode_payload_from({me, std::move(db)});
291291
return send_message_mds(listing_msg, addrs);
292292
};
293293
}
@@ -363,33 +363,33 @@ bool MDSRank::quiesce_dispatch(const cref_t<Message> &m) {
363363
{
364364
const auto& req = ref_cast<MMDSQuiesceDbListing>(m);
365365
mds_gid_t gid;
366-
QuiesceDbListing db_listing;
367-
req->decode_payload_into(gid, db_listing);
366+
QuiesceDbPeerListing peer_listing;
367+
req->decode_payload_into(peer_listing);
368368
if (quiesce_db_manager) {
369-
dout(10) << "got " << db_listing << " from peer " << gid << dendl;
370-
int result = quiesce_db_manager->submit_listing_from(gid, std::move(db_listing));
369+
dout(10) << "got " << peer_listing << dendl;
370+
int result = quiesce_db_manager->submit_peer_listing(std::move(peer_listing));
371371
if (result != 0) {
372-
dout(3) << "error (" << result << ") submitting " << db_listing << " from peer " << gid << dendl;
372+
dout(3) << "error (" << result << ") submitting " << peer_listing << dendl;
373373
}
374374
} else {
375-
dout(5) << "no db manager to process " << db_listing << dendl;
375+
dout(5) << "no db manager to process " << peer_listing << dendl;
376376
}
377377
return true;
378378
}
379379
case MSG_MDS_QUIESCE_DB_ACK:
380380
{
381381
const auto& req = ref_cast<MMDSQuiesceDbAck>(m);
382382
mds_gid_t gid;
383-
QuiesceMap diff_map;
384-
req->decode_payload_into(gid, diff_map);
383+
QuiesceDbPeerAck peer_ack;
384+
req->decode_payload_into(peer_ack);
385385
if (quiesce_db_manager) {
386-
dout(10) << "got ack " << diff_map << " from peer " << gid << dendl;
387-
int result = quiesce_db_manager->submit_ack_from(gid, std::move(diff_map));
386+
dout(10) << "got " << peer_ack << dendl;
387+
int result = quiesce_db_manager->submit_peer_ack(std::move(peer_ack));
388388
if (result != 0) {
389-
dout(3) << "error (" << result << ") submitting an ack from peer " << gid << dendl;
389+
dout(3) << "error (" << result << ") submitting and ack from " << peer_ack.origin << dendl;
390390
}
391391
} else {
392-
dout(5) << "no db manager to process an ack: " << diff_map << dendl;
392+
dout(5) << "no db manager to process " << peer_ack << dendl;
393393
}
394394
return true;
395395
}

src/mds/QuiesceDb.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ using QuiesceSetId = std::string;
119119
using QuiesceRoot = std::string;
120120
using QuiesceSetVersion = uint64_t;
121121

122+
namespace QuiesceInterface {
123+
using PeerId = mds_gid_t;
124+
}
125+
122126
struct QuiesceDbVersion {
123127
epoch_t epoch;
124128
QuiesceSetVersion set_version;
@@ -563,7 +567,7 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbRequest& req)
563567
/// contain all sets that have their version > than the last acked by the peer.
564568
struct QuiesceDbListing {
565569
QuiesceDbVersion db_version = {0, 0};
566-
/// @brief Crusially, the precise `db_age` must be included in every db listing
570+
/// @brief Crucially, the precise `db_age` must be included in every db listing
567571
/// This data is used by all replicas to update their calculated DB TIME ZERO.
568572
/// All events in the database are measured relative to the DB TIME ZERO
569573
QuiesceTimeInterval db_age = QuiesceTimeInterval::zero();
@@ -600,6 +604,18 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbListing& dbl)
600604
return os << "q-db[v:" << dbl.db_version << " sets:" << active << "/" << inactive << "]";
601605
}
602606

607+
struct QuiesceDbPeerListing {
608+
QuiesceInterface::PeerId origin;
609+
QuiesceDbListing db;
610+
};
611+
612+
template <class CharT, class Traits>
613+
static std::basic_ostream<CharT, Traits>&
614+
operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbPeerListing& dbl)
615+
{
616+
return os << dbl.db << " from " << dbl.origin;
617+
}
618+
603619
/// @brief `QuiesceMap` is a root-centric representation of the quiesce database
604620
/// It lists roots with their effective states as of particular version.
605621
/// Additionally, the same structure is used by the peers when reporting
@@ -662,6 +678,18 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceMap& map)
662678
return os << "q-map[v:" << map.db_version << " roots:" << active << "/" << inactive << "]";
663679
}
664680

681+
struct QuiesceDbPeerAck {
682+
QuiesceInterface::PeerId origin;
683+
QuiesceMap diff_map;
684+
};
685+
686+
template <class CharT, class Traits>
687+
static std::basic_ostream<CharT, Traits>&
688+
operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbPeerAck& ack)
689+
{
690+
return os << "ack " << ack.diff_map << " from " << ack.origin;
691+
}
692+
665693
inline QuiesceTimeInterval interval_saturate_add(QuiesceTimeInterval lhs, QuiesceTimeInterval rhs)
666694
{
667695
// assuming an unsigned time interval.
@@ -685,7 +713,6 @@ inline QuiesceTimePoint interval_saturate_add_now(QuiesceTimeInterval interval)
685713
};
686714

687715
namespace QuiesceInterface {
688-
using PeerId = mds_gid_t;
689716
/// @brief A callback from the manager to the agent with an up-to-date root list
690717
/// The map is mutable and will be used as synchronous agent ack if the return value is true
691718
using AgentNotify = std::function<bool(QuiesceMap&)>;

src/mds/QuiesceDbEncoding.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
#include "include/encoding.h"
1616
#include <stdint.h>
1717

18+
struct QuiesceDbEncoding {
19+
static constexpr int version = 1;
20+
static constexpr int compat = 1;
21+
};
22+
1823
void encode(QuiesceDbVersion const& v, bufferlist& bl, uint64_t features = 0)
1924
{
2025
encode(v.epoch, bl, features);
@@ -131,6 +136,18 @@ void decode(QuiesceDbListing& listing, bufferlist::const_iterator& p)
131136
decode(listing.sets, p);
132137
}
133138

139+
void encode(QuiesceDbPeerListing const& listing, bufferlist& bl, uint64_t features = 0)
140+
{
141+
encode(listing.origin, bl, features);
142+
encode(listing.db, bl, features);
143+
}
144+
145+
void decode(QuiesceDbPeerListing& listing, bufferlist::const_iterator& p)
146+
{
147+
decode(listing.origin, p);
148+
decode(listing.db, p);
149+
}
150+
134151
void encode(QuiesceMap::RootInfo const& root, bufferlist& bl, uint64_t features = 0)
135152
{
136153
encode(root.state, bl, features);
@@ -155,3 +172,14 @@ void decode(QuiesceMap& map, bufferlist::const_iterator& p)
155172
decode(map.roots, p);
156173
}
157174

175+
void encode(QuiesceDbPeerAck const& ack, bufferlist& bl, uint64_t features = 0)
176+
{
177+
encode(ack.origin, bl, features);
178+
encode(ack.diff_map, bl, features);
179+
}
180+
181+
void decode(QuiesceDbPeerAck& ack, bufferlist::const_iterator& p)
182+
{
183+
decode(ack.origin, p);
184+
decode(ack.diff_map, p);
185+
}

src/mds/QuiesceDbManager.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
256256
{
257257
// as a replica, we only care about the latest update
258258
while (db_updates.size() > 1) {
259-
dout(10) << "skipping an older update from " << db_updates.front().first << " version " << db_updates.front().second.db_version << dendl;
259+
dout(10) << "skipping an older update from " << db_updates.front().origin << " version " << db_updates.front().db.db_version << dendl;
260260
db_updates.pop();
261261
}
262262

@@ -265,7 +265,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
265265
return QuiesceTimeInterval::max();
266266
}
267267

268-
QuiesceDbListing &update = db_updates.back().second;
268+
QuiesceDbListing &update = db_updates.back().db;
269269

270270
if (update.db_version.epoch != membership.epoch) {
271271
dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl;
@@ -317,7 +317,8 @@ bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_update
317317

318318
// only consider db submissions from unknown peers
319319
while (!unknown_peers.empty() && !db_updates.empty()) {
320-
auto &[from, update] = db_updates.front();
320+
auto &from = db_updates.front().origin;
321+
auto &update = db_updates.front().db;
321322
if (update.db_version.epoch == membership.epoch && unknown_peers.erase(from) > 0) {
322323
// see if this peer's version is newer than mine
323324
if (db.set_version < update.db_version.set_version) {

src/mds/QuiesceDbManager.h

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,20 @@ class QuiesceDbManager {
7575
submit_condition.notify_all();
7676
return 0;
7777
}
78+
7879
// acks the messaging system
79-
int submit_ack_from(QuiesceInterface::PeerId sender, const QuiesceMap& diff_map) {
80+
int submit_peer_ack(QuiesceDbPeerAck&& ack) {
8081
std::lock_guard l(submit_mutex);
8182

8283
if (!cluster_membership || !cluster_membership->is_leader()) {
8384
return -EPERM;
8485
}
8586

86-
if (!cluster_membership->members.contains(sender)) {
87+
if (!cluster_membership->members.contains(ack.origin)) {
8788
return -ESTALE;
8889
}
8990

90-
pending_acks.push({ sender, diff_map });
91+
pending_acks.push(std::move(ack));
9192
submit_condition.notify_all();
9293
return 0;
9394
}
@@ -97,18 +98,18 @@ class QuiesceDbManager {
9798
// -> EPERM if this is the leader
9899

99100
// process an incoming listing from a leader
100-
int submit_listing_from(QuiesceInterface::PeerId sender, QuiesceDbListing&& listing) {
101+
int submit_peer_listing(QuiesceDbPeerListing&& listing) {
101102
std::lock_guard l(submit_mutex);
102103

103104
if (!cluster_membership) {
104105
return -EPERM;
105106
}
106107

107-
if (cluster_membership->epoch != listing.db_version.epoch) {
108+
if (cluster_membership->epoch != listing.db.db_version.epoch) {
108109
return -ESTALE;
109110
}
110111

111-
pending_db_updates.push({sender, std::move(listing)});
112+
pending_db_updates.push(std::move(listing));
112113
submit_condition.notify_all();
113114
return 0;
114115
}
@@ -187,8 +188,8 @@ class QuiesceDbManager {
187188

188189
std::optional<AgentCallback> agent_callback;
189190
std::optional<QuiesceClusterMembership> cluster_membership;
190-
std::queue<std::pair<QuiesceInterface::PeerId, QuiesceDbListing>> pending_db_updates;
191-
std::queue<std::pair<QuiesceInterface::PeerId, QuiesceMap>> pending_acks;
191+
std::queue<QuiesceDbPeerListing> pending_db_updates;
192+
std::queue<QuiesceDbPeerAck> pending_acks;
192193
std::deque<RequestContext*> pending_requests;
193194

194195
class QuiesceDbThread : public Thread {

src/messages/MMDSQuiesceDbAck.h

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,22 @@ class MMDSQuiesceDbAck final : public MMDSOp {
3434
// noop to prevent unnecessary overheads
3535
}
3636

37-
void encode_payload_from(mds_gid_t const&gid, QuiesceMap const&diff_map)
37+
void encode_payload_from(QuiesceDbPeerAck const& ack)
3838
{
39-
using ceph::encode;
40-
41-
ceph_assert(gid != MDS_GID_NONE);
42-
43-
ENCODE_START(1, 1, payload);
44-
encode(gid, payload);
45-
encode(diff_map, payload);
39+
ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload);
40+
encode(ack, payload);
4641
ENCODE_FINISH(payload);
4742
}
4843

4944
void decode_payload() override {
5045
// noop to prevent unnecessary overheads
5146
}
5247

53-
void decode_payload_into(mds_gid_t &gid, QuiesceMap &diff_map) const
48+
void decode_payload_into(QuiesceDbPeerAck &ack) const
5449
{
55-
using ceph::decode;
5650
auto p = payload.cbegin();
57-
DECODE_START(1, p);
58-
decode(gid, p);
59-
decode(diff_map, p);
51+
DECODE_START(QuiesceDbEncoding::version, p);
52+
decode(ack, p);
6053
DECODE_FINISH(p);
6154
}
6255

src/messages/MMDSQuiesceDbListing.h

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,22 @@ class MMDSQuiesceDbListing final : public MMDSOp {
3333
// noop to prevent unnecessary overheads
3434
}
3535

36-
void encode_payload_from(mds_gid_t const& gid, QuiesceDbListing const& db_listing)
36+
void encode_payload_from(QuiesceDbPeerListing const& peer_listing)
3737
{
38-
using ceph::encode;
39-
40-
ceph_assert(gid != MDS_GID_NONE);
41-
42-
ENCODE_START(1, 1, payload);
43-
encode(gid, payload);
44-
encode(db_listing, payload);
38+
ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload);
39+
encode(peer_listing, payload);
4540
ENCODE_FINISH(payload);
4641
}
4742

4843
void decode_payload() override {
4944
// noop to prevent unnecessary overheads
5045
}
5146

52-
void decode_payload_into(mds_gid_t &gid, QuiesceDbListing &db_listing) const
47+
void decode_payload_into(QuiesceDbPeerListing &peer_listing) const
5348
{
54-
using ceph::decode;
5549
auto p = payload.cbegin();
56-
DECODE_START(1, p);
57-
decode(gid, p);
58-
decode(db_listing, p);
50+
DECODE_START(QuiesceDbEncoding::version, p);
51+
decode(peer_listing, p);
5952
DECODE_FINISH(p);
6053
}
6154

src/test/mds/TestQuiesceDb.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ class QuiesceDbTest: public testing::Test {
151151
if (epoch == this->epoch) {
152152
if (this->managers.contains(recipient)) {
153153
dout(10) << "listing from " << me << " (leader=" << leader << ") to " << recipient << " for version " << listing.db_version << " with " << listing.sets.size() << " sets" << dendl;
154-
this->managers[recipient]->submit_listing_from(me, std::move(listing));
154+
this->managers[recipient]->submit_peer_listing({me, std::move(listing)});
155155
comms_cond.notify_all();
156156
return 0;
157157
}
@@ -181,7 +181,7 @@ class QuiesceDbTest: public testing::Test {
181181
it++;
182182
}
183183
}
184-
this->managers[leader]->submit_ack_from(me, std::move(diff_map));
184+
this->managers[leader]->submit_peer_ack({me, std::move(diff_map)});
185185
comms_cond.notify_all();
186186
l.unlock();
187187
while(!done_hooks.empty()) {

0 commit comments

Comments
 (0)