-
Notifications
You must be signed in to change notification settings - Fork 2
Add range metrics and splitting engine with admin gRPC endpoints #210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| package distribution | ||
|
|
||
| import ( | ||
| "sync" | ||
| ) | ||
|
|
||
| // Range represents a key range and its statistics. | ||
| type Range struct { | ||
| Start string | ||
| End string | ||
| RequestCount int | ||
| } | ||
|
|
||
| // RangeStat mirrors Range for external consumption. | ||
| type RangeStat struct { | ||
| Start string | ||
| End string | ||
| Count int | ||
| } | ||
|
|
||
| // Engine tracks ranges and their statistics. | ||
| type Engine struct { | ||
| mu sync.Mutex | ||
| ranges []Range | ||
| splitThreshold int | ||
| notify func(left, right Range) | ||
| } | ||
|
|
||
| // NewEngine creates a new Engine with a split threshold. The notify | ||
| // function is invoked whenever a range split occurs. | ||
| func NewEngine(threshold int, notify func(left, right Range)) *Engine { | ||
| return &Engine{ | ||
| ranges: []Range{{Start: "", End: string(rune(0xffff))}}, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| splitThreshold: threshold, | ||
| notify: notify, | ||
| } | ||
| } | ||
|
|
||
| // RecordRequest registers a request for the given key and triggers a split | ||
| // when the request count for the containing range exceeds the threshold. | ||
| func (e *Engine) RecordRequest(key string) { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
|
|
||
| idx := e.findRange(key) | ||
| e.ranges[idx].RequestCount++ | ||
| if e.ranges[idx].RequestCount >= e.splitThreshold { | ||
| e.splitRange(idx) | ||
| } | ||
| } | ||
|
|
||
| // SplitRange splits the range containing the supplied key. | ||
| func (e *Engine) SplitRange(key string) { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
| idx := e.findRange(key) | ||
| e.splitRange(idx) | ||
| } | ||
|
|
||
| // GetStats returns the statistics for all ranges. | ||
| func (e *Engine) GetStats() []RangeStat { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
| out := make([]RangeStat, len(e.ranges)) | ||
| for i, r := range e.ranges { | ||
| out[i] = RangeStat{Start: r.Start, End: r.End, Count: r.RequestCount} | ||
| } | ||
| return out | ||
| } | ||
|
|
||
| // Ranges returns a copy of the current range metadata. Primarily used for tests. | ||
| func (e *Engine) Ranges() []Range { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
| out := make([]Range, len(e.ranges)) | ||
| copy(out, e.ranges) | ||
| return out | ||
| } | ||
|
|
||
| func (e *Engine) findRange(key string) int { | ||
| for i, r := range e.ranges { | ||
| if (r.Start == "" || key >= r.Start) && (r.End == "" || key < r.End) { | ||
| return i | ||
| } | ||
| } | ||
| // default to last range | ||
| return len(e.ranges) - 1 | ||
| } | ||
|
|
||
| func (e *Engine) splitRange(idx int) { | ||
| r := e.ranges[idx] | ||
| mid := midpoint(r.Start, r.End) | ||
| left := Range{Start: r.Start, End: mid} | ||
| right := Range{Start: mid, End: r.End} | ||
|
|
||
| // replace range with two new ranges | ||
| newRanges := make([]Range, 0, len(e.ranges)+1) | ||
| newRanges = append(newRanges, e.ranges[:idx]...) | ||
| newRanges = append(newRanges, left, right) | ||
| newRanges = append(newRanges, e.ranges[idx+1:]...) | ||
| e.ranges = newRanges | ||
|
|
||
| if e.notify != nil { | ||
| e.notify(left, right) | ||
| } | ||
| } | ||
|
|
||
| func midpoint(start, end string) string { | ||
| // simplistic midpoint calculation based on the first byte of the bounds. | ||
| s := byte('a') | ||
| if len(start) > 0 { | ||
| s = start[0] | ||
| } | ||
| e := byte('z') | ||
| if len(end) > 0 && end != string(rune(0xffff)) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| e = end[0] | ||
| } | ||
| m := s + (e-s)/2 | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| return string([]byte{m}) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| package distribution | ||
|
|
||
| import "testing" | ||
|
|
||
| // Test range statistics collection and split triggering. | ||
| func TestRangeStatsAndSplit(t *testing.T) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| var notified bool | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [golangci] reported by reviewdog 🐶 |
||
| eng := NewEngine(3, func(left, right Range) { | ||
| notified = true | ||
| }) | ||
|
|
||
| eng.RecordRequest("a") | ||
| eng.RecordRequest("a") | ||
|
|
||
| stats := eng.GetStats() | ||
| if len(stats) != 1 || stats[0].Count != 2 { | ||
| t.Fatalf("expected single range count 2, got %+v", stats) | ||
| } | ||
|
|
||
| // Third request should trigger a split | ||
| eng.RecordRequest("a") | ||
| if !notified { | ||
| t.Fatalf("expected split notification") | ||
| } | ||
| if len(eng.Ranges()) != 2 { | ||
| t.Fatalf("expected 2 ranges after split, got %d", len(eng.Ranges())) | ||
| } | ||
|
|
||
| // Stats should now have two ranges with reset counts | ||
| stats = eng.GetStats() | ||
| if len(stats) != 2 { | ||
| t.Fatalf("expected stats for 2 ranges, got %d", len(stats)) | ||
| } | ||
|
|
||
| // find range containing "a" | ||
| var count int | ||
| for _, s := range stats { | ||
| if (s.Start == "" || "a" >= s.Start) && (s.End == "" || "a" < s.End) { | ||
| count = s.Count | ||
| } | ||
| } | ||
| if count != 0 { | ||
| t.Fatalf("expected count reset after split, got %d", count) | ||
| } | ||
| } | ||
|
|
||
| // Ensure metadata updates after manual SplitRange call. | ||
| func TestManualSplitRange(t *testing.T) { | ||
| eng := NewEngine(100, nil) | ||
| eng.SplitRange("a") | ||
| if len(eng.Ranges()) != 2 { | ||
| t.Fatalf("expected 2 ranges after manual split, got %d", len(eng.Ranges())) | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
File is not properly formatted (gci)