Skip to content

Commit fbb6464

Browse files
committed
Allow unbounded distribution routes
1 parent 427619c commit fbb6464

File tree

8 files changed

+629
-7
lines changed

8 files changed

+629
-7
lines changed

adapter/distribution_server.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
6+
"github.com/bootjp/elastickv/distribution"
7+
pb "github.com/bootjp/elastickv/proto"
8+
)
9+
10+
// DistributionServer serves distribution related gRPC APIs.
11+
type DistributionServer struct {
12+
engine *distribution.Engine
13+
pb.UnimplementedDistributionServer
14+
}
15+
16+
// NewDistributionServer creates a new server.
17+
func NewDistributionServer(e *distribution.Engine) *DistributionServer {
18+
return &DistributionServer{engine: e}
19+
}
20+
21+
// UpdateRoute allows updating route information.
22+
func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64) {
23+
s.engine.UpdateRoute(start, end, group)
24+
}
25+
26+
// GetRoute returns route for a key.
27+
func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error) {
28+
r, ok := s.engine.GetRoute(req.Key)
29+
if !ok {
30+
return &pb.GetRouteResponse{}, nil
31+
}
32+
return &pb.GetRouteResponse{
33+
Start: r.Start,
34+
End: r.End,
35+
RaftGroupId: r.GroupID,
36+
}, nil
37+
}
38+
39+
// GetTimestamp returns monotonically increasing timestamp.
40+
func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error) {
41+
ts := s.engine.NextTimestamp()
42+
return &pb.GetTimestampResponse{Timestamp: ts}, nil
43+
}

cmd/server/demo.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
transport "github.com/Jille/raft-grpc-transport"
1212
"github.com/Jille/raftadmin"
1313
"github.com/bootjp/elastickv/adapter"
14+
"github.com/bootjp/elastickv/distribution"
1415
"github.com/bootjp/elastickv/kv"
1516
pb "github.com/bootjp/elastickv/proto"
1617
"github.com/bootjp/elastickv/store"
@@ -89,10 +90,15 @@ func run(eg *errgroup.Group) error {
8990
trx := kv.NewTransaction(r)
9091
coordinator := kv.NewCoordinator(trx, r)
9192
gs := adapter.NewGRPCServer(st, coordinator)
93+
distEngine := distribution.NewEngine()
94+
distSrv := adapter.NewDistributionServer(distEngine)
95+
// example route for demo purposes
96+
distSrv.UpdateRoute([]byte("a"), []byte("z"), uint64(i))
9297
tm.Register(s)
9398
pb.RegisterRawKVServer(s, gs)
9499
pb.RegisterTransactionalKVServer(s, gs)
95100
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r))
101+
pb.RegisterDistributionServer(s, distSrv)
96102
leaderhealth.Setup(r, s, []string{"RawKV"})
97103
raftadmin.Register(s, r)
98104

distribution/engine.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package distribution
2+
3+
import (
4+
"bytes"
5+
"sync"
6+
"sync/atomic"
7+
)
8+
9+
// Route represents mapping from key range to raft group.
10+
// The key range is half-open: [Start, End)
11+
// meaning Start is inclusive and End is exclusive. If End is nil or empty
12+
// the range is unbounded and covers all keys >= Start.
13+
type Route struct {
14+
// Start marks the inclusive beginning of the range.
15+
Start []byte
16+
// End marks the exclusive end of the range. When empty, the range has
17+
// no upper bound.
18+
End []byte
19+
GroupID uint64
20+
}
21+
22+
// Engine holds in-memory metadata of routes and provides timestamp generation.
23+
type Engine struct {
24+
mu sync.RWMutex
25+
routes []Route
26+
ts uint64
27+
}
28+
29+
// NewEngine creates an Engine.
30+
func NewEngine() *Engine {
31+
return &Engine{routes: make([]Route, 0)}
32+
}
33+
34+
// UpdateRoute registers or updates a route. The range is [start, end). If end
35+
// is nil or empty the range is treated as unbounded above.
36+
func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
37+
e.mu.Lock()
38+
defer e.mu.Unlock()
39+
e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group})
40+
}
41+
42+
// GetRoute finds a route for the given key. The search uses half-open ranges
43+
// [Start, End). If End is empty the range is treated as unbounded above.
44+
func (e *Engine) GetRoute(key []byte) (Route, bool) {
45+
e.mu.RLock()
46+
defer e.mu.RUnlock()
47+
for _, r := range e.routes {
48+
if bytes.Compare(key, r.Start) >= 0 {
49+
if len(r.End) == 0 || bytes.Compare(key, r.End) < 0 {
50+
return r, true
51+
}
52+
}
53+
}
54+
return Route{}, false
55+
}
56+
57+
// NextTimestamp returns a monotonic increasing timestamp.
58+
func (e *Engine) NextTimestamp() uint64 {
59+
return atomic.AddUint64(&e.ts, 1)
60+
}

distribution/engine_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package distribution
2+
3+
import "testing"
4+
5+
func TestEngineRouteLookup(t *testing.T) {
6+
e := NewEngine()
7+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
8+
// second range has no upper bound
9+
e.UpdateRoute([]byte("m"), nil, 2)
10+
11+
cases := []struct {
12+
key []byte
13+
group uint64
14+
expect bool
15+
}{
16+
{[]byte("a"), 1, true}, // start is inclusive
17+
{[]byte("b"), 1, true},
18+
{[]byte("m"), 2, true}, // end is exclusive, so m belongs to second range
19+
{[]byte("x"), 2, true},
20+
{[]byte("za"), 2, true}, // unbounded range covers keys beyond z
21+
}
22+
23+
for _, c := range cases {
24+
r, ok := e.GetRoute(c.key)
25+
if ok != c.expect {
26+
t.Fatalf("key %q expected ok=%v, got %v", c.key, c.expect, ok)
27+
}
28+
if ok && r.GroupID != c.group {
29+
t.Fatalf("key %q expected group %d, got %d", c.key, c.group, r.GroupID)
30+
}
31+
}
32+
}
33+
34+
func TestEngineTimestampMonotonic(t *testing.T) {
35+
e := NewEngine()
36+
last := e.NextTimestamp()
37+
for i := 0; i < 100; i++ {
38+
ts := e.NextTimestamp()
39+
if ts <= last {
40+
t.Fatalf("timestamp not monotonic: %d <= %d", ts, last)
41+
}
42+
last = ts
43+
}
44+
}

proto/Makefile

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
all: gen
2+
13
gen:
24
protoc --go_out=. --go_opt=paths=source_relative \
3-
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
4-
service.proto
5+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
6+
service.proto
57
protoc --go_out=. --go_opt=paths=source_relative \
6-
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
7-
internal.proto
8-
9-
10-
8+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
9+
internal.proto
10+
protoc --go_out=. --go_opt=paths=source_relative \
11+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
12+
distribution.proto

0 commit comments

Comments
 (0)