Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions adapter/distribution_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package adapter

import (
"context"

"github.com/bootjp/elastickv/distribution"
pb "github.com/bootjp/elastickv/proto"
)

// DistributionServer serves distribution related gRPC APIs.
type DistributionServer struct {
engine *distribution.Engine
pb.UnimplementedDistributionServer
}

// NewDistributionServer creates a new server.
func NewDistributionServer(e *distribution.Engine) *DistributionServer {
return &DistributionServer{engine: e}
}

// UpdateRoute allows updating route information.
func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64) {
s.engine.UpdateRoute(start, end, group)
}

// GetRoute returns route for a key.
func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error) {
r, ok := s.engine.GetRoute(req.Key)
if !ok {
return &pb.GetRouteResponse{}, nil
}
return &pb.GetRouteResponse{
Start: r.Start,
End: r.End,
RaftGroupId: r.GroupID,
}, nil
}

// GetTimestamp returns monotonically increasing timestamp.
func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error) {
ts := s.engine.NextTimestamp()
return &pb.GetTimestampResponse{Timestamp: ts}, nil
}
6 changes: 6 additions & 0 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
transport "github.com/Jille/raft-grpc-transport"
"github.com/Jille/raftadmin"
"github.com/bootjp/elastickv/adapter"
"github.com/bootjp/elastickv/distribution"
"github.com/bootjp/elastickv/kv"
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
Expand Down Expand Up @@ -89,10 +90,15 @@ func run(eg *errgroup.Group) error {
trx := kv.NewTransaction(r)
coordinator := kv.NewCoordinator(trx, r)
gs := adapter.NewGRPCServer(st, coordinator)
distEngine := distribution.NewEngine()
distSrv := adapter.NewDistributionServer(distEngine)
// example route for demo purposes
distSrv.UpdateRoute([]byte("a"), []byte("z"), uint64(i))
tm.Register(s)
pb.RegisterRawKVServer(s, gs)
pb.RegisterTransactionalKVServer(s, gs)
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r))
pb.RegisterDistributionServer(s, distSrv)
leaderhealth.Setup(r, s, []string{"RawKV"})
raftadmin.Register(s, r)

Expand Down
50 changes: 50 additions & 0 deletions distribution/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package distribution

import (
"bytes"
"sync"
"sync/atomic"
)

// Route represents mapping from key range to raft group.
type Route struct {
Start []byte
End []byte
GroupID uint64
}

// Engine holds in-memory metadata of routes and provides timestamp generation.
type Engine struct {
mu sync.RWMutex
routes []Route
ts uint64
}

// NewEngine creates an Engine.
func NewEngine() *Engine {
return &Engine{routes: make([]Route, 0)}
}

// UpdateRoute registers or updates a route.
func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
e.mu.Lock()
defer e.mu.Unlock()
e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group})
}

// GetRoute finds a route for the given key.
func (e *Engine) GetRoute(key []byte) (Route, bool) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, r := range e.routes {
if bytes.Compare(key, r.Start) >= 0 && bytes.Compare(key, r.End) < 0 {
return r, true
}
}
return Route{}, false
}

// NextTimestamp returns a monotonic increasing timestamp.
func (e *Engine) NextTimestamp() uint64 {
return atomic.AddUint64(&e.ts, 1)
}
31 changes: 31 additions & 0 deletions distribution/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package distribution

import "testing"

func TestEngineRouteLookup(t *testing.T) {
e := NewEngine()
e.UpdateRoute([]byte("a"), []byte("m"), 1)
e.UpdateRoute([]byte("m"), []byte("z"), 2)

r, ok := e.GetRoute([]byte("b"))
if !ok || r.GroupID != 1 {
t.Fatalf("expected group 1, got %v", r.GroupID)
}

r, ok = e.GetRoute([]byte("x"))
if !ok || r.GroupID != 2 {
t.Fatalf("expected group 2, got %v", r.GroupID)
}
}

func TestEngineTimestampMonotonic(t *testing.T) {
e := NewEngine()
last := e.NextTimestamp()
for i := 0; i < 100; i++ {
ts := e.NextTimestamp()
if ts <= last {
t.Fatalf("timestamp not monotonic: %d <= %d", ts, last)
}
last = ts
}
}
16 changes: 9 additions & 7 deletions proto/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
all: gen

gen:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
service.proto
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
service.proto
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
internal.proto



--go-grpc_out=. --go-grpc_opt=paths=source_relative \
internal.proto
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
distribution.proto
Loading
Loading