@@ -108,43 +108,105 @@ func newNode(grpcAddress, raftAddress, redisAddress string, r *raft.Raft, tm *tr
108108
109109//nolint:unparam
110110func createNode (t * testing.T , n int ) ([]Node , []string , []string ) {
111- var grpcAdders []string
112- var redisAdders []string
113- var nodes []Node
114-
115111 const (
116112 waitTimeout = 5 * time .Second
117113 waitInterval = 100 * time .Millisecond
118114 )
119115
120- cfg := raft.Configuration {}
121- ports := make ([]portsAdress , n )
116+ t .Helper ()
122117
123118 ctx := context .Background ()
124- var lc net.ListenConfig
125119
126- // port assign
120+ ports := assignPorts (n )
121+ cfg := buildRaftConfig (n , ports )
122+ nodes , grpcAdders , redisAdders := setupNodes (t , ctx , n , ports , cfg )
123+
124+ waitForNodeListeners (t , ctx , nodes , waitTimeout , waitInterval )
125+ waitForRaftReadiness (t , nodes , waitTimeout , waitInterval )
126+
127+ return nodes , grpcAdders , redisAdders
128+ }
129+
130+ func waitForNodeListeners (t * testing.T , ctx context.Context , nodes []Node , waitTimeout , waitInterval time.Duration ) {
131+ t .Helper ()
132+ d := & net.Dialer {Timeout : time .Second }
133+ for _ , n := range nodes {
134+ assert .Eventually (t , func () bool {
135+ conn , err := d .DialContext (ctx , "tcp" , n .grpcAddress )
136+ if err != nil {
137+ return false
138+ }
139+ _ = conn .Close ()
140+ conn , err = d .DialContext (ctx , "tcp" , n .redisAddress )
141+ if err != nil {
142+ return false
143+ }
144+ _ = conn .Close ()
145+ return true
146+ }, waitTimeout , waitInterval )
147+ }
148+ }
149+
150+ func waitForRaftReadiness (t * testing.T , nodes []Node , waitTimeout , waitInterval time.Duration ) {
151+ t .Helper ()
152+ assert .Eventually (t , func () bool {
153+ return nodes [0 ].raft .State () == raft .Leader
154+ }, waitTimeout , waitInterval )
155+
156+ expectedLeader := raft .ServerAddress (nodes [0 ].raftAddress )
157+ assert .Eventually (t , func () bool {
158+ for i , n := range nodes {
159+ state := n .raft .State ()
160+ if i == 0 {
161+ if state != raft .Leader {
162+ return false
163+ }
164+ } else if state != raft .Follower {
165+ return false
166+ }
167+
168+ addr , _ := n .raft .LeaderWithID ()
169+ if addr != expectedLeader {
170+ return false
171+ }
172+ }
173+ return true
174+ }, waitTimeout , waitInterval )
175+ }
176+
177+ func assignPorts (n int ) []portsAdress {
178+ ports := make ([]portsAdress , n )
127179 for i := 0 ; i < n ; i ++ {
128180 ports [i ] = portAssigner ()
129181 }
182+ return ports
183+ }
130184
131- // build raft node config
185+ func buildRaftConfig (n int , ports []portsAdress ) raft.Configuration {
186+ cfg := raft.Configuration {}
132187 for i := 0 ; i < n ; i ++ {
133- var suffrage raft.ServerSuffrage
188+ suffrage := raft .Nonvoter
134189 if i == 0 {
135190 suffrage = raft .Voter
136- } else {
137- suffrage = raft .Nonvoter
138191 }
139192
140- server := raft.Server {
193+ cfg . Servers = append ( cfg . Servers , raft.Server {
141194 Suffrage : suffrage ,
142195 ID : raft .ServerID (strconv .Itoa (i )),
143196 Address : raft .ServerAddress (ports [i ].raftAddress ),
144- }
145- cfg .Servers = append (cfg .Servers , server )
197+ })
146198 }
147199
200+ return cfg
201+ }
202+
203+ func setupNodes (t * testing.T , ctx context.Context , n int , ports []portsAdress , cfg raft.Configuration ) ([]Node , []string , []string ) {
204+ t .Helper ()
205+ var grpcAdders []string
206+ var redisAdders []string
207+ var nodes []Node
208+ var lc net.ListenConfig
209+
148210 for i := 0 ; i < n ; i ++ {
149211 st := store .NewRbMemoryStore ()
150212 trxSt := store .NewMemoryStoreDefaultTTL ()
@@ -172,16 +234,16 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
172234
173235 grpcAdders = append (grpcAdders , port .grpcAddress )
174236 redisAdders = append (redisAdders , port .redisAddress )
175- go func () {
176- assert .NoError (t , s .Serve (grpcSock ))
177- }()
237+ go func (srv * grpc. Server , lis net. Listener ) {
238+ assert .NoError (t , srv .Serve (lis ))
239+ }(s , grpcSock )
178240
179241 l , err := lc .Listen (ctx , "tcp" , port .redisAddress )
180242 assert .NoError (t , err )
181243 rd := NewRedisServer (l , st , coordinator )
182- go func () {
183- assert .NoError (t , rd .Run ())
184- }()
244+ go func (server * RedisServer ) {
245+ assert .NoError (t , server .Run ())
246+ }(rd )
185247
186248 nodes = append (nodes , newNode (
187249 port .grpcAddress ,
@@ -192,30 +254,8 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
192254 s ,
193255 rd ,
194256 ))
195-
196- }
197-
198- d := & net.Dialer {Timeout : time .Second }
199- for _ , n := range nodes {
200- assert .Eventually (t , func () bool {
201- conn , err := d .DialContext (ctx , "tcp" , n .grpcAddress )
202- if err != nil {
203- return false
204- }
205- _ = conn .Close ()
206- conn , err = d .DialContext (ctx , "tcp" , n .redisAddress )
207- if err != nil {
208- return false
209- }
210- _ = conn .Close ()
211- return true
212- }, waitTimeout , waitInterval )
213257 }
214258
215- assert .Eventually (t , func () bool {
216- return nodes [0 ].raft .State () == raft .Leader
217- }, waitTimeout , waitInterval )
218-
219259 return nodes , grpcAdders , redisAdders
220260}
221261
0 commit comments