Skip to content

Commit b90eb26

Browse files
authored
Merge pull request #227 from bootjp/codex/split-range-on-hotspot-occurrence-6m6rnp
Track range load and split hotspots
2 parents 5787514 + 2112670 commit b90eb26

File tree

2 files changed

+196
-16
lines changed

2 files changed

+196
-16
lines changed

distribution/engine.go

Lines changed: 113 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@ type Route struct {
1818
End []byte
1919
// GroupID identifies the raft group for the range starting at Start.
2020
GroupID uint64
21+
// Load tracks the number of accesses served by this range.
22+
Load uint64
2123
}
2224

2325
// Engine holds in-memory metadata of routes and provides timestamp generation.
2426
type Engine struct {
25-
mu sync.RWMutex
26-
routes []Route
27-
ts uint64
27+
mu sync.RWMutex
28+
routes []Route
29+
ts uint64
30+
hotspotThreshold uint64
2831
}
2932

30-
// NewEngine creates an Engine.
33+
// NewEngine creates an Engine with no hotspot splitting.
3134
func NewEngine() *Engine {
32-
return &Engine{routes: make([]Route, 0)}
35+
return NewEngineWithThreshold(0)
36+
}
37+
38+
// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
39+
// detection. A non-zero threshold enables automatic range splitting when the
40+
// number of accesses to a range exceeds the threshold.
41+
func NewEngineWithThreshold(threshold uint64) *Engine {
42+
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
3343
}
3444

3545
// UpdateRoute registers or updates a route for the given key range.
@@ -47,25 +57,113 @@ func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
4757
func (e *Engine) GetRoute(key []byte) (Route, bool) {
4858
e.mu.RLock()
4959
defer e.mu.RUnlock()
50-
if len(e.routes) == 0 {
60+
idx := e.routeIndex(key)
61+
if idx < 0 {
5162
return Route{}, false
5263
}
64+
return e.routes[idx], true
65+
}
66+
67+
// NextTimestamp returns a monotonic increasing timestamp.
68+
func (e *Engine) NextTimestamp() uint64 {
69+
return atomic.AddUint64(&e.ts, 1)
70+
}
71+
72+
// RecordAccess increases the access counter for the range containing key and
73+
// splits the range if it turns into a hotspot. The load counter is updated
74+
// atomically under a read lock to allow concurrent access recording. If the
75+
// hotspot threshold is exceeded, RecordAccess acquires a full write lock and
76+
// re-checks the condition before splitting to avoid races with concurrent
77+
// splits.
78+
func (e *Engine) RecordAccess(key []byte) {
79+
e.mu.RLock()
80+
idx := e.routeIndex(key)
81+
if idx < 0 {
82+
e.mu.RUnlock()
83+
return
84+
}
85+
load := atomic.AddUint64(&e.routes[idx].Load, 1)
86+
threshold := e.hotspotThreshold
87+
e.mu.RUnlock()
88+
if threshold == 0 || load < threshold {
89+
return
90+
}
91+
92+
e.mu.Lock()
93+
defer e.mu.Unlock()
94+
idx = e.routeIndex(key)
95+
if idx < 0 {
96+
return
97+
}
98+
if e.routes[idx].Load >= threshold {
99+
e.splitRange(idx)
100+
}
101+
}
53102

54-
// Find the first route with Start > key.
103+
// Stats returns a snapshot of current ranges and their load counters.
104+
func (e *Engine) Stats() []Route {
105+
e.mu.RLock()
106+
defer e.mu.RUnlock()
107+
stats := make([]Route, len(e.routes))
108+
for i, r := range e.routes {
109+
stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: atomic.LoadUint64(&e.routes[i].Load)}
110+
}
111+
return stats
112+
}
113+
114+
func (e *Engine) routeIndex(key []byte) int {
115+
if len(e.routes) == 0 {
116+
return -1
117+
}
55118
i := sort.Search(len(e.routes), func(i int) bool {
56119
return bytes.Compare(e.routes[i].Start, key) > 0
57120
})
58121
if i == 0 {
59-
return Route{}, false
122+
return -1
60123
}
61-
r := e.routes[i-1]
62-
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
63-
return Route{}, false
124+
i--
125+
if end := e.routes[i].End; end != nil && bytes.Compare(key, end) >= 0 {
126+
return -1
64127
}
65-
return r, true
128+
return i
66129
}
67130

