Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Jille/raftadmin"
raftadminpb "github.com/Jille/raftadmin/proto"
"github.com/bootjp/elastickv/adapter"
"github.com/bootjp/elastickv/distribution"
"github.com/bootjp/elastickv/kv"
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
Expand Down Expand Up @@ -224,14 +225,15 @@ func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotSto
return ldb, sdb, fss, nil
}

func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate) *grpc.Server {
func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate, distServer *adapter.DistributionServer) *grpc.Server {
s := grpc.NewServer()
trx := kv.NewTransaction(r)
gs := adapter.NewGRPCServer(st, coordinator)
tm.Register(s)
pb.RegisterRawKVServer(s, gs)
pb.RegisterTransactionalKVServer(s, gs)
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock()))
pb.RegisterDistributionServer(s, distServer)
leaderhealth.Setup(r, s, []string{"RawKV"})
raftadmin.Register(s, r)
return s
Expand Down Expand Up @@ -307,8 +309,10 @@ func run(eg *errgroup.Group, cfg config) error {

trx := kv.NewTransaction(r)
coordinator := kv.NewCoordinator(trx, r)
distEngine := distribution.NewEngineWithDefaultRoute()
distServer := adapter.NewDistributionServer(distEngine)

s := setupGRPC(r, st, tm, coordinator)
s := setupGRPC(r, st, tm, coordinator, distServer)

grpcSock, err := lc.Listen(ctx, "tcp", cfg.address)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions distribution/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ type Engine struct {
hotspotThreshold uint64
}

const defaultGroupID uint64 = 1

// NewEngine creates an Engine with no hotspot splitting.
func NewEngine() *Engine {
return NewEngineWithThreshold(0)
}

// NewEngineWithDefaultRoute creates an Engine and registers a default route
// covering the full keyspace with a default group ID.
func NewEngineWithDefaultRoute() *Engine {
engine := NewEngine()
engine.UpdateRoute([]byte(""), nil, defaultGroupID)
return engine
}
Comment on lines +42 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This new constructor is a great addition for simplifying setup. However, new public functions should be accompanied by unit tests to ensure they behave as expected and prevent future regressions. The PR description mentions no tests were run for this change.

Please add a unit test for NewEngineWithDefaultRoute to distribution/engine_test.go. Here is a suggested test case:

func TestNewEngineWithDefaultRoute(t *testing.T) {
	e := NewEngineWithDefaultRoute()
	stats := e.Stats()

	if len(stats) != 1 {
		t.Fatalf("expected 1 route, got %d", len(stats))
	}

	r := stats[0]
	if r.GroupID != defaultGroupID {
		t.Fatalf("expected group ID %d, got %d", defaultGroupID, r.GroupID)
	}
	if !bytes.Equal(r.Start, []byte("")) {
		t.Fatalf("expected start of keyspace (empty slice), got %q", r.Start)
	}
	if r.End != nil {
		t.Fatalf("expected end of keyspace (nil), got %q", r.End)
	}

	// Also test via GetRoute
	route, ok := e.GetRoute([]byte("any-key"))
	if !ok {
		t.Fatal("GetRoute should find the default route")
	}
	if route.GroupID != defaultGroupID {
		t.Fatalf("GetRoute: expected group ID %d, got %d", defaultGroupID, route.GroupID)
	}
}


// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
// detection. A non-zero threshold enables automatic range splitting when the
// number of accesses to a range exceeds the threshold.
Expand Down
28 changes: 28 additions & 0 deletions distribution/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,34 @@ func TestEngineRecordAccessConcurrent(t *testing.T) {
}
}

func TestNewEngineWithDefaultRoute(t *testing.T) {
e := NewEngineWithDefaultRoute()
stats := e.Stats()

if len(stats) != 1 {
t.Fatalf("expected 1 route, got %d", len(stats))
}

r := stats[0]
if r.GroupID != defaultGroupID {
t.Fatalf("expected group ID %d, got %d", defaultGroupID, r.GroupID)
}
if !bytes.Equal(r.Start, []byte("")) {
t.Fatalf("expected start of keyspace (empty slice), got %q", r.Start)
}
if r.End != nil {
t.Fatalf("expected end of keyspace (nil), got %q", r.End)
}

route, ok := e.GetRoute([]byte("any-key"))
if !ok {
t.Fatal("GetRoute should find the default route")
}
if route.GroupID != defaultGroupID {
t.Fatalf("GetRoute: expected group ID %d, got %d", defaultGroupID, route.GroupID)
}
}

func assertRange(t *testing.T, r Route, start, end []byte) {
t.Helper()
if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {
Expand Down
Loading