Skip to content

Commit 0f3c257

Browse files
committed
Improve range load tracking concurrency
1 parent 723b1ba commit 0f3c257

File tree

2 files changed

+153
-15
lines changed

2 files changed

+153
-15
lines changed

distribution/engine.go

Lines changed: 107 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@ type Route struct {
1818
End []byte
1919
// GroupID identifies the raft group for the range starting at Start.
2020
GroupID uint64
21+
// Load tracks the number of accesses served by this range.
22+
Load uint64
2123
}
2224

2325
// Engine holds in-memory metadata of routes and provides timestamp generation.
2426
type Engine struct {
25-
mu sync.RWMutex
26-
routes []Route
27-
ts uint64
27+
mu sync.RWMutex
28+
routes []Route
29+
ts uint64
30+
hotspotThreshold uint64
2831
}
2932

30-
// NewEngine creates an Engine.
33+
// NewEngine creates an Engine with no hotspot splitting.
3134
func NewEngine() *Engine {
32-
return &Engine{routes: make([]Route, 0)}
35+
return NewEngineWithThreshold(0)
36+
}
37+
38+
// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
39+
// detection. A non-zero threshold enables automatic range splitting when the
40+
// number of accesses to a range exceeds the threshold.
41+
func NewEngineWithThreshold(threshold uint64) *Engine {
42+
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
3343
}
3444

3545
// UpdateRoute registers or updates a route for the given key range.
@@ -47,25 +57,108 @@ func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
4757
func (e *Engine) GetRoute(key []byte) (Route, bool) {
4858
e.mu.RLock()
4959
defer e.mu.RUnlock()
50-
if len(e.routes) == 0 {
60+
idx := e.routeIndex(key)
61+
if idx < 0 {
5162
return Route{}, false
5263
}
64+
return e.routes[idx], true
65+
}
66+
67+
// NextTimestamp returns a monotonic increasing timestamp.
68+
func (e *Engine) NextTimestamp() uint64 {
69+
return atomic.AddUint64(&e.ts, 1)
70+
}
71+
72+
// RecordAccess increases the access counter for the range containing key and
73+
// splits the range if it turns into a hotspot. The load counter is updated
74+
// atomically under a read lock to allow concurrent access recording. If the
75+
// hotspot threshold is exceeded, RecordAccess acquires a full write lock and
76+
// re-checks the condition before splitting to avoid races with concurrent
77+
// splits.
78+
func (e *Engine) RecordAccess(key []byte) {
79+
e.mu.RLock()
80+
idx := e.routeIndex(key)
81+
if idx < 0 {
82+
e.mu.RUnlock()
83+
return
84+
}
85+
load := atomic.AddUint64(&e.routes[idx].Load, 1)
86+
threshold := e.hotspotThreshold
87+
e.mu.RUnlock()
88+
if threshold == 0 || load < threshold {
89+
return
90+
}
91+
92+
e.mu.Lock()
93+
defer e.mu.Unlock()
94+
idx = e.routeIndex(key)
95+
if idx < 0 {
96+
return
97+
}
98+
if e.routes[idx].Load >= threshold {
99+
e.splitRange(idx)
100+
}
101+
}
53102

54-
// Find the first route with Start > key.
103+
// Stats returns a snapshot of current ranges and their load counters.
104+
func (e *Engine) Stats() []Route {
105+
e.mu.RLock()
106+
defer e.mu.RUnlock()
107+
stats := make([]Route, len(e.routes))
108+
for i, r := range e.routes {
109+
stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: atomic.LoadUint64(&e.routes[i].Load)}
110+
}
111+
return stats
112+
}
113+
114+
func (e *Engine) routeIndex(key []byte) int {
115+
if len(e.routes) == 0 {
116+
return -1
117+
}
55118
i := sort.Search(len(e.routes), func(i int) bool {
56119
return bytes.Compare(e.routes[i].Start, key) > 0
57120
})
58121
if i == 0 {
59-
return Route{}, false
122+
return -1
60123
}
61-
r := e.routes[i-1]
124+
i--
125+
r := e.routes[i]
62126
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
63-
return Route{}, false
127+
return -1
64128
}
65-
return r, true
129+
return i
66130
}
67131

68-
// NextTimestamp returns a monotonic increasing timestamp.
69-
func (e *Engine) NextTimestamp() uint64 {
70-
return atomic.AddUint64(&e.ts, 1)
132+
func (e *Engine) splitRange(idx int) {
133+
r := e.routes[idx]
134+
if r.End == nil {
135+
// cannot split unbounded range
136+
return
137+
}
138+
mid := midpoint(r.Start, r.End)
139+
if mid == nil {
140+
return
141+
}
142+
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID}
143+
right := Route{Start: mid, End: r.End, GroupID: r.GroupID}
144+
e.routes = append(append(e.routes[:idx], left, right), e.routes[idx+1:]...)
145+
}
146+
147+
func cloneBytes(b []byte) []byte {
148+
if b == nil {
149+
return nil
150+
}
151+
out := make([]byte, len(b))
152+
copy(out, b)
153+
return out
154+
}
155+
156+
// midpoint returns a key that is lexicographically between a and b. It returns
157+
// nil if such a key cannot be determined (e.g. a and b are too close).
158+
func midpoint(a, b []byte) []byte {
159+
m := append(cloneBytes(a), 0)
160+
if bytes.Compare(m, b) >= 0 {
161+
return nil
162+
}
163+
return m
71164
}

distribution/engine_test.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package distribution
22

3-
import "testing"
3+
import (
4+
"bytes"
5+
"testing"
6+
)
47

58
func TestEngineRouteLookup(t *testing.T) {
69
e := NewEngine()
@@ -50,3 +53,45 @@ func TestEngineTimestampMonotonic(t *testing.T) {
5053
last = ts
5154
}
5255
}
56+
57+
func TestEngineRecordAccessAndStats(t *testing.T) {
58+
e := NewEngineWithThreshold(0)
59+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
60+
e.RecordAccess([]byte("b"))
61+
e.RecordAccess([]byte("b"))
62+
stats := e.Stats()
63+
if len(stats) != 1 {
64+
t.Fatalf("expected 1 route, got %d", len(stats))
65+
}
66+
if stats[0].Load != 2 {
67+
t.Fatalf("expected load 2, got %d", stats[0].Load)
68+
}
69+
}
70+
71+
func TestEngineSplitOnHotspot(t *testing.T) {
72+
e := NewEngineWithThreshold(2)
73+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
74+
e.RecordAccess([]byte("b"))
75+
e.RecordAccess([]byte("b"))
76+
stats := e.Stats()
77+
if len(stats) != 2 {
78+
t.Fatalf("expected 2 routes after split, got %d", len(stats))
79+
}
80+
midKey := []byte("a\x00")
81+
assertRange(t, stats[0], []byte("a"), midKey)
82+
assertRange(t, stats[1], midKey, []byte("c"))
83+
if stats[0].Load != 0 || stats[1].Load != 0 {
84+
t.Errorf("expected loads to be reset to 0, got %d, %d", stats[0].Load, stats[1].Load)
85+
}
86+
r, ok := e.GetRoute([]byte("b"))
87+
if !ok || (r.End != nil && bytes.Compare([]byte("b"), r.End) >= 0) {
88+
t.Fatalf("route does not contain key b")
89+
}
90+
}
91+
92+
func assertRange(t *testing.T, r Route, start, end []byte) {
93+
t.Helper()
94+
if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {
95+
t.Errorf("expected range [%q, %q), got [%q, %q]", start, end, r.Start, r.End)
96+
}
97+
}

0 commit comments

Comments
 (0)