Skip to content

Commit 5650240

Browse files
committed
NRG: Disjoint majorities during membership changes and network partitions
This commit fixes the following bugs: - Inconsistent Cluster Size: When a leader was partitioned from the cluster immediately after proposing an EntryAddPeer, the remaining nodes could end up with a different view of the cluster size and quorum. Followers could have a cluster size that did not match the number of peers in their peer set. A subsequent leader election, electing one of the followers, could break the quorum system. - Incorrect Leader Election: It was possible for a new leader to be elected without a proper quorum. This could happen if a partition occurred after a new peer was proposed but before that change was committed. A follower could add the uncommitted peer to its peer set but would not update its cluster size and quorum, leading to an invalid election. Both issues are solved by making sure that when a peer is added to or removed from the membership, the cluster size and quorum are adjusted accordingly, at the same time. Previously, followers would first add peers when receiving the EntryAddPeer, and then adjust the cluster size only after commit. This patch changes this behavior such that the cluster size and quorum are recomputed upon receiving the EntryAddPeer / EntryRemovePeer proposals. This is in line with the membership protocol proposed in Ongaro's dissertation, section 4.1. This patch also removes the concept of a "known" peer from the Raft layer. A node would add a peer to its peer set when first receiving the corresponding appendEntry, and on commit it would be marked as "known". This distinction no longer applies. Signed-off-by: Daniele Sciascia <daniele@nats.io>
1 parent 9c3ed25 commit 5650240

File tree

4 files changed

+214
-54
lines changed

4 files changed

+214
-54
lines changed

