Skip to content

Commit a51d87b

Browse files
authored
Merge pull request #295 from bootjp/codex/implement-changes-from-issue-77
distribution: own default group route
2 parents 1487ca5 + ec264ee commit a51d87b

File tree

3 files changed

+44
-2
lines changed

3 files changed

+44
-2
lines changed

cmd/server/demo.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/Jille/raftadmin"
1717
raftadminpb "github.com/Jille/raftadmin/proto"
1818
"github.com/bootjp/elastickv/adapter"
19+
"github.com/bootjp/elastickv/distribution"
1920
"github.com/bootjp/elastickv/kv"
2021
pb "github.com/bootjp/elastickv/proto"
2122
"github.com/bootjp/elastickv/store"
@@ -224,14 +225,15 @@ func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotSto
224225
return ldb, sdb, fss, nil
225226
}
226227

227-
func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate) *grpc.Server {
228+
func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate, distServer *adapter.DistributionServer) *grpc.Server {
228229
s := grpc.NewServer()
229230
trx := kv.NewTransaction(r)
230231
gs := adapter.NewGRPCServer(st, coordinator)
231232
tm.Register(s)
232233
pb.RegisterRawKVServer(s, gs)
233234
pb.RegisterTransactionalKVServer(s, gs)
234235
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock()))
236+
pb.RegisterDistributionServer(s, distServer)
235237
leaderhealth.Setup(r, s, []string{"RawKV"})
236238
raftadmin.Register(s, r)
237239
return s
@@ -307,8 +309,10 @@ func run(eg *errgroup.Group, cfg config) error {
307309

308310
trx := kv.NewTransaction(r)
309311
coordinator := kv.NewCoordinator(trx, r)
312+
distEngine := distribution.NewEngineWithDefaultRoute()
313+
distServer := adapter.NewDistributionServer(distEngine)
310314

311-
s := setupGRPC(r, st, tm, coordinator)
315+
s := setupGRPC(r, st, tm, coordinator, distServer)
312316

313317
grpcSock, err := lc.Listen(ctx, "tcp", cfg.address)
314318
if err != nil {

distribution/engine.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,21 @@ type Engine struct {
3030
hotspotThreshold uint64
3131
}
3232

33+
const defaultGroupID uint64 = 1
34+
3335
// NewEngine creates an Engine with no hotspot splitting.
3436
func NewEngine() *Engine {
3537
return NewEngineWithThreshold(0)
3638
}
3739

40+
// NewEngineWithDefaultRoute creates an Engine and registers a default route
41+
// covering the full keyspace with a default group ID.
42+
func NewEngineWithDefaultRoute() *Engine {
43+
engine := NewEngine()
44+
engine.UpdateRoute([]byte(""), nil, defaultGroupID)
45+
return engine
46+
}
47+
3848
// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
3949
// detection. A non-zero threshold enables automatic range splitting when the
4050
// number of accesses to a range exceeds the threshold.

distribution/engine_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,34 @@ func TestEngineRecordAccessConcurrent(t *testing.T) {
126126
}
127127
}
128128

129+
func TestNewEngineWithDefaultRoute(t *testing.T) {
130+
e := NewEngineWithDefaultRoute()
131+
stats := e.Stats()
132+
133+
if len(stats) != 1 {
134+
t.Fatalf("expected 1 route, got %d", len(stats))
135+
}
136+
137+
r := stats[0]
138+
if r.GroupID != defaultGroupID {
139+
t.Fatalf("expected group ID %d, got %d", defaultGroupID, r.GroupID)
140+
}
141+
if !bytes.Equal(r.Start, []byte("")) {
142+
t.Fatalf("expected start of keyspace (empty slice), got %q", r.Start)
143+
}
144+
if r.End != nil {
145+
t.Fatalf("expected end of keyspace (nil), got %q", r.End)
146+
}
147+
148+
route, ok := e.GetRoute([]byte("any-key"))
149+
if !ok {
150+
t.Fatal("GetRoute should find the default route")
151+
}
152+
if route.GroupID != defaultGroupID {
153+
t.Fatalf("GetRoute: expected group ID %d, got %d", defaultGroupID, route.GroupID)
154+
}
155+
}
156+
129157
func assertRange(t *testing.T, r Route, start, end []byte) {
130158
t.Helper()
131159
if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {

0 commit comments

Comments
 (0)