Skip to content

Commit a379eb8

Browse files
committed
Track range load and split hotspots
1 parent 723b1ba commit a379eb8

File tree

2 files changed

+128
-15
lines changed

2 files changed

+128
-15
lines changed

distribution/engine.go

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@ type Route struct {
1818
End []byte
1919
// GroupID identifies the raft group for the range starting at Start.
2020
GroupID uint64
21+
// Load tracks the number of accesses served by this range.
22+
Load uint64
2123
}
2224

2325
// Engine holds in-memory metadata of routes and provides timestamp generation.
2426
type Engine struct {
25-
mu sync.RWMutex
26-
routes []Route
27-
ts uint64
27+
mu sync.RWMutex
28+
routes []Route
29+
ts uint64
30+
hotspotThreshold uint64
2831
}
2932

30-
// NewEngine creates an Engine.
33+
// NewEngine creates an Engine with no hotspot splitting.
3134
func NewEngine() *Engine {
32-
return &Engine{routes: make([]Route, 0)}
35+
return NewEngineWithThreshold(0)
36+
}
37+
38+
// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
39+
// detection. A non-zero threshold enables automatic range splitting when the
40+
// number of accesses to a range exceeds the threshold.
41+
func NewEngineWithThreshold(threshold uint64) *Engine {
42+
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
3343
}
3444

3545
// UpdateRoute registers or updates a route for the given key range.
@@ -47,25 +57,92 @@ func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
4757
func (e *Engine) GetRoute(key []byte) (Route, bool) {
4858
e.mu.RLock()
4959
defer e.mu.RUnlock()
50-
if len(e.routes) == 0 {
60+
idx := e.routeIndex(key)
61+
if idx < 0 {
5162
return Route{}, false
5263
}
64+
return e.routes[idx], true
65+
}
66+
67+
// NextTimestamp returns a monotonic increasing timestamp.
68+
func (e *Engine) NextTimestamp() uint64 {
69+
return atomic.AddUint64(&e.ts, 1)
70+
}
71+
72+
// RecordAccess increases the access counter for the range containing key and
73+
// splits the range if it turns into a hotspot.
74+
func (e *Engine) RecordAccess(key []byte) {
75+
e.mu.Lock()
76+
defer e.mu.Unlock()
77+
idx := e.routeIndex(key)
78+
if idx < 0 {
79+
return
80+
}
81+
e.routes[idx].Load++
82+
if e.hotspotThreshold > 0 && e.routes[idx].Load >= e.hotspotThreshold {
83+
e.splitRange(idx)
84+
}
85+
}
5386

54-
// Find the first route with Start > key.
87+
// Stats returns a snapshot of current ranges and their load counters.
88+
func (e *Engine) Stats() []Route {
89+
e.mu.RLock()
90+
defer e.mu.RUnlock()
91+
stats := make([]Route, len(e.routes))
92+
for i, r := range e.routes {
93+
stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: r.Load}
94+
}
95+
return stats
96+
}
97+
98+
func (e *Engine) routeIndex(key []byte) int {
99+
if len(e.routes) == 0 {
100+
return -1
101+
}
55102
i := sort.Search(len(e.routes), func(i int) bool {
56103
return bytes.Compare(e.routes[i].Start, key) > 0
57104
})
58105
if i == 0 {
59-
return Route{}, false
106+
return -1
60107
}
61-
r := e.routes[i-1]
108+
i--
109+
r := e.routes[i]
62110
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
63-
return Route{}, false
111+
return -1
64112
}
65-
return r, true
113+
return i
66114
}
67115

68-
// NextTimestamp returns a monotonic increasing timestamp.
69-
func (e *Engine) NextTimestamp() uint64 {
70-
return atomic.AddUint64(&e.ts, 1)
116+
func (e *Engine) splitRange(idx int) {
117+
r := e.routes[idx]
118+
if r.End == nil {
119+
// cannot split unbounded range
120+
return
121+
}
122+
mid := midpoint(r.Start, r.End)
123+
if mid == nil {
124+
return
125+
}
126+
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID}
127+
right := Route{Start: mid, End: r.End, GroupID: r.GroupID}
128+
e.routes = append(append(e.routes[:idx], left, right), e.routes[idx+1:]...)
129+
}
130+
131+
func cloneBytes(b []byte) []byte {
132+
if b == nil {
133+
return nil
134+
}
135+
out := make([]byte, len(b))
136+
copy(out, b)
137+
return out
138+
}
139+
140+
// midpoint returns a key that is lexicographically between a and b. It returns
141+
// nil if such a key cannot be determined (e.g. a and b are too close).
142+
func midpoint(a, b []byte) []byte {
143+
m := append(cloneBytes(a), 0)
144+
if bytes.Compare(m, b) >= 0 {
145+
return nil
146+
}
147+
return m
71148
}

distribution/engine_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package distribution
22

3-
import "testing"
3+
import (
4+
"bytes"
5+
"testing"
6+
)
47

58
func TestEngineRouteLookup(t *testing.T) {
69
e := NewEngine()
@@ -50,3 +53,36 @@ func TestEngineTimestampMonotonic(t *testing.T) {
5053
last = ts
5154
}
5255
}
56+
57+
func TestEngineRecordAccessAndStats(t *testing.T) {
58+
e := NewEngineWithThreshold(0)
59+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
60+
e.RecordAccess([]byte("b"))
61+
e.RecordAccess([]byte("b"))
62+
stats := e.Stats()
63+
if len(stats) != 1 {
64+
t.Fatalf("expected 1 route, got %d", len(stats))
65+
}
66+
if stats[0].Load != 2 {
67+
t.Fatalf("expected load 2, got %d", stats[0].Load)
68+
}
69+
}
70+
71+
func TestEngineSplitOnHotspot(t *testing.T) {
72+
e := NewEngineWithThreshold(2)
73+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
74+
e.RecordAccess([]byte("b"))
75+
e.RecordAccess([]byte("b"))
76+
stats := e.Stats()
77+
if len(stats) != 2 {
78+
t.Fatalf("expected 2 routes after split, got %d", len(stats))
79+
}
80+
// ensure the key b can still be resolved after split
81+
r, ok := e.GetRoute([]byte("b"))
82+
if !ok {
83+
t.Fatalf("expected route for key b after split")
84+
}
85+
if r.End != nil && bytes.Compare([]byte("b"), r.End) >= 0 {
86+
t.Fatalf("route does not contain key b")
87+
}
88+
}

0 commit comments

Comments
 (0)