Skip to content

Commit dd7ba83

Browse files
authored
Merge pull request #211 from bootjp/codex/implement-distribution-engine-with-grpc-service-jkek27
Add distribution engine with gRPC routing and timestamp service
2 parents ae05d76 + 84a458c commit dd7ba83

File tree

7 files changed

+698
-7
lines changed

7 files changed

+698
-7
lines changed

adapter/distribution_server.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
6+
"github.com/bootjp/elastickv/distribution"
7+
pb "github.com/bootjp/elastickv/proto"
8+
)
9+
10+
// DistributionServer serves distribution related gRPC APIs.
11+
type DistributionServer struct {
12+
engine *distribution.Engine
13+
pb.UnimplementedDistributionServer
14+
}
15+
16+
// NewDistributionServer creates a new server.
17+
func NewDistributionServer(e *distribution.Engine) *DistributionServer {
18+
return &DistributionServer{engine: e}
19+
}
20+
21+
// UpdateRoute allows updating route information.
22+
func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64) {
23+
s.engine.UpdateRoute(start, end, group)
24+
}
25+
26+
// GetRoute returns route for a key.
27+
func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error) {
28+
r, ok := s.engine.GetRoute(req.Key)
29+
if !ok {
30+
return &pb.GetRouteResponse{}, nil
31+
}
32+
return &pb.GetRouteResponse{
33+
Start: r.Start,
34+
End: r.End,
35+
RaftGroupId: r.GroupID,
36+
}, nil
37+
}
38+
39+
// GetTimestamp returns monotonically increasing timestamp.
40+
func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error) {
41+
ts := s.engine.NextTimestamp()
42+
return &pb.GetTimestampResponse{Timestamp: ts}, nil
43+
}

distribution/engine.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package distribution
2+
3+
import (
4+
"bytes"
5+
"sort"
6+
"sync"
7+
"sync/atomic"
8+
)
9+
10+
// Route represents a mapping from a key range to a raft group.
11+
// Ranges are right half-open intervals: [Start, End). Start is inclusive and
12+
// End is exclusive. A nil End denotes an unbounded interval extending to
13+
// positive infinity.
14+
type Route struct {
15+
// Start marks the inclusive beginning of the range.
16+
Start []byte
17+
// End marks the exclusive end of the range. nil means unbounded.
18+
End []byte
19+
// GroupID identifies the raft group for the range starting at Start.
20+
GroupID uint64
21+
}
22+
23+
// Engine holds in-memory metadata of routes and provides timestamp generation.
24+
type Engine struct {
25+
mu sync.RWMutex
26+
routes []Route
27+
ts uint64
28+
}
29+
30+
// NewEngine creates an Engine.
31+
func NewEngine() *Engine {
32+
return &Engine{routes: make([]Route, 0)}
33+
}
34+
35+
// UpdateRoute registers or updates a route for the given key range.
36+
// Routes are stored sorted by Start.
37+
func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
38+
e.mu.Lock()
39+
defer e.mu.Unlock()
40+
e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group})
41+
sort.Slice(e.routes, func(i, j int) bool {
42+
return bytes.Compare(e.routes[i].Start, e.routes[j].Start) < 0
43+
})
44+
}
45+
46+
// GetRoute finds a route for the given key using right half-open intervals.
47+
func (e *Engine) GetRoute(key []byte) (Route, bool) {
48+
e.mu.RLock()
49+
defer e.mu.RUnlock()
50+
if len(e.routes) == 0 {
51+
return Route{}, false
52+
}
53+
54+
// Find the first route with Start > key.
55+
i := sort.Search(len(e.routes), func(i int) bool {
56+
return bytes.Compare(e.routes[i].Start, key) > 0
57+
})
58+
if i == 0 {
59+
return Route{}, false
60+
}
61+
r := e.routes[i-1]
62+
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
63+
return Route{}, false
64+
}
65+
return r, true
66+
}
67+
68+
// NextTimestamp returns a monotonic increasing timestamp.
69+
func (e *Engine) NextTimestamp() uint64 {
70+
return atomic.AddUint64(&e.ts, 1)
71+
}

distribution/engine_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package distribution
2+
3+
import "testing"
4+
5+
func TestEngineRouteLookup(t *testing.T) {
6+
e := NewEngine()
7+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
8+
e.UpdateRoute([]byte("m"), nil, 2)
9+
10+
cases := []struct {
11+
key []byte
12+
group uint64
13+
expect bool
14+
}{
15+
{[]byte("0"), 0, false}, // before first route
16+
{[]byte("a"), 1, true}, // start is inclusive
17+
{[]byte("b"), 1, true},
18+
{[]byte("m"), 2, true}, // end is exclusive for first route
19+
{[]byte("x"), 2, true},
20+
{[]byte("za"), 2, true}, // last route is unbounded
21+
}
22+
23+
for _, c := range cases {
24+
r, ok := e.GetRoute(c.key)
25+
if ok != c.expect {
26+
t.Fatalf("key %q expected ok=%v, got %v", c.key, c.expect, ok)
27+
}
28+
if ok && r.GroupID != c.group {
29+
t.Fatalf("key %q expected group %d, got %d", c.key, c.group, r.GroupID)
30+
}
31+
}
32+
}
33+
34+
func TestEngineRouteUnmatchedAfterEnd(t *testing.T) {
35+
e := NewEngine()
36+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
37+
if _, ok := e.GetRoute([]byte("x")); ok {
38+
t.Fatalf("expected no route for key beyond end")
39+
}
40+
}
41+
42+
func TestEngineTimestampMonotonic(t *testing.T) {
43+
e := NewEngine()
44+
last := e.NextTimestamp()
45+
for i := 0; i < 100; i++ {
46+
ts := e.NextTimestamp()
47+
if ts <= last {
48+
t.Fatalf("timestamp not monotonic: %d <= %d", ts, last)
49+
}
50+
last = ts
51+
}
52+
}

proto/Makefile

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
all: gen
2+
13
gen:
24
protoc --go_out=. --go_opt=paths=source_relative \
3-
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
4-
service.proto
5+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
6+
service.proto
57
protoc --go_out=. --go_opt=paths=source_relative \
6-
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
7-
internal.proto
8-
9-
10-
8+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
9+
internal.proto
10+
protoc --go_out=. --go_opt=paths=source_relative \
11+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
12+
distribution.proto

0 commit comments

Comments
 (0)