Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions adapter/distribution_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package adapter

import (
"context"

"github.com/bootjp/elastickv/distribution"
pb "github.com/bootjp/elastickv/proto"
)

// DistributionServer serves distribution related gRPC APIs.
type DistributionServer struct {
engine *distribution.Engine
pb.UnimplementedDistributionServer
}

// NewDistributionServer creates a new server.
func NewDistributionServer(e *distribution.Engine) *DistributionServer {
return &DistributionServer{engine: e}
}

// UpdateRoute allows updating route information.
func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64) {
s.engine.UpdateRoute(start, end, group)
}

// GetRoute returns route for a key.
func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error) {
r, ok := s.engine.GetRoute(req.Key)
if !ok {
return &pb.GetRouteResponse{}, nil
}
return &pb.GetRouteResponse{
Start: r.Start,
End: r.End,
RaftGroupId: r.GroupID,
}, nil
}

// GetTimestamp returns monotonically increasing timestamp.
func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error) {
ts := s.engine.NextTimestamp()
return &pb.GetTimestampResponse{Timestamp: ts}, nil
}
71 changes: 71 additions & 0 deletions distribution/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package distribution

import (
"bytes"
"sort"
"sync"
"sync/atomic"
)

// Route represents a mapping from a key range to a raft group.
// Ranges are right half-open intervals: [Start, End). Start is inclusive and
// End is exclusive. A nil End denotes an unbounded interval extending to
// positive infinity.
type Route struct {
// Start marks the inclusive beginning of the range.
Start []byte
// End marks the exclusive end of the range. nil means unbounded.
End []byte
// GroupID identifies the raft group for the range starting at Start.
GroupID uint64
}

// Engine holds in-memory metadata of routes and provides timestamp generation.
type Engine struct {
mu sync.RWMutex
routes []Route
ts uint64
}

// NewEngine creates an Engine.
func NewEngine() *Engine {
return &Engine{routes: make([]Route, 0)}
}

// UpdateRoute registers or updates a route for the given key range.
// Routes are stored sorted by Start.
func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
e.mu.Lock()
defer e.mu.Unlock()
e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group})
sort.Slice(e.routes, func(i, j int) bool {
return bytes.Compare(e.routes[i].Start, e.routes[j].Start) < 0
})
}

// GetRoute finds a route for the given key using right half-open intervals.
func (e *Engine) GetRoute(key []byte) (Route, bool) {
e.mu.RLock()
defer e.mu.RUnlock()
if len(e.routes) == 0 {
return Route{}, false
}

// Find the first route with Start > key.
i := sort.Search(len(e.routes), func(i int) bool {
return bytes.Compare(e.routes[i].Start, key) > 0
})
if i == 0 {
return Route{}, false
}
r := e.routes[i-1]
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
return Route{}, false
}
return r, true
}

// NextTimestamp returns a monotonic increasing timestamp.
func (e *Engine) NextTimestamp() uint64 {
return atomic.AddUint64(&e.ts, 1)
}
52 changes: 52 additions & 0 deletions distribution/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package distribution

import "testing"

func TestEngineRouteLookup(t *testing.T) {
e := NewEngine()
e.UpdateRoute([]byte("a"), []byte("m"), 1)
e.UpdateRoute([]byte("m"), nil, 2)

cases := []struct {
key []byte
group uint64
expect bool
}{
{[]byte("0"), 0, false}, // before first route
{[]byte("a"), 1, true}, // start is inclusive
{[]byte("b"), 1, true},
{[]byte("m"), 2, true}, // end is exclusive for first route
{[]byte("x"), 2, true},
{[]byte("za"), 2, true}, // last route is unbounded
}

for _, c := range cases {
r, ok := e.GetRoute(c.key)
if ok != c.expect {
t.Fatalf("key %q expected ok=%v, got %v", c.key, c.expect, ok)
}
if ok && r.GroupID != c.group {
t.Fatalf("key %q expected group %d, got %d", c.key, c.group, r.GroupID)
}
}
}

func TestEngineRouteUnmatchedAfterEnd(t *testing.T) {
e := NewEngine()
e.UpdateRoute([]byte("a"), []byte("m"), 1)
if _, ok := e.GetRoute([]byte("x")); ok {
t.Fatalf("expected no route for key beyond end")
}
}

func TestEngineTimestampMonotonic(t *testing.T) {
e := NewEngine()
last := e.NextTimestamp()
for i := 0; i < 100; i++ {
ts := e.NextTimestamp()
if ts <= last {
t.Fatalf("timestamp not monotonic: %d <= %d", ts, last)
}
last = ts
}
}
16 changes: 9 additions & 7 deletions proto/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
all: gen

gen:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
service.proto
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
service.proto
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
internal.proto



--go-grpc_out=. --go-grpc_opt=paths=source_relative \
internal.proto
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
distribution.proto
Loading
Loading