Skip to content

Commit af6c51b

Browse files
committed
feat: add range metrics and splitting
1 parent 427619c commit af6c51b

File tree

5 files changed

+753
-0
lines changed

5 files changed

+753
-0
lines changed

distribution/engine.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package distribution
2+
3+
import (
4+
"sync"
5+
)
6+
7+
// Range describes one slice of the key space together with the number of
// requests that have been recorded against it.
type Range struct {
	// Start and End bound the range. Range lookup treats Start as inclusive
	// and End as exclusive; an empty value leaves that side unbounded.
	Start, End string
	// RequestCount is the number of requests recorded for this range.
	RequestCount int
}
13+
14+
// RangeStat is the externally facing snapshot of a Range: the same bounds
// plus the request count observed when the snapshot was taken.
type RangeStat struct {
	// Start and End are copied from the corresponding Range.
	Start, End string
	// Count is the Range's RequestCount at snapshot time.
	Count int
}
20+
21+
// Engine tracks ranges and their statistics.
22+
type Engine struct {
23+
mu sync.Mutex
24+
ranges []Range
25+
splitThreshold int
26+
notify func(left, right Range)
27+
}
28+
29+
// NewEngine creates a new Engine with a split threshold. The notify
30+
// function is invoked whenever a range split occurs.
31+
func NewEngine(threshold int, notify func(left, right Range)) *Engine {
32+
return &Engine{
33+
ranges: []Range{{Start: "", End: string(rune(0xffff))}},
34+
splitThreshold: threshold,
35+
notify: notify,
36+
}
37+
}
38+
39+
// RecordRequest registers a request for the given key and triggers a split
40+
// when the request count for the containing range exceeds the threshold.
41+
func (e *Engine) RecordRequest(key string) {
42+
e.mu.Lock()
43+
defer e.mu.Unlock()
44+
45+
idx := e.findRange(key)
46+
e.ranges[idx].RequestCount++
47+
if e.ranges[idx].RequestCount >= e.splitThreshold {
48+
e.splitRange(idx)
49+
}
50+
}
51+
52+
// SplitRange splits the range containing the supplied key.
53+
func (e *Engine) SplitRange(key string) {
54+
e.mu.Lock()
55+
defer e.mu.Unlock()
56+
idx := e.findRange(key)
57+
e.splitRange(idx)
58+
}
59+
60+
// GetStats returns the statistics for all ranges.
61+
func (e *Engine) GetStats() []RangeStat {
62+
e.mu.Lock()
63+
defer e.mu.Unlock()
64+
out := make([]RangeStat, len(e.ranges))
65+
for i, r := range e.ranges {
66+
out[i] = RangeStat{Start: r.Start, End: r.End, Count: r.RequestCount}
67+
}
68+
return out
69+
}
70+
71+
// Ranges returns a copy of the current range metadata. Primarily used for tests.
72+
func (e *Engine) Ranges() []Range {
73+
e.mu.Lock()
74+
defer e.mu.Unlock()
75+
out := make([]Range, len(e.ranges))
76+
copy(out, e.ranges)
77+
return out
78+
}
79+
80+
func (e *Engine) findRange(key string) int {
81+
for i, r := range e.ranges {
82+
if (r.Start == "" || key >= r.Start) && (r.End == "" || key < r.End) {
83+
return i
84+
}
85+
}
86+
// default to last range
87+
return len(e.ranges) - 1
88+
}
89+
90+
func (e *Engine) splitRange(idx int) {
91+
r := e.ranges[idx]
92+
mid := midpoint(r.Start, r.End)
93+
left := Range{Start: r.Start, End: mid}
94+
right := Range{Start: mid, End: r.End}
95+
96+
// replace range with two new ranges
97+
newRanges := make([]Range, 0, len(e.ranges)+1)
98+
newRanges = append(newRanges, e.ranges[:idx]...)
99+
newRanges = append(newRanges, left, right)
100+
newRanges = append(newRanges, e.ranges[idx+1:]...)
101+
e.ranges = newRanges
102+
103+
if e.notify != nil {
104+
e.notify(left, right)
105+
}
106+
}
107+
108+
// midpoint returns a one-byte split key halfway between the first bytes of
// start and end. It is deliberately simplistic: only the first byte of each
// bound is considered.
//
// An empty start defaults to 'a'. An empty end, or the "\uffff" sentinel,
// defaults to 'z'. If such a default falls on the wrong side of the other,
// explicit bound (e.g. start "" with end "M", or start "~" with an open end),
// the default is widened to the edge of the byte range instead, so the
// byte subtraction below cannot wrap around. If the bounds are still
// inverted after that, the first byte of start is returned unchanged.
func midpoint(start, end string) string {
	lo, hi := byte('a'), byte('z')
	if start != "" {
		lo = start[0]
	}
	bounded := end != "" && end != string(rune(0xffff))
	if bounded {
		hi = end[0]
	}

	if hi < lo {
		switch {
		case start == "":
			// The explicit end sorts below the default 'a'.
			lo = 0x00
		case !bounded:
			// The explicit start sorts above the default 'z'. Stay below the
			// sentinel's first byte when the end is the sentinel.
			hi = 0xff
			if end != "" {
				hi = end[0]
			}
		}
	}
	if hi < lo {
		// Inverted bounds: there is no meaningful midpoint.
		return string([]byte{lo})
	}
	return string([]byte{lo + (hi-lo)/2})
}
121+

distribution/split_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package distribution
2+
3+
import "testing"
4+
5+
// Test range statistics collection and split triggering.
6+
func TestRangeStatsAndSplit(t *testing.T) {
7+
var notified bool
8+
eng := NewEngine(3, func(left, right Range) {
9+
notified = true
10+
})
11+
12+
eng.RecordRequest("a")
13+
eng.RecordRequest("a")
14+
15+
stats := eng.GetStats()
16+
if len(stats) != 1 || stats[0].Count != 2 {
17+
t.Fatalf("expected single range count 2, got %+v", stats)
18+
}
19+
20+
// Third request should trigger a split
21+
eng.RecordRequest("a")
22+
if !notified {
23+
t.Fatalf("expected split notification")
24+
}
25+
if len(eng.Ranges()) != 2 {
26+
t.Fatalf("expected 2 ranges after split, got %d", len(eng.Ranges()))
27+
}
28+
29+
// Stats should now have two ranges with reset counts
30+
stats = eng.GetStats()
31+
if len(stats) != 2 {
32+
t.Fatalf("expected stats for 2 ranges, got %d", len(stats))
33+
}
34+
35+
// find range containing "a"
36+
var count int
37+
for _, s := range stats {
38+
if (s.Start == "" || "a" >= s.Start) && (s.End == "" || "a" < s.End) {
39+
count = s.Count
40+
}
41+
}
42+
if count != 0 {
43+
t.Fatalf("expected count reset after split, got %d", count)
44+
}
45+
}
46+
47+
// Ensure metadata updates after manual SplitRange call.
48+
func TestManualSplitRange(t *testing.T) {
49+
eng := NewEngine(100, nil)
50+
eng.SplitRange("a")
51+
if len(eng.Ranges()) != 2 {
52+
t.Fatalf("expected 2 ranges after manual split, got %d", len(eng.Ranges()))
53+
}
54+
}
55+

0 commit comments

Comments
 (0)