68-
// NextTimestamp returns a monotonic increasing timestamp.
69-
func (e *Engine) NextTimestamp() uint64 {
70-
return atomic.AddUint64(&e.ts, 1)
131+
func (e *Engine) splitRange(idx int) {
132+
r := e.routes[idx]
133+
if r.End == nil {
134+
// cannot split unbounded range; reset load to avoid repeated attempts
135+
e.routes[idx].Load = 0
136+
return
137+
}
138+
mid := midpoint(r.Start, r.End)
139+
if mid == nil {
140+
// cannot determine midpoint; reset load to avoid repeated attempts
141+
e.routes[idx].Load = 0
142+
return
143+
}
144+
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID}
145+
right := Route{Start: mid, End: r.End, GroupID: r.GroupID}
146+
// replace the range at idx with left and right in an idiomatic manner
147+
e.routes = append(e.routes[:idx+1], e.routes[idx:]...)
148+
e.routes[idx] = left
149+
e.routes[idx+1] = right
150+
}
151+
152+
func cloneBytes(b []byte) []byte {
153+
if b == nil {
154+
return nil
155+
}
156+
out := make([]byte, len(b))
157+
copy(out, b)
158+
return out
159+
}
160+
161+
// midpoint returns a key that is lexicographically between a and b. It returns
162+
// nil if such a key cannot be determined (e.g. a and b are too close).
163+
func midpoint(a, b []byte) []byte {
164+
m := append(cloneBytes(a), 0)
165+
if bytes.Compare(m, b) >= 0 {
166+
return nil
167+
}
168+
return m
71169
}

distribution/engine_test.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package distribution
22

3-
import "testing"
3+
import (
4+
"bytes"
5+
"sync"
6+
"testing"
7+
)
48

59
func TestEngineRouteLookup(t *testing.T) {
610
e := NewEngine()
@@ -50,3 +54,81 @@ func TestEngineTimestampMonotonic(t *testing.T) {
5054
last = ts
5155
}
5256
}
57+
58+
func TestEngineRecordAccessAndStats(t *testing.T) {
59+
e := NewEngineWithThreshold(0)
60+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
61+
e.RecordAccess([]byte("b"))
62+
e.RecordAccess([]byte("b"))
63+
stats := e.Stats()
64+
if len(stats) != 1 {
65+
t.Fatalf("expected 1 route, got %d", len(stats))
66+
}
67+
if stats[0].Load != 2 {
68+
t.Fatalf("expected load 2, got %d", stats[0].Load)
69+
}
70+
}
71+
72+
func TestEngineSplitOnHotspot(t *testing.T) {
73+
e := NewEngineWithThreshold(2)
74+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
75+
e.RecordAccess([]byte("b"))
76+
e.RecordAccess([]byte("b"))
77+
stats := e.Stats()
78+
if len(stats) != 2 {
79+
t.Fatalf("expected 2 routes after split, got %d", len(stats))
80+
}
81+
midKey := []byte("a\x00")
82+
assertRange(t, stats[0], []byte("a"), midKey)
83+
assertRange(t, stats[1], midKey, []byte("c"))
84+
if stats[0].Load != 0 || stats[1].Load != 0 {
85+
t.Errorf("expected loads to be reset to 0, got %d, %d", stats[0].Load, stats[1].Load)
86+
}
87+
r, ok := e.GetRoute([]byte("b"))
88+
if !ok || (r.End != nil && bytes.Compare([]byte("b"), r.End) >= 0) {
89+
t.Fatalf("route does not contain key b")
90+
}
91+
}
92+
93+
func TestEngineHotspotUnboundedResetsLoad(t *testing.T) {
94+
e := NewEngineWithThreshold(2)
95+
e.UpdateRoute([]byte("a"), nil, 1)
96+
e.RecordAccess([]byte("b"))
97+
e.RecordAccess([]byte("b"))
98+
stats := e.Stats()
99+
if len(stats) != 1 {
100+
t.Fatalf("expected 1 route, got %d", len(stats))
101+
}
102+
if stats[0].Load != 0 {
103+
t.Fatalf("expected load reset to 0, got %d", stats[0].Load)
104+
}
105+
}
106+
107+
func TestEngineRecordAccessConcurrent(t *testing.T) {
108+
t.Parallel()
109+
e := NewEngineWithThreshold(6)
110+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
111+
112+
var wg sync.WaitGroup
113+
const workers = 10
114+
for i := 0; i < workers; i++ {
115+
wg.Add(1)
116+
go func() {
117+
defer wg.Done()
118+
e.RecordAccess([]byte("b"))
119+
}()
120+
}
121+
wg.Wait()
122+
123+
stats := e.Stats()
124+
if len(stats) != 2 {
125+
t.Fatalf("expected 2 routes after concurrent split, got %d", len(stats))
126+
}
127+
}
128+
129+
func assertRange(t *testing.T, r Route, start, end []byte) {
130+
t.Helper()
131+
if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {
132+
t.Errorf("expected range [%q, %q), got [%q, %q]", start, end, r.Start, r.End)
133+
}
134+
}

0 commit comments

Comments
 (0)