Skip to content

Commit 88456f7

Browse files
committed
test: ensure unbounded hotspot resets load
1 parent 0f3c257 commit 88456f7

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

distribution/engine.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,22 @@ func (e *Engine) routeIndex(key []byte) int {
132132
func (e *Engine) splitRange(idx int) {
133133
r := e.routes[idx]
134134
if r.End == nil {
135-
// cannot split unbounded range
135+
// cannot split unbounded range; reset load to avoid repeated attempts
136+
e.routes[idx].Load = 0
136137
return
137138
}
138139
mid := midpoint(r.Start, r.End)
139140
if mid == nil {
141+
// cannot determine midpoint; reset load to avoid repeated attempts
142+
e.routes[idx].Load = 0
140143
return
141144
}
142145
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID}
143146
right := Route{Start: mid, End: r.End, GroupID: r.GroupID}
144-
e.routes = append(append(e.routes[:idx], left, right), e.routes[idx+1:]...)
147+
// replace the range at idx with left and right in an idiomatic manner
148+
e.routes = append(e.routes[:idx+1], e.routes[idx:]...)
149+
e.routes[idx] = left
150+
e.routes[idx+1] = right
145151
}
146152

147153
func cloneBytes(b []byte) []byte {

distribution/engine_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package distribution
22

33
import (
44
"bytes"
5+
"sync"
56
"testing"
67
)
78

@@ -89,6 +90,42 @@ func TestEngineSplitOnHotspot(t *testing.T) {
8990
}
9091
}
9192

93+
func TestEngineHotspotUnboundedResetsLoad(t *testing.T) {
94+
e := NewEngineWithThreshold(2)
95+
e.UpdateRoute([]byte("a"), nil, 1)
96+
e.RecordAccess([]byte("b"))
97+
e.RecordAccess([]byte("b"))
98+
stats := e.Stats()
99+
if len(stats) != 1 {
100+
t.Fatalf("expected 1 route, got %d", len(stats))
101+
}
102+
if stats[0].Load != 0 {
103+
t.Fatalf("expected load reset to 0, got %d", stats[0].Load)
104+
}
105+
}
106+
107+
func TestEngineRecordAccessConcurrent(t *testing.T) {
108+
t.Parallel()
109+
e := NewEngineWithThreshold(6)
110+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
111+
112+
var wg sync.WaitGroup
113+
const workers = 10
114+
for i := 0; i < workers; i++ {
115+
wg.Add(1)
116+
go func() {
117+
defer wg.Done()
118+
e.RecordAccess([]byte("b"))
119+
}()
120+
}
121+
wg.Wait()
122+
123+
stats := e.Stats()
124+
if len(stats) != 2 {
125+
t.Fatalf("expected 2 routes after concurrent split, got %d", len(stats))
126+
}
127+
}
128+
92129
func assertRange(t *testing.T, r Route, start, end []byte) {
93130
t.Helper()
94131
if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {

0 commit comments

Comments
 (0)