// Route describes one contiguous key range and the raft group that serves it.
type Route struct {
	// Start is the inclusive lower bound of the range.
	Start []byte
	// End is the exclusive upper bound of the range. A nil End means the
	// range is unbounded above.
	End []byte
	// GroupID identifies the raft group for the range starting at Start.
	GroupID uint64
	// Load tracks the number of accesses served by this range.
	Load uint64
}

// Engine holds in-memory metadata of routes and provides timestamp generation.
type Engine struct {
	// ts is updated with sync/atomic. It is deliberately the first field:
	// on 32-bit platforms 64-bit atomic operations need 64-bit alignment,
	// which is only guaranteed for the first word of an allocated struct.
	ts uint64

	mu               sync.RWMutex
	routes           []Route
	hotspotThreshold uint64
}

// NewEngine creates an Engine with no hotspot splitting.
func NewEngine() *Engine {
	return NewEngineWithThreshold(0)
}

// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
// detection. A non-zero threshold enables automatic range splitting once the
// number of accesses recorded for a range reaches the threshold. A threshold
// of zero disables splitting.
func NewEngineWithThreshold(threshold uint64) *Engine {
	return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
}

// GetRoute returns the route whose range contains key. The boolean result is
// false when no registered range contains key.
//
// The returned Route shares its Start and End slices with the engine, so the
// caller must treat them as read-only.
func (e *Engine) GetRoute(key []byte) (Route, bool) {
	e.mu.RLock()
	defer e.mu.RUnlock()
	idx := e.routeIndex(key)
	if idx < 0 {
		return Route{}, false
	}
	return e.routes[idx], true
}

// NextTimestamp returns a monotonic increasing timestamp.
func (e *Engine) NextTimestamp() uint64 {
	return atomic.AddUint64(&e.ts, 1)
}

// RecordAccess increases the access counter for the range containing key and
// splits the range once its counter reaches the hotspot threshold. Keys that
// fall outside every registered range are ignored.
func (e *Engine) RecordAccess(key []byte) {
	// The counter update and a possible split both mutate e.routes, so the
	// write lock is required even for a plain increment.
	e.mu.Lock()
	defer e.mu.Unlock()
	idx := e.routeIndex(key)
	if idx < 0 {
		return
	}
	e.routes[idx].Load++
	if e.hotspotThreshold > 0 && e.routes[idx].Load >= e.hotspotThreshold {
		e.splitRange(idx)
	}
}

// Stats returns a snapshot of current ranges and their load counters. The key
// slices in the result are copies, so callers may modify them freely.
func (e *Engine) Stats() []Route {
	e.mu.RLock()
	defer e.mu.RUnlock()
	stats := make([]Route, len(e.routes))
	for i, r := range e.routes {
		stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: r.Load}
	}
	return stats
}

// routeIndex returns the index in e.routes of the range containing key, or -1
// if there is none. The caller must hold e.mu.
//
// The binary search assumes e.routes is sorted by Start in ascending order.
func (e *Engine) routeIndex(key []byte) int {
	if len(e.routes) == 0 {
		return -1
	}
	// Find the first route with Start > key; the candidate is the one before.
	i := sort.Search(len(e.routes), func(i int) bool {
		return bytes.Compare(e.routes[i].Start, key) > 0
	})
	if i == 0 {
		return -1
	}
	i--
	r := e.routes[i]
	if r.End != nil && bytes.Compare(key, r.End) >= 0 {
		return -1
	}
	return i
}

// splitRange replaces the route at idx with two adjacent routes that meet at
// the midpoint of the original range. Both halves keep the original GroupID
// and the original load is divided between them. Unbounded ranges and ranges
// with no key strictly between Start and End are left untouched. The caller
// must hold e.mu for writing.
func (e *Engine) splitRange(idx int) {
	r := e.routes[idx]
	if r.End == nil {
		// cannot split unbounded range
		return
	}
	mid := midpoint(r.Start, r.End)
	if mid == nil {
		return
	}
	left := Route{Start: r.Start, End: mid, GroupID: r.GroupID, Load: r.Load / 2}
	right := Route{Start: mid, End: r.End, GroupID: r.GroupID, Load: r.Load - (r.Load / 2)}
	e.routes = append(e.routes, Route{})     // Make room for one more element.
	copy(e.routes[idx+2:], e.routes[idx+1:]) // Shift elements to the right.
	e.routes[idx] = left
	e.routes[idx+1] = right
}

// cloneBytes returns a copy of b that shares no memory with it. A nil input
// yields nil.
func cloneBytes(b []byte) []byte {
	if b == nil {
		return nil
	}
	out := make([]byte, len(b))
	copy(out, b)
	return out
}

// midpoint returns a key m with a < m < b in lexicographic byte order, or nil
// if no such key exists (a >= b, or b is the immediate successor of a).
//
// Both keys are read as base-256 fractions and m is their arithmetic mean, so
// the two halves of a split cover comparable portions of the key space. When
// the mean does not fall strictly between the keys, the immediate successor
// of a is used if it is still below b.
func midpoint(a, b []byte) []byte {
	if bytes.Compare(a, b) >= 0 {
		return nil
	}
	n := len(a)
	if len(b) > n {
		n = len(b)
	}

	// sum = a + b, adding from the least significant byte. Missing trailing
	// bytes of the shorter key count as zero.
	sum := make([]byte, n)
	carry := 0
	for i := n - 1; i >= 0; i-- {
		s := carry
		if i < len(a) {
			s += int(a[i])
		}
		if i < len(b) {
			s += int(b[i])
		}
		sum[i] = byte(s)
		carry = s >> 8
	}

	// mid = sum / 2, dividing from the most significant byte. The carry out
	// of the addition is the initial remainder; a final remainder becomes an
	// extra 0x80 digit.
	mid := make([]byte, 0, n+1)
	rem := carry
	for _, d := range sum {
		cur := rem<<8 | int(d)
		mid = append(mid, byte(cur>>1))
		rem = cur & 1
	}
	if rem != 0 {
		mid = append(mid, 0x80)
	}

	// Prefer the shortest equivalent key: drop trailing zero bytes as long as
	// the result still sorts after a.
	end := len(mid)
	for end > 0 && mid[end-1] == 0 {
		end--
	}
	if short := mid[:end]; bytes.Compare(a, short) < 0 {
		mid = short
	}
	if bytes.Compare(a, mid) < 0 && bytes.Compare(mid, b) < 0 {
		return mid
	}

	// The mean collapsed onto one of the bounds; fall back to the smallest
	// key greater than a.
	next := append(cloneBytes(a), 0)
	if bytes.Compare(next, b) < 0 {
		return next
	}
	return nil
}