@@ -2,6 +2,7 @@ package kv
22
33import (
44 "context"
5+ "fmt"
56 "testing"
67 "time"
78
@@ -11,35 +12,84 @@ import (
1112 "github.com/hashicorp/raft"
1213)
1314
14- // helper to create single -node raft
15- func newTestRaft (t * testing.T , id string , fsm raft.FSM ) * raft.Raft {
15+ // helper to create a multi -node raft cluster and return the leader
16+ func newTestRaft (t * testing.T , id string , fsm raft.FSM ) ( * raft.Raft , func ()) {
1617 t .Helper ()
17- c := raft .DefaultConfig ()
18- c .LocalID = raft .ServerID (id )
19- ldb := raft .NewInmemStore ()
20- sdb := raft .NewInmemStore ()
21- fss := raft .NewInmemSnapshotStore ()
22- addr , trans := raft .NewInmemTransport (raft .ServerAddress (id ))
23- r , err := raft .NewRaft (c , fsm , ldb , sdb , fss , trans )
24- if err != nil {
25- t .Fatalf ("new raft: %v" , err )
18+
19+ const n = 3
20+ addrs := make ([]raft.ServerAddress , n )
21+ trans := make ([]* raft.InmemTransport , n )
22+ for i := 0 ; i < n ; i ++ {
23+ addr , tr := raft .NewInmemTransport (raft .ServerAddress (fmt .Sprintf ("%s-%d" , id , i )))
24+ addrs [i ] = addr
25+ trans [i ] = tr
26+ }
27+ // fully connect transports
28+ for i := 0 ; i < n ; i ++ {
29+ for j := i + 1 ; j < n ; j ++ {
30+ trans [i ].Connect (addrs [j ], trans [j ])
31+ trans [j ].Connect (addrs [i ], trans [i ])
32+ }
2633 }
27- cfg := raft.Configuration {Servers : []raft.Server {{ID : raft .ServerID (id ), Address : addr }}}
28- if err := r .BootstrapCluster (cfg ).Error (); err != nil {
29- t .Fatalf ("bootstrap: %v" , err )
34+
35+ // cluster configuration
36+ cfg := raft.Configuration {}
37+ for i := 0 ; i < n ; i ++ {
38+ cfg .Servers = append (cfg .Servers , raft.Server {
39+ ID : raft .ServerID (fmt .Sprintf ("%s-%d" , id , i )),
40+ Address : addrs [i ],
41+ })
42+ }
43+
44+ rafts := make ([]* raft.Raft , n )
45+ for i := 0 ; i < n ; i ++ {
46+ c := raft .DefaultConfig ()
47+ c .LocalID = cfg .Servers [i ].ID
48+ if i == 0 {
49+ c .HeartbeatTimeout = 200 * time .Millisecond
50+ c .ElectionTimeout = 400 * time .Millisecond
51+ c .LeaderLeaseTimeout = 100 * time .Millisecond
52+ } else {
53+ c .HeartbeatTimeout = 1 * time .Second
54+ c .ElectionTimeout = 2 * time .Second
55+ c .LeaderLeaseTimeout = 500 * time .Millisecond
56+ }
57+ ldb := raft .NewInmemStore ()
58+ sdb := raft .NewInmemStore ()
59+ fss := raft .NewInmemSnapshotStore ()
60+ var rfsm raft.FSM
61+ if i == 0 {
62+ rfsm = fsm
63+ } else {
64+ rfsm = NewKvFSM (store .NewRbMemoryStore (), store .NewRbMemoryStoreWithExpire (time .Minute ))
65+ }
66+ r , err := raft .NewRaft (c , rfsm , ldb , sdb , fss , trans [i ])
67+ if err != nil {
68+ t .Fatalf ("new raft %d: %v" , i , err )
69+ }
70+ if err := r .BootstrapCluster (cfg ).Error (); err != nil {
71+ t .Fatalf ("bootstrap %d: %v" , i , err )
72+ }
73+ rafts [i ] = r
3074 }
3175
32- // single node should eventually become leader
76+ // node 0 should become leader
3377 for i := 0 ; i < 100 ; i ++ {
34- if r .State () == raft .Leader {
78+ if rafts [ 0 ] .State () == raft .Leader {
3579 break
3680 }
3781 time .Sleep (50 * time .Millisecond )
3882 }
39- if r .State () != raft .Leader {
40- t .Fatalf ("node %s is not leader" , id )
83+ if rafts [0 ].State () != raft .Leader {
84+ t .Fatalf ("node %s-0 is not leader" , id )
85+ }
86+
87+ shutdown := func () {
88+ for _ , r := range rafts {
89+ r .Shutdown ()
90+ }
4191 }
42- return r
92+ return rafts [ 0 ], shutdown
4393}
4494
4595func TestShardedTransactionManagerCommit (t * testing.T ) {
@@ -52,15 +102,15 @@ func TestShardedTransactionManagerCommit(t *testing.T) {
52102 // group 1
53103 s1 := store .NewRbMemoryStore ()
54104 l1 := store .NewRbMemoryStoreWithExpire (time .Minute )
55- r1 := newTestRaft (t , "1" , NewKvFSM (s1 , l1 ))
56- defer r1 . Shutdown ()
105+ r1 , stop1 := newTestRaft (t , "1" , NewKvFSM (s1 , l1 ))
106+ defer stop1 ()
57107 stm .Register (1 , NewTransaction (r1 ))
58108
59109 // group 2
60110 s2 := store .NewRbMemoryStore ()
61111 l2 := store .NewRbMemoryStoreWithExpire (time .Minute )
62- r2 := newTestRaft (t , "2" , NewKvFSM (s2 , l2 ))
63- defer r2 . Shutdown ()
112+ r2 , stop2 := newTestRaft (t , "2" , NewKvFSM (s2 , l2 ))
113+ defer stop2 ()
64114 stm .Register (2 , NewTransaction (r2 ))
65115
66116 reqs := []* pb.Request {
@@ -95,15 +145,15 @@ func TestShardedTransactionManagerSplitAndMerge(t *testing.T) {
95145 // group 1
96146 s1 := store .NewRbMemoryStore ()
97147 l1 := store .NewRbMemoryStoreWithExpire (time .Minute )
98- r1 := newTestRaft (t , "1" , NewKvFSM (s1 , l1 ))
99- defer r1 . Shutdown ()
148+ r1 , stop1 := newTestRaft (t , "1" , NewKvFSM (s1 , l1 ))
149+ defer stop1 ()
100150 stm .Register (1 , NewTransaction (r1 ))
101151
102152 // group 2 (will be used after split)
103153 s2 := store .NewRbMemoryStore ()
104154 l2 := store .NewRbMemoryStoreWithExpire (time .Minute )
105- r2 := newTestRaft (t , "2" , NewKvFSM (s2 , l2 ))
106- defer r2 . Shutdown ()
155+ r2 , stop2 := newTestRaft (t , "2" , NewKvFSM (s2 , l2 ))
156+ defer stop2 ()
107157 stm .Register (2 , NewTransaction (r2 ))
108158
109159 // initial write routed to group 1
0 commit comments