diff --git a/cmd/server/demo.go b/cmd/server/demo.go index 7b828c2..5c59c90 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -16,6 +16,7 @@ import ( "github.com/Jille/raftadmin" raftadminpb "github.com/Jille/raftadmin/proto" "github.com/bootjp/elastickv/adapter" + "github.com/bootjp/elastickv/distribution" "github.com/bootjp/elastickv/kv" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" @@ -224,7 +225,7 @@ func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotSto return ldb, sdb, fss, nil } -func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate) *grpc.Server { +func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate, distServer *adapter.DistributionServer) *grpc.Server { s := grpc.NewServer() trx := kv.NewTransaction(r) gs := adapter.NewGRPCServer(st, coordinator) @@ -232,6 +233,7 @@ func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordina pb.RegisterRawKVServer(s, gs) pb.RegisterTransactionalKVServer(s, gs) pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock())) + pb.RegisterDistributionServer(s, distServer) leaderhealth.Setup(r, s, []string{"RawKV"}) raftadmin.Register(s, r) return s @@ -307,8 +309,10 @@ func run(eg *errgroup.Group, cfg config) error { trx := kv.NewTransaction(r) coordinator := kv.NewCoordinator(trx, r) + distEngine := distribution.NewEngineWithDefaultRoute() + distServer := adapter.NewDistributionServer(distEngine) - s := setupGRPC(r, st, tm, coordinator) + s := setupGRPC(r, st, tm, coordinator, distServer) grpcSock, err := lc.Listen(ctx, "tcp", cfg.address) if err != nil { diff --git a/distribution/engine.go b/distribution/engine.go index 1b35712..bff6abe 100644 --- a/distribution/engine.go +++ b/distribution/engine.go @@ -30,11 +30,21 @@ type Engine struct { hotspotThreshold uint64 } +const defaultGroupID uint64 = 1 + // NewEngine creates an Engine with no hotspot splitting. func NewEngine() *Engine { return NewEngineWithThreshold(0) } +// NewEngineWithDefaultRoute creates an Engine and registers a default route +// covering the full keyspace with a default group ID. +func NewEngineWithDefaultRoute() *Engine { + engine := NewEngine() + engine.UpdateRoute([]byte(""), nil, defaultGroupID) + return engine +} + // NewEngineWithThreshold creates an Engine and sets a threshold for hotspot // detection. A non-zero threshold enables automatic range splitting when the // number of accesses to a range exceeds the threshold. diff --git a/distribution/engine_test.go b/distribution/engine_test.go index dfad358..69ee037 100644 --- a/distribution/engine_test.go +++ b/distribution/engine_test.go @@ -126,6 +126,34 @@ func TestEngineRecordAccessConcurrent(t *testing.T) { } } +func TestNewEngineWithDefaultRoute(t *testing.T) { + e := NewEngineWithDefaultRoute() + stats := e.Stats() + + if len(stats) != 1 { + t.Fatalf("expected 1 route, got %d", len(stats)) + } + + r := stats[0] + if r.GroupID != defaultGroupID { + t.Fatalf("expected group ID %d, got %d", defaultGroupID, r.GroupID) + } + if !bytes.Equal(r.Start, []byte("")) { + t.Fatalf("expected start of keyspace (empty slice), got %q", r.Start) + } + if r.End != nil { + t.Fatalf("expected end of keyspace (nil), got %q", r.End) + } + + route, ok := e.GetRoute([]byte("any-key")) + if !ok { + t.Fatal("GetRoute should find the default route") + } + if route.GroupID != defaultGroupID { + t.Fatalf("GetRoute: expected group ID %d, got %d", defaultGroupID, route.GroupID) + } +} + func assertRange(t *testing.T, r Route, start, end []byte) { t.Helper() if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {