Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 94 additions & 14 deletions distribution/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@ type Route struct {
End []byte
// GroupID identifies the raft group for the range starting at Start.
GroupID uint64
// Load tracks the number of accesses served by this range.
Load uint64
}

// Engine holds in-memory metadata of routes and provides timestamp generation.
type Engine struct {
mu sync.RWMutex
routes []Route
ts uint64
mu sync.RWMutex
routes []Route
ts uint64
hotspotThreshold uint64
}

// NewEngine creates an Engine.
// NewEngine creates an Engine with no hotspot splitting.
func NewEngine() *Engine {
return &Engine{routes: make([]Route, 0)}
return NewEngineWithThreshold(0)
}

// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
// detection. A non-zero threshold enables automatic range splitting when the
// number of accesses to a range exceeds the threshold.
func NewEngineWithThreshold(threshold uint64) *Engine {
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
}

// UpdateRoute registers or updates a route for the given key range.
Expand All @@ -47,25 +57,95 @@ func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
func (e *Engine) GetRoute(key []byte) (Route, bool) {
e.mu.RLock()
defer e.mu.RUnlock()
if len(e.routes) == 0 {
idx := e.routeIndex(key)
if idx < 0 {
return Route{}, false
}
return e.routes[idx], true
}

// NextTimestamp returns a monotonic increasing timestamp.
func (e *Engine) NextTimestamp() uint64 {
return atomic.AddUint64(&e.ts, 1)
}

// RecordAccess increases the access counter for the range containing key and
// splits the range if it turns into a hotspot.
func (e *Engine) RecordAccess(key []byte) {
e.mu.Lock()
defer e.mu.Unlock()
idx := e.routeIndex(key)
if idx < 0 {
return
}
e.routes[idx].Load++
if e.hotspotThreshold > 0 && e.routes[idx].Load >= e.hotspotThreshold {
e.splitRange(idx)
}
}
Comment on lines +74 to +85
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of a full e.mu.Lock() in RecordAccess serializes all access recordings, which could become a performance bottleneck under high contention, as this is likely a hot path.

For better concurrency, you could consider a more fine-grained locking approach. For instance, you could use atomic.AddUint64 to increment the Load counter under a read lock (e.mu.RLock). The range splitting, which is a less frequent and more expensive operation, could then acquire a full write lock (e.mu.Lock()) only when the threshold is met.

This change would increase complexity, as you'd need to carefully handle the race condition where a range is split by another goroutine between releasing the read lock and acquiring the write lock (e.g., by re-checking the condition after acquiring the write lock). However, it would significantly improve the scalability of access tracking.


// Find the first route with Start > key.
// Stats returns a snapshot of current ranges and their load counters.
func (e *Engine) Stats() []Route {
e.mu.RLock()
defer e.mu.RUnlock()
stats := make([]Route, len(e.routes))
for i, r := range e.routes {
stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: r.Load}
}
return stats
}

func (e *Engine) routeIndex(key []byte) int {
if len(e.routes) == 0 {
return -1
}
i := sort.Search(len(e.routes), func(i int) bool {
return bytes.Compare(e.routes[i].Start, key) > 0
})
if i == 0 {
return Route{}, false
return -1
}
r := e.routes[i-1]
i--
r := e.routes[i]
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
return Route{}, false
return -1
}
return r, true
return i
}

// NextTimestamp returns a monotonic increasing timestamp.
func (e *Engine) NextTimestamp() uint64 {
return atomic.AddUint64(&e.ts, 1)
func (e *Engine) splitRange(idx int) {
r := e.routes[idx]
if r.End == nil {
// cannot split unbounded range
return
}
mid := midpoint(r.Start, r.End)
if mid == nil {
return
}
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID, Load: r.Load / 2}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
File is not properly formatted (gci)

right := Route{Start: mid, End: r.End, GroupID: r.GroupID, Load: r.Load - (r.Load / 2)}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
Magic number: 2, in detected (mnd)

e.routes = append(e.routes, Route{}) // Make room for one more element.
copy(e.routes[idx+2:], e.routes[idx+1:]) // Shift elements to the right.
e.routes[idx] = left
e.routes[idx+1] = right
}

func cloneBytes(b []byte) []byte {
if b == nil {
return nil
}
out := make([]byte, len(b))
copy(out, b)
return out
}

// midpoint returns a key that is lexicographically between a and b. It returns
// nil if such a key cannot be determined (e.g. a and b are too close).
func midpoint(a, b []byte) []byte {
m := append(cloneBytes(a), 0)
if bytes.Compare(m, b) >= 0 {
return nil
}
return m
}
38 changes: 37 additions & 1 deletion distribution/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package distribution

import "testing"
import (
"bytes"
"testing"
)

func TestEngineRouteLookup(t *testing.T) {
e := NewEngine()
Expand Down Expand Up @@ -50,3 +53,36 @@ func TestEngineTimestampMonotonic(t *testing.T) {
last = ts
}
}

func TestEngineRecordAccessAndStats(t *testing.T) {
e := NewEngineWithThreshold(0)
e.UpdateRoute([]byte("a"), []byte("m"), 1)
e.RecordAccess([]byte("b"))
e.RecordAccess([]byte("b"))
stats := e.Stats()
if len(stats) != 1 {
t.Fatalf("expected 1 route, got %d", len(stats))
}
if stats[0].Load != 2 {
t.Fatalf("expected load 2, got %d", stats[0].Load)
}
}

func TestEngineSplitOnHotspot(t *testing.T) {
e := NewEngineWithThreshold(2)
e.UpdateRoute([]byte("a"), []byte("c"), 1)
e.RecordAccess([]byte("b"))
e.RecordAccess([]byte("b"))
stats := e.Stats()
if len(stats) != 2 {
t.Fatalf("expected 2 routes after split, got %d", len(stats))
}
Comment on lines +77 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test correctly verifies that a split occurs, but it could be more robust. It would be beneficial to add more specific assertions to check the properties of the newly created ranges.

I recommend adding checks for:

  1. The exact Start and End keys of the new ranges to ensure the midpoint logic is working as expected.
  2. The Load of the new ranges. Based on the current implementation, they should be 0. If you adopt my suggestion to distribute load, this test should be updated to check for the distributed values (e.g., 1 and 1 in this case).
if len(stats) != 2 {
	t.Fatalf("expected 2 routes after split, got %d", len(stats))
}

// Check that the new ranges are correct.
midKey := []byte("a\x00")
if !bytes.Equal(stats[0].Start, []byte("a")) || !bytes.Equal(stats[0].End, midKey) {
	t.Errorf("expected first range to be [a, a\\x00), got [%q, %q]", stats[0].Start, stats[0].End)
}
if !bytes.Equal(stats[1].Start, midKey) || !bytes.Equal(stats[1].End, []byte("c")) {
	t.Errorf("expected second range to be [a\\x00, c), got [%q, %q]", stats[1].Start, stats[1].End)
}

// Check that load is reset after split.
if stats[0].Load != 0 || stats[1].Load != 0 {
	t.Errorf("expected loads to be reset to 0, got %d, %d", stats[0].Load, stats[1].Load)
}

// ensure the key b can still be resolved after split
r, ok := e.GetRoute([]byte("b"))
if !ok {
t.Fatalf("expected route for key b after split")
}
if r.End != nil && bytes.Compare([]byte("b"), r.End) >= 0 {
t.Fatalf("route does not contain key b")
}
}
Loading