
Commit 9c3ed25

NRG: Decouple Raft transport layer
The Raft implementation was tightly coupled to the server's internal client and send queue for its RPC communication, which made it difficult to test scenarios like network partitions in a deterministic manner. The primary benefit of this change is improved testability: a new mockTransport is introduced for testing, which allows simulating network partitions and injecting behavior after a message is sent.

Signed-off-by: Daniele Sciascia <daniele@nats.io>
1 parent a11b5b9 commit 9c3ed25
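Only five of the seven changed files are shown below; the raftTransport interface and the mockTransport implementation live in the two files not displayed on this page. The sketch below is inferred purely from the call sites visible in the diffs that follow (n.t.Account(), n.t.Reset(...), n.t.Subscribe(...), n.t.Unsubscribe(...), n.t.Publish(...), n.t.Close(), and n.t = factory(s, n)), so treat it as an approximation rather than the actual definition:

```go
// Inferred sketch only: the real definitions are in commit files not shown
// on this page and may differ in names, ordering and signatures.
type raftTransport interface {
	Account() *Account  // Account that NRG traffic is sent/received in
	Reset(acc *Account) // Rebind the transport to a (possibly new) account
	Subscribe(subject string, cb msgHandler) (*subscription, error)
	Unsubscribe(sub *subscription)
	Publish(subject, reply string, msg []byte)
	Close()
}

// Factory signature inferred from `n.t = factory(s, n)` in initRaftNode.
type raftTransportFactory func(s *Server, n *raft) raftTransport
```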

7 files changed: +346 −72 lines changed

server/jetstream_jwt_test.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -1810,7 +1810,7 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
 		// in-account or not.
 		for _, rg := range raftNodes {
 			rg.Lock()
-			rgAcc := rg.acc
+			rgAcc := rg.t.Account()
 			rg.Unlock()
 			switch state {
 			case "system":
@@ -1911,7 +1911,7 @@ func TestJetStreamJWTClusterAccountNRGPersistsAfterRestart(t *testing.T) {
 
 	for _, rg := range raftNodes {
 		rg.Lock()
-		rgAcc := rg.acc
+		rgAcc := rg.t.Account()
 		rg.Unlock()
 		require_Equal(t, rgAcc.Name, aExpPub)
 		require_Equal(t, rza[rg.group].SystemAcc, false)
```

server/monitor.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -4175,7 +4175,7 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
 			PTerm:       n.pterm,
 			PIndex:      n.pindex,
 			SystemAcc:   n.IsSystemAccount(),
-			TrafficAcc:  n.acc.GetName(),
+			TrafficAcc:  n.t.Account().GetName(),
 			IPQPropLen:  n.prop.len(),
 			IPQEntryLen: n.entry.len(),
 			IPQRespLen:  n.resp.len(),
```

server/raft.go

Lines changed: 18 additions & 55 deletions
```diff
@@ -139,7 +139,6 @@ type raft struct {
 
 	created time.Time // Time that the group was created
 	accName string    // Account name of the asset this raft group is for
-	acc     *Account  // Account that NRG traffic will be sent/received in
 	group   string    // Raft group
 	sd      string    // Store directory
 	id      string    // Node ID
@@ -184,7 +183,6 @@ type raft struct {
 	vote string // Our current vote state
 
 	s  *Server    // Reference to top-level server
-	c  *client    // Internal client for subscriptions
 	js *jetStream // JetStream, if running, to see if we are out of resources
 
 	hasleader atomic.Bool // Is there a group leader right now?
@@ -203,7 +201,7 @@ type raft struct {
 	asubj  string // Append entries subject
 	areply string // Append entries responses subject
 
-	sq    *sendq        // Send queue for outbound RPC messages
+	t     raftTransport // Transport that handles Raft messaging
 	aesub *subscription // Subscription for handleAppendEntry callbacks
 
 	wtv []byte // Term and vote to be written
@@ -296,6 +294,8 @@ type RaftConfig struct {
 	// We need to protect against losing state due to the new peers starting with an empty log.
 	// Therefore, these empty servers can't try to become leader until they at least have _some_ state.
 	ScaleUp bool
+
+	Transport raftTransportFactory
 }
 
 var (
@@ -437,6 +437,12 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
 		extSt: ps.domainExt,
 	}
 
+	factory := cfg.Transport
+	if factory == nil {
+		factory = defaultRaftTransport
+	}
+	n.t = factory(s, n)
+
 	// Setup our internal subscriptions for proposals, votes and append entries.
 	// If we fail to do this for some reason then this is fatal — we cannot
 	// continue setting up or the Raft node may be partially/totally isolated.
@@ -620,7 +626,7 @@ func (n *raft) IsSystemAccount() bool {
 func (n *raft) GetTrafficAccountName() string {
 	n.RLock()
 	defer n.RUnlock()
-	return n.acc.GetName()
+	return n.t.Account().GetName()
 }
 
 func (n *raft) RecreateInternalSubs() error {
@@ -670,7 +676,7 @@ func (n *raft) recreateInternalSubsLocked() error {
 			}
 		}
 	}
-	if n.aesub != nil && n.acc == nrgAcc {
+	if n.aesub != nil && n.t.Account() == nrgAcc {
 		// Subscriptions already exist and the account NRG state
 		// hasn't changed.
 		return nil
@@ -681,33 +687,11 @@ func (n *raft) recreateInternalSubsLocked() error {
 	// the next step...
 	n.cancelCatchup()
 
-	// If we have an existing client then tear down any existing
-	// subscriptions and close the internal client.
-	if c := n.c; c != nil {
-		c.mu.Lock()
-		subs := make([]*subscription, 0, len(c.subs))
-		for _, sub := range c.subs {
-			subs = append(subs, sub)
-		}
-		c.mu.Unlock()
-		for _, sub := range subs {
-			n.unsubscribe(sub)
-		}
-		c.closeConnection(InternalClient)
-	}
-
-	if n.acc != nrgAcc {
+	if n.t.Account() != nrgAcc {
 		n.debug("Subscribing in '%s'", nrgAcc.GetName())
 	}
 
-	c := n.s.createInternalSystemClient()
-	c.registerWithAccount(nrgAcc)
-	if nrgAcc.sq == nil {
-		nrgAcc.sq = n.s.newSendQ(nrgAcc)
-	}
-	n.c = c
-	n.sq = nrgAcc.sq
-	n.acc = nrgAcc
+	n.t.Reset(nrgAcc)
 
 	// Recreate any internal subscriptions for voting, append
 	// entries etc in the new account.
@@ -1968,17 +1952,12 @@ func (n *raft) newInbox() string {
 // Our internal subscribe.
 // Lock should be held.
 func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
-	if n.c == nil {
-		return nil, errNoInternalClient
-	}
-	return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb)
+	return n.t.Subscribe(subject, cb)
 }
 
 // Lock should be held.
 func (n *raft) unsubscribe(sub *subscription) {
-	if n.c != nil && sub != nil {
-		n.c.processUnsub(sub.sid)
-	}
+	n.t.Unsubscribe(sub)
 }
 
 // Lock should be held.
@@ -2103,19 +2082,7 @@ runner:
 	n.Lock()
 	defer n.Unlock()
 
-	if c := n.c; c != nil {
-		var subs []*subscription
-		c.mu.Lock()
-		for _, sub := range c.subs {
-			subs = append(subs, sub)
-		}
-		c.mu.Unlock()
-		for _, sub := range subs {
-			n.unsubscribe(sub)
-		}
-		c.closeConnection(InternalClient)
-		n.c = nil
-	}
+	n.t.Close()
 
 	// Unregistering ipQueues do not prevent them from push/pop
 	// just will remove them from the central monitoring map
@@ -4640,15 +4607,11 @@ func (n *raft) requestVote() {
 }
 
 func (n *raft) sendRPC(subject, reply string, msg []byte) {
-	if n.sq != nil {
-		n.sq.send(subject, reply, nil, msg)
-	}
+	n.t.Publish(subject, reply, msg)
 }
 
 func (n *raft) sendReply(subject string, msg []byte) {
-	if n.sq != nil {
-		n.sq.send(subject, _EMPTY_, nil, msg)
-	}
+	n.t.Publish(subject, _EMPTY_, msg)
 }
 
 func (n *raft) wonElection(votes int) bool {
```
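The inline logic deleted in these hunks maps one-to-one onto the new transport methods. As a rough guide to what defaultRaftTransport presumably does, the removed code can be reassembled into a transport type; note that the actual implementation lives in one of the two commit files not shown on this page, so the type name natsRaftTransport and the method composition below are assumptions, with only the bodies taken from the deletions above:

```go
// Reassembled sketch of the default NATS-backed transport. Names are
// assumptions; the bodies mirror the code this commit deletes from raft.go.
type natsRaftTransport struct {
	s   *Server
	c   *client  // Internal client for subscriptions
	sq  *sendq   // Send queue for outbound RPC messages
	acc *Account // Account that NRG traffic will be sent/received in
}

func (t *natsRaftTransport) Account() *Account { return t.acc }

// Reset tears down the old internal client, then binds a fresh one (and its
// send queue) to the given account, as the deleted inline code used to do.
func (t *natsRaftTransport) Reset(nrgAcc *Account) {
	if c := t.c; c != nil {
		c.mu.Lock()
		subs := make([]*subscription, 0, len(c.subs))
		for _, sub := range c.subs {
			subs = append(subs, sub)
		}
		c.mu.Unlock()
		for _, sub := range subs {
			t.Unsubscribe(sub)
		}
		c.closeConnection(InternalClient)
	}
	c := t.s.createInternalSystemClient()
	c.registerWithAccount(nrgAcc)
	if nrgAcc.sq == nil {
		nrgAcc.sq = t.s.newSendQ(nrgAcc)
	}
	t.c, t.sq, t.acc = c, nrgAcc.sq, nrgAcc
}

func (t *natsRaftTransport) Subscribe(subject string, cb msgHandler) (*subscription, error) {
	if t.c == nil {
		return nil, errNoInternalClient
	}
	return t.s.systemSubscribe(subject, _EMPTY_, false, t.c, cb)
}

func (t *natsRaftTransport) Unsubscribe(sub *subscription) {
	if t.c != nil && sub != nil {
		t.c.processUnsub(sub.sid)
	}
}

func (t *natsRaftTransport) Publish(subject, reply string, msg []byte) {
	if t.sq != nil {
		t.sq.send(subject, reply, nil, msg)
	}
}
```

Close() presumably performs the same subscription teardown seen in the deleted shutdown path before discarding the internal client.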

server/raft_helpers_test.go

Lines changed: 25 additions & 14 deletions
```diff
@@ -126,21 +126,26 @@ func (sg smGroup) lockFollowers() []stateMachine {
 // Create a raft group and place on numMembers servers at random.
 // Filestore based.
 func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
-	return c.createRaftGroupEx(name, numMembers, smf, FileStorage)
+	return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, FileStorage)
 }
 
 func (c *cluster) createMemRaftGroup(name string, numMembers int, smf smFactory) smGroup {
-	return c.createRaftGroupEx(name, numMembers, smf, MemoryStorage)
+	return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, MemoryStorage)
 }
 
-func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, st StorageType) smGroup {
+func (c *cluster) createMockMemRaftGroup(name string, members int, smf smFactory) (*raftTransportHub, raftTransportFactory, smGroup) {
+	hub, rtf := mockTransportFactory()
+	return hub, rtf, c.createRaftGroupEx(name, members, smf, rtf, MemoryStorage)
+}
+
+func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
 	c.t.Helper()
 	if numMembers > len(c.servers) {
 		c.t.Fatalf("Members > Peers: %d vs %d", numMembers, len(c.servers))
 	}
 	servers := append([]*Server{}, c.servers...)
 	rand.Shuffle(len(servers), func(i, j int) { servers[i], servers[j] = servers[j], servers[i] })
-	return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
+	return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, rtf, st)
 }
 
 func (c *cluster) createWAL(name string, st StorageType) WAL {
@@ -189,42 +194,48 @@ func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string,
 	return sm
 }
 
-func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
+func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
 	c.t.Helper()
 
 	var sg smGroup
 	peers := serverPeerNames(servers)
 
 	for _, s := range servers {
 		cfg := &RaftConfig{
-			Name:  name,
-			Store: c.t.TempDir(),
-			Log:   c.createWAL(name, st)}
+			Name:      name,
+			Store:     c.t.TempDir(),
+			Log:       c.createWAL(name, st),
+			Transport: rtf}
 		sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
 	}
 	return sg
 }
 
-func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
+func (c *cluster) addNodeEx(name string, smf smFactory, rtf raftTransportFactory, st StorageType) stateMachine {
 	c.t.Helper()
 
 	server := c.addInNewServer()
 
 	cfg := &RaftConfig{
-		Name:  name,
-		Store: c.t.TempDir(),
-		Log:   c.createWAL(name, st)}
+		Name:      name,
+		Store:     c.t.TempDir(),
+		Log:       c.createWAL(name, st),
+		Transport: rtf}
 
 	peers := serverPeerNames(c.servers)
 	return c.createStateMachine(server, cfg, peers, smf)
 }
 
 func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
-	return c.addNodeEx(name, smf, FileStorage)
+	return c.addNodeEx(name, smf, defaultRaftTransport, FileStorage)
 }
 
 func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
-	return c.addNodeEx(name, smf, MemoryStorage)
+	return c.addNodeEx(name, smf, defaultRaftTransport, MemoryStorage)
+}
+
+func (c *cluster) addMockMemRaftNode(name string, rtf raftTransportFactory, smf smFactory) stateMachine {
+	return c.addNodeEx(name, smf, rtf, MemoryStorage)
 }
 
 // Driver program for the state machine.
```
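The test below drives a *raftTransportHub returned by mockTransportFactory, calling hub.partition, hub.heal, and hub.healPartitions. That implementation is in a commit file not shown on this page (and hub.partition evidently takes a node ID plus a numeric argument whose meaning is defined there). As a self-contained illustration of the general technique, a hub that all mock transports register with can simulate partitions deterministically just by dropping traffic to and from cut-off node IDs; every name in this sketch is illustrative, not the commit's actual API:

```go
package mockhub

import "sync"

// hub routes messages between registered nodes and can simulate network
// partitions by silently dropping traffic. Illustrative only.
type hub struct {
	mu          sync.Mutex
	partitioned map[string]bool // node ID -> currently cut off?
	inboxes     map[string]chan []byte
}

func newHub() *hub {
	return &hub{
		partitioned: make(map[string]bool),
		inboxes:     make(map[string]chan []byte),
	}
}

// register gives a node an inbox through which it receives messages.
func (h *hub) register(id string) <-chan []byte {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan []byte, 64)
	h.inboxes[id] = ch
	return ch
}

// partition cuts a node off from the rest of the group.
func (h *hub) partition(id string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.partitioned[id] = true
}

// heal reconnects a previously partitioned node.
func (h *hub) heal(id string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.partitioned, id)
}

// send delivers msg unless either endpoint is partitioned, mimicking a
// symmetric partition with no real timing or networking involved.
func (h *hub) send(from, to string, msg []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.partitioned[from] || h.partitioned[to] {
		return // Dropped, as if the link were down.
	}
	if ch, ok := h.inboxes[to]; ok {
		select {
		case ch <- msg:
		default: // Receiver inbox full; drop like a lossy network would.
		}
	}
}
```

A transport built on such a hub also gives tests a natural interception point after each send, which is the second capability the commit message calls out for mockTransport.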

server/raft_test.go

Lines changed: 53 additions & 0 deletions
```diff
@@ -4627,3 +4627,56 @@ func TestNRGSingleNodeElection(t *testing.T) {
 	require_Equal(t, newLeader.node().ClusterSize(), 3)
 	require_False(t, newLeader.node().MembershipChangeInProgress())
 }
+
+func TestNRGPartitionedPeerRemove(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R2S", 2)
+	defer c.shutdown()
+
+	hub, _, rg := c.createMockMemRaftGroup("MOCK", 2, newStateAdder)
+	defer hub.healPartitions()
+
+	leader := rg.waitOnLeader()
+	followers := rg.followers()
+	require_Equal(t, len(followers), 1)
+	require_Equal(t, leader.node().ClusterSize(), 2)
+
+	// Remove the follower while the leader is partitioned away
+	hub.partition(leader.node().ID(), 1)
+	leader.node().ProposeRemovePeer(followers[0].node().ID())
+
+	// The follower can't get elected, but let's try anyway
+	followers[0].node().CampaignImmediately()
+
+	// Expect progress on the leader side
+	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
+		if leader.node().ClusterSize() != 1 {
+			return errors.New("node removal still in progress")
+		}
+		return nil
+	})
+
+	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
+		if leader.node().MembershipChangeInProgress() {
+			return errors.New("membership still in progress")
+		}
+		return nil
+	})
+
+	require_Equal(t, leader.node().ClusterSize(), 1)
+	require_False(t, leader.node().MembershipChangeInProgress())
+
+	// Follower has not changed
+	require_Equal(t, followers[0].node().State(), Follower)
+	require_Equal(t, followers[0].node().ClusterSize(), 2)
+	require_False(t, followers[0].node().MembershipChangeInProgress())
+
+	// Heal the partition, and expect the follower to get the bad news...
+	hub.heal(leader.node().ID())
+
+	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
+		if followers[0].node().ClusterSize() != 1 {
+			return errors.New("node removal still in progress")
+		}
+		return nil
+	})
+}
```
