Skip to content

Commit 84ce13f

Browse files
committed
test: add shard split and merge coverage
1 parent 6cfe5b0 commit 84ce13f

File tree

1 file changed

+71
-0
lines changed

1 file changed

+71
-0
lines changed

kv/sharded_transaction_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,74 @@ func TestShardedTransactionManagerCommit(t *testing.T) {
8282
t.Fatalf("group2 value: %v %v", v, err)
8383
}
8484
}
85+
86+
func TestShardedTransactionManagerSplitAndMerge(t *testing.T) {
87+
ctx := context.Background()
88+
89+
e := distribution.NewEngine()
90+
// start with single shard handled by group 1
91+
e.UpdateRoute([]byte("a"), nil, 1)
92+
93+
stm := NewShardedTransactionManager(e)
94+
95+
// group 1
96+
s1 := store.NewRbMemoryStore()
97+
l1 := store.NewRbMemoryStoreWithExpire(time.Minute)
98+
r1 := newTestRaft(t, "1", NewKvFSM(s1, l1))
99+
defer r1.Shutdown()
100+
stm.Register(1, NewTransaction(r1))
101+
102+
// group 2 (will be used after split)
103+
s2 := store.NewRbMemoryStore()
104+
l2 := store.NewRbMemoryStoreWithExpire(time.Minute)
105+
r2 := newTestRaft(t, "2", NewKvFSM(s2, l2))
106+
defer r2.Shutdown()
107+
stm.Register(2, NewTransaction(r2))
108+
109+
// initial write routed to group 1
110+
req := []*pb.Request{
111+
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
112+
}
113+
if _, err := stm.Commit(req); err != nil {
114+
t.Fatalf("commit group1: %v", err)
115+
}
116+
v, err := s1.Get(ctx, []byte("b"))
117+
if err != nil || string(v) != "v1" {
118+
t.Fatalf("group1 value before split: %v %v", v, err)
119+
}
120+
121+
// split shard: group1 handles [a,m), group2 handles [m,∞)
122+
e2 := distribution.NewEngine()
123+
e2.UpdateRoute([]byte("a"), []byte("m"), 1)
124+
e2.UpdateRoute([]byte("m"), nil, 2)
125+
stm.engine = e2
126+
127+
// write routed to group 2 after split
128+
req = []*pb.Request{
129+
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
130+
}
131+
if _, err := stm.Commit(req); err != nil {
132+
t.Fatalf("commit group2: %v", err)
133+
}
134+
v, err = s2.Get(ctx, []byte("x"))
135+
if err != nil || string(v) != "v2" {
136+
t.Fatalf("group2 value after split: %v %v", v, err)
137+
}
138+
139+
// merge shards back: all keys handled by group1
140+
e3 := distribution.NewEngine()
141+
e3.UpdateRoute([]byte("a"), nil, 1)
142+
stm.engine = e3
143+
144+
// write routed to group1 after merge
145+
req = []*pb.Request{
146+
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("z"), Value: []byte("v3")}}},
147+
}
148+
if _, err := stm.Commit(req); err != nil {
149+
t.Fatalf("commit after merge: %v", err)
150+
}
151+
v, err = s1.Get(ctx, []byte("z"))
152+
if err != nil || string(v) != "v3" {
153+
t.Fatalf("group1 value after merge: %v %v", v, err)
154+
}
155+
}

0 commit comments

Comments
 (0)