server/monitor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4190,8 +4190,10 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
41904190
}
41914191
peer := RaftzGroupPeer{
41924192
Name: s.serverNameForNode(id),
4193-
Known: p.kp,
41944193
LastReplicatedIndex: p.li,
4194+
// The Raft layer no longer distinguishes between
4195+
// 'known' and 'unknown' peers.
4196+
Known: true,
41954197
}
41964198
if !p.ts.IsZero() {
41974199
peer.LastSeen = time.Since(p.ts).String()

server/raft.go

Lines changed: 26 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ type catchupState struct {
251251
type lps struct {
252252
ts time.Time // Last timestamp
253253
li uint64 // Last index replicated
254-
kp bool // Known peer
255254
}
256255

257256
const (
@@ -541,13 +540,13 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
541540
}
542541

543542
// Make sure to track ourselves.
544-
n.peers[n.id] = &lps{time.Now(), 0, true}
543+
n.peers[n.id] = &lps{time.Now(), 0}
545544

546545
// Track known peers
547546
for _, peer := range ps.knownPeers {
548547
if peer != n.id {
549548
// Set these to 0 to start but mark as known peer.
550-
n.peers[peer] = &lps{time.Time{}, 0, true}
549+
n.peers[peer] = &lps{time.Time{}, 0}
551550
}
552551
}
553552

@@ -2620,13 +2619,10 @@ func (n *raft) addPeer(peer string) {
26202619
delete(n.removed, peer)
26212620
}
26222621

2623-
if lp, ok := n.peers[peer]; !ok {
2622+
if _, ok := n.peers[peer]; !ok {
26242623
// We are not tracking this one automatically so we need
26252624
// to bump cluster size.
2626-
n.peers[peer] = &lps{time.Time{}, 0, true}
2627-
} else {
2628-
// Mark as added.
2629-
lp.kp = true
2625+
n.peers[peer] = &lps{time.Time{}, 0}
26302626
}
26312627

26322628
// Adjust cluster size and quorum if needed.
@@ -3191,29 +3187,15 @@ func (n *raft) applyCommit(index uint64) error {
31913187
}
31923188
}
31933189
case EntryAddPeer:
3194-
newPeer := string(e.Data)
3195-
n.debug("Added peer %q", newPeer)
3196-
3197-
// Store our peer in our global peer map for all peers.
3198-
peers.LoadOrStore(newPeer, newPeer)
3199-
3200-
n.addPeer(newPeer)
3201-
32023190
// We pass these up as well.
32033191
committed = append(committed, e)
32043192

32053193
// We are done with this membership change
32063194
n.membChanging = false
3207-
32083195
case EntryRemovePeer:
32093196
peer := string(e.Data)
32103197
n.debug("Removing peer %q", peer)
32113198

3212-
n.removePeer(peer)
3213-
3214-
// Remove from string intern map.
3215-
peers.Delete(peer)
3216-
32173199
// We pass these up as well.
32183200
committed = append(committed, e)
32193201

@@ -3301,25 +3283,20 @@ func (n *raft) trackResponse(ar *appendEntryResponse) bool {
33013283
// Used to adjust cluster size and peer count based on added official peers.
33023284
// lock should be held.
33033285
func (n *raft) adjustClusterSizeAndQuorum() {
3304-
pcsz, ncsz := n.csz, 0
3305-
for _, peer := range n.peers {
3306-
if peer.kp {
3307-
ncsz++
3308-
}
3309-
}
3310-
n.csz = ncsz
3286+
pcsz := n.csz
3287+
n.csz = len(n.peers)
33113288
n.qn = n.csz/2 + 1
33123289

3313-
if ncsz > pcsz {
3314-
n.debug("Expanding our clustersize: %d -> %d", pcsz, ncsz)
3290+
if n.csz > pcsz {
3291+
n.debug("Expanding our clustersize: %d -> %d", pcsz, n.csz)
33153292
n.lsut = time.Now()
3316-
} else if ncsz < pcsz {
3317-
n.debug("Decreasing our clustersize: %d -> %d", pcsz, ncsz)
3293+
} else if n.csz < pcsz {
3294+
n.debug("Decreasing our clustersize: %d -> %d", pcsz, n.csz)
33183295
if n.State() == Leader {
33193296
go n.sendHeartbeat()
33203297
}
33213298
}
3322-
if ncsz != pcsz {
3299+
if n.csz != pcsz {
33233300
n.recreateInternalSubsLocked()
33243301
}
33253302
}
@@ -3337,7 +3314,7 @@ func (n *raft) trackPeer(peer string) error {
33373314
}
33383315
}
33393316
if n.State() == Leader {
3340-
if lp, ok := n.peers[peer]; !ok || !lp.kp {
3317+
if _, ok := n.peers[peer]; !ok {
33413318
// Check if this peer had been removed previously.
33423319
needPeerAdd = !isRemoved
33433320
}
@@ -3937,14 +3914,17 @@ CONTINUE:
39373914
}
39383915
}
39393916
case EntryAddPeer:
3940-
if newPeer := string(e.Data); len(newPeer) == idLen {
3941-
// Track directly, but wait for commit to be official
3942-
if _, ok := n.peers[newPeer]; !ok {
3943-
n.peers[newPeer] = &lps{time.Time{}, 0, false}
3944-
}
3945-
// Store our peer in our global peer map for all peers.
3946-
peers.LoadOrStore(newPeer, newPeer)
3947-
}
3917+
peer := string(e.Data)
3918+
// Store our peer in our global peer map for all peers.
3919+
peers.LoadOrStore(peer, peer)
3920+
n.addPeer(peer)
3921+
n.debug("Added peer %q", peer)
3922+
case EntryRemovePeer:
3923+
peer := string(e.Data)
3924+
// Remove from string intern map.
3925+
peers.Delete(peer)
3926+
n.removePeer(peer)
3927+
n.debug("Removed peer %q", peer)
39483928
}
39493929
}
39503930

@@ -4006,10 +3986,9 @@ func (n *raft) processPeerState(ps *peerState) {
40063986
n.peers = make(map[string]*lps)
40073987
for _, peer := range ps.knownPeers {
40083988
if lp := old[peer]; lp != nil {
4009-
lp.kp = true
40103989
n.peers[peer] = lp
40113990
} else {
4012-
n.peers[peer] = &lps{time.Time{}, 0, true}
3991+
n.peers[peer] = &lps{time.Time{}, 0}
40133992
}
40143993
}
40153994
n.debug("Update peers from leader to %+v", n.peers)
@@ -4251,10 +4230,8 @@ func decodePeerState(buf []byte) (*peerState, error) {
42514230
// Lock should be held.
42524231
func (n *raft) peerNames() []string {
42534232
var peers []string
4254-
for name, peer := range n.peers {
4255-
if peer.kp {
4256-
peers = append(peers, name)
4257-
}
4233+
for name := range n.peers {
4234+
peers = append(peers, name)
42584235
}
42594236
return peers
42604237
}

server/raft_helpers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (sg smGroup) leader() stateMachine {
5757
return nil
5858
}
5959

60-
func (sg smGroup) followers() []stateMachine {
60+
func (sg smGroup) followers() smGroup {
6161
var f []stateMachine
6262
for _, sm := range sg {
6363
if sm.node().Leader() {

server/raft_test.go

Lines changed: 184 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3870,9 +3870,6 @@ func TestNRGQuorumAfterLeaderStepdown(t *testing.T) {
38703870
require_NoError(t, n.trackPeer(nats1))
38713871
require_True(t, n.Quorum())
38723872
require_Len(t, len(n.peers), 3)
3873-
for _, ps := range n.peers {
3874-
ps.kp = true
3875-
}
38763873

38773874
// If we hand off leadership to another server, we should
38783875
// still be reporting we have quorum.
@@ -4680,3 +4677,187 @@ func TestNRGPartitionedPeerRemove(t *testing.T) {
46804677
return nil
46814678
})
46824679
}
4680+
4681+
func TestNRGPeerAddAndPartitionLeader(t *testing.T) {
4682+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4683+
defer c.shutdown()
4684+
4685+
hub, rtf, rg := c.createMockMemRaftGroup("MOCK", 3, newStateAdder)
4686+
4687+
leader := rg.waitOnLeader()
4688+
followers := rg.followers()
4689+
4690+
// When the leader sends an EntryAddPeer, isolate it from
4691+
// the rest of the cluster.
4692+
hub.setAfterMsgHook(func(subject, reply string, msg []byte) {
4693+
if subject != "$NRG.AE.MOCK" {
4694+
return
4695+
}
4696+
ae, _ := decodeAppendEntry(msg, nil, reply)
4697+
if ae == nil || len(ae.entries) != 1 {
4698+
return
4699+
}
4700+
if ae.leader != leader.node().ID() {
4701+
return
4702+
}
4703+
if ae.entries[0].Type == EntryAddPeer {
4704+
hub.partition(leader.node().ID(), 1)
4705+
}
4706+
})
4707+
4708+
// Add a new node and expect a new leader to be elected
4709+
newNode := c.addMockMemRaftNode("MOCK", rtf, newStateAdder)
4710+
newGroup := append(followers, newNode)
4711+
newLeader := newGroup.waitOnLeader()
4712+
require_True(t, newLeader != nil)
4713+
4714+
// If bug is present: The new leader has not yet committed the
4715+
// appendEntry containing EntryAddPeer. The new leader (and
4716+
// followers) would not update their cluster size until the
4717+
// EntryAddPeer is committed. So that we have the following
4718+
// sequence of events:
4719+
// 1) the new leader has initially cluster size of 3
4720+
// 2) it sends a peerState message with cluster size 3
4721+
// 3) the leader commits the EntryAddPeer from the previous
4722+
// leader applies it. Now cluster size is 4.
4723+
// 4) the followers commit the EntryAddPeer, incrementing
4724+
// their cluster size to 4. Next, the EntryPeerState is
4725+
// committed and the cluster size goes back to 3.
4726+
// 5) At this point cluster size from the leader has diverged
4727+
// from the cluster size of its followers.
4728+
4729+
// Expect all nodes to report cluster size of 4
4730+
for _, n := range newGroup {
4731+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4732+
if n.node().ClusterSize() != 4 {
4733+
return errors.New("node addition still in progress")
4734+
}
4735+
return nil
4736+
})
4737+
}
4738+
4739+
// Finally bring back the old leader
4740+
hub.healPartitions()
4741+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4742+
if leader.node().MembershipChangeInProgress() {
4743+
return errors.New("membership still in progress")
4744+
}
4745+
if leader.node().ClusterSize() != 4 {
4746+
return errors.New("node addition still in progress")
4747+
}
4748+
return nil
4749+
})
4750+
}
4751+
4752+
func TestNRGPeerRemoveAndPartitionLeader(t *testing.T) {
4753+
c := createJetStreamClusterExplicit(t, "R5S", 5)
4754+
defer c.shutdown()
4755+
4756+
hub, _, rg := c.createMockMemRaftGroup("MOCK", 5, newStateAdder)
4757+
4758+
leader := rg.waitOnLeader()
4759+
followers := rg.followers()
4760+
4761+
// When the leader sends an EntryRemovePeer, isolate it from
4762+
// the rest of the cluster.
4763+
hub.setAfterMsgHook(func(subject, reply string, msg []byte) {
4764+
if subject != "$NRG.AE.MOCK" {
4765+
return
4766+
}
4767+
ae, _ := decodeAppendEntry(msg, nil, reply)
4768+
if ae == nil || len(ae.entries) != 1 {
4769+
return
4770+
}
4771+
if ae.leader != leader.node().ID() {
4772+
return
4773+
}
4774+
if ae.entries[0].Type == EntryRemovePeer {
4775+
hub.partition(leader.node().ID(), 1)
4776+
}
4777+
})
4778+
4779+
leader.node().ProposeRemovePeer(leader.node().ID())
4780+
4781+
// Expect followers to elect a new leader
4782+
newLeader := followers.waitOnLeader()
4783+
require_True(t, newLeader != nil)
4784+
4785+
// Expect all nodes to report cluster size 4
4786+
for _, n := range followers {
4787+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4788+
if n.node().ClusterSize() != 4 {
4789+
return errors.New("node addition still in progress")
4790+
}
4791+
return nil
4792+
})
4793+
}
4794+
}
4795+
4796+
func TestNRGLeaderWithoutQuorumAfterPeerAdd(t *testing.T) {
4797+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4798+
defer c.shutdown()
4799+
4800+
hub, rtf, rg := c.createMockMemRaftGroup("MOCK", 3, newStateAdder)
4801+
defer hub.healPartitions()
4802+
4803+
leader := rg.waitOnLeader()
4804+
followers := rg.followers()
4805+
4806+
// Set up an after-message hook to create a partition as soon as
4807+
// the leader publishes an EntryAddPeer. The partition will
4808+
// prevent committing the entry.
4809+
hub.setAfterMsgHook(func(subject, reply string, msg []byte) {
4810+
if subject != "$NRG.AE.MOCK" {
4811+
return
4812+
}
4813+
ae, _ := decodeAppendEntry(msg, nil, reply)
4814+
if ae == nil || len(ae.entries) != 1 {
4815+
return
4816+
}
4817+
if ae.leader != leader.node().ID() {
4818+
return
4819+
}
4820+
4821+
// After EntryAddPeer is published, partition the
4822+
// leader and one of the followers. This partition
4823+
// can't commit the entry.
4824+
if ae.entries[0].Type == EntryAddPeer {
4825+
hub.partition(leader.node().ID(), 1)
4826+
hub.partition(followers[0].node().ID(), 1)
4827+
}
4828+
})
4829+
4830+
newNode := c.addMockMemRaftNode("MOCK", rtf, newStateAdder)
4831+
4832+
// At some point here the cluster gets partitioned in two
4833+
// parts: {leader, followers[0]} and {newNode, followers[1]}.
4834+
// Neither side should be able to make progress.
4835+
newGroup := smGroup{newNode, followers[1]}
4836+
newLeader := newGroup.waitOnLeader()
4837+
4838+
// If the bug is present: we managed to elect a new leader,
4839+
// in a 4 node cluster, with only two nodes in the partition!
4840+
// This is because of the following sequence of events:
4841+
// 1) the follower has received the EntryAddPeer
4842+
// 2) the leader and the other follower have partitioned away
4843+
// 3) the entry is uncommitted, however the follower has added
4844+
// the new peer to its peer set, but won't adjust cluster
4845+
// size and quorum until after the entry is committed.
4846+
// 4) follower becomes a candidate and will become leader
4847+
// with a single vote from the new node
4848+
require_Equal(t, newLeader, nil)
4849+
4850+
// Check that node addition completes after healing the partition
4851+
hub.healPartitions()
4852+
rg = append(rg, newNode)
4853+
newLeader = rg.waitOnLeader()
4854+
require_True(t, newLeader != nil)
4855+
for _, n := range rg {
4856+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
4857+
if n.node().ClusterSize() != 4 {
4858+
return errors.New("node addition still in progress")
4859+
}
4860+
return nil
4861+
})
4862+
}
4863+
}

0 commit comments

Comments
 (0)