
Commit 5787514

Merge pull request #224 from bootjp/codex/investigate-issue-#78-in-elastickv
Add sharded transaction manager for multi-Raft support
2 parents f9affe0 + b6ff190

2 files changed: +373 -0 lines changed


kv/shard_router.go

Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
package kv

import (
    "context"
    "sync"

    "github.com/bootjp/elastickv/distribution"
    pb "github.com/bootjp/elastickv/proto"
    "github.com/bootjp/elastickv/store"
    "github.com/cockroachdb/errors"
)

// ShardRouter routes requests to multiple raft groups based on key ranges.
// It does not provide transactional guarantees across shards; commits are executed
// per shard and failures may leave partial results.
type ShardRouter struct {
    engine *distribution.Engine
    mu     sync.RWMutex
    groups map[uint64]*routerGroup
}

type routerGroup struct {
    tm    Transactional
    store store.Store
}

// NewShardRouter creates a new router.
func NewShardRouter(e *distribution.Engine) *ShardRouter {
    return &ShardRouter{
        engine: e,
        groups: make(map[uint64]*routerGroup),
    }
}

// Register associates a raft group ID with its transactional manager and store.
func (s *ShardRouter) Register(group uint64, tm Transactional, st store.Store) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.groups[group] = &routerGroup{tm: tm, store: st}
}

func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
    return s.process(reqs, func(g *routerGroup, rs []*pb.Request) (*TransactionResponse, error) {
        return g.tm.Commit(rs)
    })
}

// Abort dispatches aborts to the correct raft group.
func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error) {
    return s.process(reqs, func(g *routerGroup, rs []*pb.Request) (*TransactionResponse, error) {
        return g.tm.Abort(rs)
    })
}

func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Request) (*TransactionResponse, error)) (*TransactionResponse, error) {
    grouped, err := s.groupRequests(reqs)
    if err != nil {
        return nil, errors.WithStack(err)
    }

    var max uint64
    for gid, rs := range grouped {
        g, ok := s.getGroup(gid)
        if !ok {
            return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
        }
        r, err := fn(g, rs)
        if err != nil {
            return nil, errors.WithStack(err)
        }
        if r.CommitIndex > max {
            max = r.CommitIndex
        }
    }
    return &TransactionResponse{CommitIndex: max}, nil
}

func (s *ShardRouter) getGroup(id uint64) (*routerGroup, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    g, ok := s.groups[id]
    return g, ok
}

func (s *ShardRouter) groupRequests(reqs []*pb.Request) (map[uint64][]*pb.Request, error) {
    batches := make(map[uint64][]*pb.Request)
    for _, r := range reqs {
        if len(r.Mutations) == 0 {
            return nil, ErrInvalidRequest
        }
        key := r.Mutations[0].Key
        route, ok := s.engine.GetRoute(key)
        if !ok {
            return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key)
        }
        batches[route.GroupID] = append(batches[route.GroupID], r)
    }
    return batches, nil
}

// Get retrieves a key routed to the correct shard.
func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error) {
    route, ok := s.engine.GetRoute(key)
    if !ok {
        return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key)
    }
    g, ok := s.getGroup(route.GroupID)
    if !ok {
        return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", route.GroupID)
    }
    v, err := g.store.Get(ctx, key)
    if err != nil {
        return nil, errors.WithStack(err)
    }
    return v, nil
}

var _ Transactional = (*ShardRouter)(nil)
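
For orientation, here is a minimal usage sketch, not part of the diff, showing how the router is meant to be wired up. exampleRouting is a hypothetical helper written against package kv; it assumes two already-elected raft leaders r1 and r2 (built, for instance, with a helper like newTestRaft in the test file below) and that s1 and s2 are the stores those groups' FSMs apply mutations to, mirroring the setup in the tests.

// exampleRouting is a hypothetical sketch, not part of this commit. It assumes
// r1 and r2 are leaders of two separate raft groups and that s1 and s2 are the
// stores their FSMs write into (as in the tests below).
func exampleRouting(r1, r2 *raft.Raft, s1, s2 store.Store) error {
    e := distribution.NewEngine()
    e.UpdateRoute([]byte("a"), []byte("m"), 1) // group 1 owns keys in [a, m)
    e.UpdateRoute([]byte("m"), nil, 2)         // group 2 owns keys in [m, ∞)

    router := NewShardRouter(e)
    router.Register(1, NewTransaction(r1), s1)
    router.Register(2, NewTransaction(r2), s2)

    // A single batch spanning both ranges: Commit partitions it by each
    // request's first mutation key and forwards every sub-batch to its group.
    reqs := []*pb.Request{
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
    }
    if _, err := router.Commit(reqs); err != nil {
        // No cross-shard atomicity: some groups may already have applied
        // their sub-batch when this error is returned.
        return err
    }

    // Reads are routed the same way and served from the registered store.
    _, err := router.Get(context.Background(), []byte("b"))
    return err
}

Note that groupRequests keys the routing decision on the first mutation of each request, so a request whose mutations span shards is sent wholesale to the first key's group.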

kv/shard_router_test.go

Lines changed: 255 additions & 0 deletions
@@ -0,0 +1,255 @@
package kv

import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/bootjp/elastickv/distribution"
    pb "github.com/bootjp/elastickv/proto"
    "github.com/bootjp/elastickv/store"
    "github.com/hashicorp/raft"
)

// helper to create a multi-node raft cluster and return the leader
func newTestRaft(t *testing.T, id string, fsm raft.FSM) (*raft.Raft, func()) {
    t.Helper()

    const n = 3
    addrs := make([]raft.ServerAddress, n)
    trans := make([]*raft.InmemTransport, n)
    for i := 0; i < n; i++ {
        addr, tr := raft.NewInmemTransport(raft.ServerAddress(fmt.Sprintf("%s-%d", id, i)))
        addrs[i] = addr
        trans[i] = tr
    }
    // fully connect transports
    for i := 0; i < n; i++ {
        for j := i + 1; j < n; j++ {
            trans[i].Connect(addrs[j], trans[j])
            trans[j].Connect(addrs[i], trans[i])
        }
    }

    // cluster configuration
    cfg := raft.Configuration{}
    for i := 0; i < n; i++ {
        cfg.Servers = append(cfg.Servers, raft.Server{
            ID:      raft.ServerID(fmt.Sprintf("%s-%d", id, i)),
            Address: addrs[i],
        })
    }

    rafts := make([]*raft.Raft, n)
    for i := 0; i < n; i++ {
        c := raft.DefaultConfig()
        c.LocalID = cfg.Servers[i].ID
        if i == 0 {
            c.HeartbeatTimeout = 200 * time.Millisecond
            c.ElectionTimeout = 400 * time.Millisecond
            c.LeaderLeaseTimeout = 100 * time.Millisecond
        } else {
            c.HeartbeatTimeout = 1 * time.Second
            c.ElectionTimeout = 2 * time.Second
            c.LeaderLeaseTimeout = 500 * time.Millisecond
        }
        ldb := raft.NewInmemStore()
        sdb := raft.NewInmemStore()
        fss := raft.NewInmemSnapshotStore()
        var rfsm raft.FSM
        if i == 0 {
            rfsm = fsm
        } else {
            rfsm = NewKvFSM(store.NewRbMemoryStore(), store.NewRbMemoryStoreWithExpire(time.Minute))
        }
        r, err := raft.NewRaft(c, rfsm, ldb, sdb, fss, trans[i])
        if err != nil {
            t.Fatalf("new raft %d: %v", i, err)
        }
        if err := r.BootstrapCluster(cfg).Error(); err != nil {
            t.Fatalf("bootstrap %d: %v", i, err)
        }
        rafts[i] = r
    }

    // node 0 should become leader
    for i := 0; i < 100; i++ {
        if rafts[0].State() == raft.Leader {
            break
        }
        time.Sleep(50 * time.Millisecond)
    }
    if rafts[0].State() != raft.Leader {
        t.Fatalf("node %s-0 is not leader", id)
    }

    shutdown := func() {
        for _, r := range rafts {
            r.Shutdown()
        }
    }
    return rafts[0], shutdown
}

func TestShardRouterCommit(t *testing.T) {
    ctx := context.Background()

    e := distribution.NewEngine()
    e.UpdateRoute([]byte("a"), []byte("m"), 1)
    e.UpdateRoute([]byte("m"), nil, 2)

    router := NewShardRouter(e)

    // group 1
    s1 := store.NewRbMemoryStore()
    l1 := store.NewRbMemoryStoreWithExpire(time.Minute)
    r1, stop1 := newTestRaft(t, "1", NewKvFSM(s1, l1))
    defer stop1()
    router.Register(1, NewTransaction(r1), s1)

    // group 2
    s2 := store.NewRbMemoryStore()
    l2 := store.NewRbMemoryStoreWithExpire(time.Minute)
    r2, stop2 := newTestRaft(t, "2", NewKvFSM(s2, l2))
    defer stop2()
    router.Register(2, NewTransaction(r2), s2)

    reqs := []*pb.Request{
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
    }

    if _, err := router.Commit(reqs); err != nil {
        t.Fatalf("commit: %v", err)
    }

    v, err := router.Get(ctx, []byte("b"))
    if err != nil || string(v) != "v1" {
        t.Fatalf("group1 get: %v %v", v, err)
    }
    v, err = router.Get(ctx, []byte("x"))
    if err != nil || string(v) != "v2" {
        t.Fatalf("group2 get: %v %v", v, err)
    }
}

func TestShardRouterSplitAndMerge(t *testing.T) {
    ctx := context.Background()

    e := distribution.NewEngine()
    // start with single shard handled by group 1
    e.UpdateRoute([]byte("a"), nil, 1)

    router := NewShardRouter(e)

    // group 1
    s1 := store.NewRbMemoryStore()
    l1 := store.NewRbMemoryStoreWithExpire(time.Minute)
    r1, stop1 := newTestRaft(t, "1", NewKvFSM(s1, l1))
    defer stop1()
    router.Register(1, NewTransaction(r1), s1)

    // group 2 (will be used after split)
    s2 := store.NewRbMemoryStore()
    l2 := store.NewRbMemoryStoreWithExpire(time.Minute)
    r2, stop2 := newTestRaft(t, "2", NewKvFSM(s2, l2))
    defer stop2()
    router.Register(2, NewTransaction(r2), s2)

    // initial write routed to group 1
    req := []*pb.Request{
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
    }
    if _, err := router.Commit(req); err != nil {
        t.Fatalf("commit group1: %v", err)
    }
    v, err := router.Get(ctx, []byte("b"))
    if err != nil || string(v) != "v1" {
        t.Fatalf("group1 value before split: %v %v", v, err)
    }

    // split shard: group1 handles [a,m), group2 handles [m,∞)
    e2 := distribution.NewEngine()
    e2.UpdateRoute([]byte("a"), []byte("m"), 1)
    e2.UpdateRoute([]byte("m"), nil, 2)
    router.engine = e2

    // write routed to group 2 after split
    req = []*pb.Request{
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
    }
    if _, err := router.Commit(req); err != nil {
        t.Fatalf("commit group2: %v", err)
    }
    v, err = router.Get(ctx, []byte("x"))
    if err != nil || string(v) != "v2" {
        t.Fatalf("group2 value after split: %v %v", v, err)
    }

    // merge shards back: all keys handled by group1
    e3 := distribution.NewEngine()
    e3.UpdateRoute([]byte("a"), nil, 1)
    router.engine = e3

    // write routed to group1 after merge
    req = []*pb.Request{
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("z"), Value: []byte("v3")}}},
    }
    if _, err := router.Commit(req); err != nil {
        t.Fatalf("commit after merge: %v", err)
    }
    v, err = router.Get(ctx, []byte("z"))
    if err != nil || string(v) != "v3" {
        t.Fatalf("group1 value after merge: %v %v", v, err)
    }
}

type fakeTM struct {
    commitErr   bool
    commitCalls int
    abortCalls  int
}

func (f *fakeTM) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
    f.commitCalls++
    if f.commitErr {
        return nil, fmt.Errorf("commit fail")
    }
    return &TransactionResponse{}, nil
}

func (f *fakeTM) Abort(reqs []*pb.Request) (*TransactionResponse, error) {
    f.abortCalls++
    return &TransactionResponse{}, nil
}

func TestShardRouterCommitFailure(t *testing.T) {
    e := distribution.NewEngine()
    e.UpdateRoute([]byte("a"), []byte("m"), 1)
    e.UpdateRoute([]byte("m"), nil, 2)

    router := NewShardRouter(e)

    ok := &fakeTM{}
    fail := &fakeTM{commitErr: true}
    router.Register(1, ok, nil)
    router.Register(2, fail, nil)

    reqs := []*pb.Request{
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
        {IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
    }

    if _, err := router.Commit(reqs); err == nil {
        t.Fatalf("expected error")
    }

    if fail.commitCalls == 0 || ok.commitCalls == 0 {
        t.Fatalf("expected commits on both groups")
    }

    if ok.abortCalls != 0 {
        t.Fatalf("unexpected abort on successful group")
    }
}
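
The last test pins down the caveat from ShardRouter's doc comment: when one group's Commit fails, groups that already committed are not rolled back and Abort is never issued automatically. Any compensation has to come from the caller. Below is a minimal caller-side sketch of one possible shape; the wrapper name and the best-effort-abort strategy are assumptions rather than anything this commit implements, and whether Abort actually undoes work depends on the underlying Transactional implementation.

// commitWithBestEffortAbort is a hypothetical caller-side wrapper, not part of
// this commit. On a failed Commit it issues Abort for the same batch; Abort is
// routed per shard just like Commit, but because ShardRouter offers no
// cross-shard atomicity this remains best-effort cleanup only.
func commitWithBestEffortAbort(router *ShardRouter, reqs []*pb.Request) (*TransactionResponse, error) {
    resp, err := router.Commit(reqs)
    if err == nil {
        return resp, nil
    }
    // Some shards may already have applied their mutations. The abort error is
    // deliberately dropped in this sketch; the commit error is what callers
    // need to see.
    _, _ = router.Abort(reqs)
    return nil, err
}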
