Skip to content

Commit 52478b3

Browse files
authored
Merge branch 'main' into codex/fix-issue-with-elastickv-functionality
2 parents 3cc2843 + 0dc7046 commit 52478b3

File tree

8 files changed

+254
-67
lines changed

8 files changed

+254
-67
lines changed

.github/workflows/go-mod-fix.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ on:
66

77
jobs:
88
go-mod-fix:
9+
permissions:
10+
contents: write
911
runs-on: ubuntu-latest
1012
steps:
1113
- name: checkout

.github/workflows/go-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
runs-on: ${{ matrix.os }}
1717
steps:
1818
- uses: actions/checkout@v5
19-
- uses: actions/setup-go@v5
19+
- uses: actions/setup-go@v6
2020
with:
2121
go-version-file: 'go.mod'
2222
- run: go test -race ./...

.github/workflows/golangci-lint.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ concurrency:
1111
jobs:
1212
golangci-lint:
1313
name: golangci-lint
14+
permissions:
15+
contents: read
16+
pull-requests: write
1417
runs-on: ubuntu-latest
1518
steps:
1619
- name: Check out code into the Go module directory

.github/workflows/jepsen-test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ concurrency:
44
group: ${{ github.workflow }}-${{ github.ref }}-jepsen-test
55

66
name: Jepsen Test
7+
permissions:
8+
contents: read
79
jobs:
810
test:
911
runs-on: ubuntu-latest

distribution/engine.go

Lines changed: 113 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@ type Route struct {
1818
End []byte
1919
// GroupID identifies the raft group for the range starting at Start.
2020
GroupID uint64
21+
// Load tracks the number of accesses served by this range.
22+
Load uint64
2123
}
2224

2325
// Engine holds in-memory metadata of routes and provides timestamp generation.
2426
type Engine struct {
25-
mu sync.RWMutex
26-
routes []Route
27-
ts uint64
27+
mu sync.RWMutex
28+
routes []Route
29+
ts uint64
30+
hotspotThreshold uint64
2831
}
2932

30-
// NewEngine creates an Engine.
33+
// NewEngine creates an Engine with no hotspot splitting.
3134
func NewEngine() *Engine {
32-
return &Engine{routes: make([]Route, 0)}
35+
return NewEngineWithThreshold(0)
36+
}
37+
38+
// NewEngineWithThreshold creates an Engine and sets a threshold for hotspot
39+
// detection. A non-zero threshold enables automatic range splitting when the
40+
// number of accesses to a range exceeds the threshold.
41+
func NewEngineWithThreshold(threshold uint64) *Engine {
42+
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
3343
}
3444

3545
// UpdateRoute registers or updates a route for the given key range.
@@ -47,25 +57,113 @@ func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
4757
func (e *Engine) GetRoute(key []byte) (Route, bool) {
4858
e.mu.RLock()
4959
defer e.mu.RUnlock()
50-
if len(e.routes) == 0 {
60+
idx := e.routeIndex(key)
61+
if idx < 0 {
5162
return Route{}, false
5263
}
64+
return e.routes[idx], true
65+
}
66+
67+
// NextTimestamp returns a monotonic increasing timestamp.
68+
func (e *Engine) NextTimestamp() uint64 {
69+
return atomic.AddUint64(&e.ts, 1)
70+
}
71+
72+
// RecordAccess increases the access counter for the range containing key and
73+
// splits the range if it turns into a hotspot. The load counter is updated
74+
// atomically under a read lock to allow concurrent access recording. If the
75+
// hotspot threshold is exceeded, RecordAccess acquires a full write lock and
76+
// re-checks the condition before splitting to avoid races with concurrent
77+
// splits.
78+
func (e *Engine) RecordAccess(key []byte) {
79+
e.mu.RLock()
80+
idx := e.routeIndex(key)
81+
if idx < 0 {
82+
e.mu.RUnlock()
83+
return
84+
}
85+
load := atomic.AddUint64(&e.routes[idx].Load, 1)
86+
threshold := e.hotspotThreshold
87+
e.mu.RUnlock()
88+
if threshold == 0 || load < threshold {
89+
return
90+
}
91+
92+
e.mu.Lock()
93+
defer e.mu.Unlock()
94+
idx = e.routeIndex(key)
95+
if idx < 0 {
96+
return
97+
}
98+
if e.routes[idx].Load >= threshold {
99+
e.splitRange(idx)
100+
}
101+
}
53102

54-
// Find the first route with Start > key.
103+
// Stats returns a snapshot of current ranges and their load counters.
104+
func (e *Engine) Stats() []Route {
105+
e.mu.RLock()
106+
defer e.mu.RUnlock()
107+
stats := make([]Route, len(e.routes))
108+
for i, r := range e.routes {
109+
stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: atomic.LoadUint64(&e.routes[i].Load)}
110+
}
111+
return stats
112+
}
113+
114+
func (e *Engine) routeIndex(key []byte) int {
115+
if len(e.routes) == 0 {
116+
return -1
117+
}
55118
i := sort.Search(len(e.routes), func(i int) bool {
56119
return bytes.Compare(e.routes[i].Start, key) > 0
57120
})
58121
if i == 0 {
59-
return Route{}, false
122+
return -1
60123
}
61-
r := e.routes[i-1]
62-
if r.End != nil && bytes.Compare(key, r.End) >= 0 {
63-
return Route{}, false
124+
i--
125+
if end := e.routes[i].End; end != nil && bytes.Compare(key, end) >= 0 {
126+
return -1
64127
}
65-
return r, true
128+
return i
66129
}
67130

68-
// NextTimestamp returns a monotonic increasing timestamp.
69-
func (e *Engine) NextTimestamp() uint64 {
70-
return atomic.AddUint64(&e.ts, 1)
131+
func (e *Engine) splitRange(idx int) {
132+
r := e.routes[idx]
133+
if r.End == nil {
134+
// cannot split unbounded range; reset load to avoid repeated attempts
135+
e.routes[idx].Load = 0
136+
return
137+
}
138+
mid := midpoint(r.Start, r.End)
139+
if mid == nil {
140+
// cannot determine midpoint; reset load to avoid repeated attempts
141+
e.routes[idx].Load = 0
142+
return
143+
}
144+
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID}
145+
right := Route{Start: mid, End: r.End, GroupID: r.GroupID}
146+
// replace the range at idx with left and right in an idiomatic manner
147+
e.routes = append(e.routes[:idx+1], e.routes[idx:]...)
148+
e.routes[idx] = left
149+
e.routes[idx+1] = right
150+
}
151+
152+
func cloneBytes(b []byte) []byte {
153+
if b == nil {
154+
return nil
155+
}
156+
out := make([]byte, len(b))
157+
copy(out, b)
158+
return out
159+
}
160+
161+
// midpoint returns a key that is lexicographically between a and b. It returns
162+
// nil if such a key cannot be determined (e.g. a and b are too close).
163+
func midpoint(a, b []byte) []byte {
164+
m := append(cloneBytes(a), 0)
165+
if bytes.Compare(m, b) >= 0 {
166+
return nil
167+
}
168+
return m
71169
}

distribution/engine_test.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package distribution
22

3-
import "testing"
3+
import (
4+
"bytes"
5+
"sync"
6+
"testing"
7+
)
48

59
func TestEngineRouteLookup(t *testing.T) {
610
e := NewEngine()
@@ -50,3 +54,81 @@ func TestEngineTimestampMonotonic(t *testing.T) {
5054
last = ts
5155
}
5256
}
57+
58+
func TestEngineRecordAccessAndStats(t *testing.T) {
59+
e := NewEngineWithThreshold(0)
60+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
61+
e.RecordAccess([]byte("b"))
62+
e.RecordAccess([]byte("b"))
63+
stats := e.Stats()
64+
if len(stats) != 1 {
65+
t.Fatalf("expected 1 route, got %d", len(stats))
66+
}
67+
if stats[0].Load != 2 {
68+
t.Fatalf("expected load 2, got %d", stats[0].Load)
69+
}
70+
}
71+
72+
func TestEngineSplitOnHotspot(t *testing.T) {
73+
e := NewEngineWithThreshold(2)
74+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
75+
e.RecordAccess([]byte("b"))
76+
e.RecordAccess([]byte("b"))
77+
stats := e.Stats()
78+
if len(stats) != 2 {
79+
t.Fatalf("expected 2 routes after split, got %d", len(stats))
80+
}
81+
midKey := []byte("a\x00")
82+
assertRange(t, stats[0], []byte("a"), midKey)
83+
assertRange(t, stats[1], midKey, []byte("c"))
84+
if stats[0].Load != 0 || stats[1].Load != 0 {
85+
t.Errorf("expected loads to be reset to 0, got %d, %d", stats[0].Load, stats[1].Load)
86+
}
87+
r, ok := e.GetRoute([]byte("b"))
88+
if !ok || (r.End != nil && bytes.Compare([]byte("b"), r.End) >= 0) {
89+
t.Fatalf("route does not contain key b")
90+
}
91+
}
92+
93+
func TestEngineHotspotUnboundedResetsLoad(t *testing.T) {
94+
e := NewEngineWithThreshold(2)
95+
e.UpdateRoute([]byte("a"), nil, 1)
96+
e.RecordAccess([]byte("b"))
97+
e.RecordAccess([]byte("b"))
98+
stats := e.Stats()
99+
if len(stats) != 1 {
100+
t.Fatalf("expected 1 route, got %d", len(stats))
101+
}
102+
if stats[0].Load != 0 {
103+
t.Fatalf("expected load reset to 0, got %d", stats[0].Load)
104+
}
105+
}
106+
107+
func TestEngineRecordAccessConcurrent(t *testing.T) {
108+
t.Parallel()
109+
e := NewEngineWithThreshold(6)
110+
e.UpdateRoute([]byte("a"), []byte("c"), 1)
111+
112+
var wg sync.WaitGroup
113+
const workers = 10
114+
for i := 0; i < workers; i++ {
115+
wg.Add(1)
116+
go func() {
117+
defer wg.Done()
118+
e.RecordAccess([]byte("b"))
119+
}()
120+
}
121+
wg.Wait()
122+
123+
stats := e.Stats()
124+
if len(stats) != 2 {
125+
t.Fatalf("expected 2 routes after concurrent split, got %d", len(stats))
126+
}
127+
}
128+
129+
func assertRange(t *testing.T, r Route, start, end []byte) {
130+
t.Helper()
131+
if !bytes.Equal(r.Start, start) || !bytes.Equal(r.End, end) {
132+
t.Errorf("expected range [%q, %q), got [%q, %q]", start, end, r.Start, r.End)
133+
}
134+
}

go.mod

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,46 @@
11
module github.com/bootjp/elastickv
22

3-
go 1.23.0
3+
go 1.24.0
44

5-
toolchain go1.25.0
5+
toolchain go1.25.1
66

77
require (
88
github.com/Jille/grpc-multi-resolver v1.3.0
99
github.com/Jille/raft-grpc-leader-rpc v1.1.0
1010
github.com/Jille/raft-grpc-transport v1.6.1
1111
github.com/Jille/raftadmin v1.2.1
12-
github.com/aws/aws-sdk-go-v2 v1.38.3
13-
github.com/aws/aws-sdk-go-v2/config v1.31.6
14-
github.com/aws/aws-sdk-go-v2/credentials v1.18.10
15-
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.50.1
12+
github.com/aws/aws-sdk-go-v2 v1.39.0
13+
github.com/aws/aws-sdk-go-v2/config v1.31.8
14+
github.com/aws/aws-sdk-go-v2/credentials v1.18.12
15+
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.50.3
1616
github.com/cockroachdb/errors v1.12.0
1717
github.com/emirpasic/gods v1.18.1
1818
github.com/hashicorp/go-hclog v1.6.3
1919
github.com/hashicorp/raft v1.7.3
2020
github.com/hashicorp/raft-boltdb/v2 v2.3.1
2121
github.com/pkg/errors v0.9.1
22-
github.com/redis/go-redis/v9 v9.12.1
22+
github.com/redis/go-redis/v9 v9.14.0
2323
github.com/spaolacci/murmur3 v1.1.0
2424
github.com/stretchr/testify v1.11.1
2525
github.com/tidwall/redcon v1.6.2
2626
go.etcd.io/bbolt v1.4.3
27-
golang.org/x/sync v0.16.0
28-
google.golang.org/grpc v1.75.0
29-
google.golang.org/protobuf v1.36.8
27+
golang.org/x/sync v0.17.0
28+
google.golang.org/grpc v1.75.1
29+
google.golang.org/protobuf v1.36.9
3030
)
3131

3232
require (
3333
github.com/armon/go-metrics v0.4.1 // indirect
34-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.6 // indirect
35-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.6 // indirect
36-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.6 // indirect
34+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.7 // indirect
35+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7 // indirect
36+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7 // indirect
3737
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
3838
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 // indirect
39-
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.6 // indirect
40-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.6 // indirect
41-
github.com/aws/aws-sdk-go-v2/service/sso v1.29.1 // indirect
42-
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.2 // indirect
43-
github.com/aws/aws-sdk-go-v2/service/sts v1.38.2 // indirect
39+
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.7 // indirect
40+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.7 // indirect
41+
github.com/aws/aws-sdk-go-v2/service/sso v1.29.3 // indirect
42+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.4 // indirect
43+
github.com/aws/aws-sdk-go-v2/service/sts v1.38.4 // indirect
4444
github.com/aws/smithy-go v1.23.0 // indirect
4545
github.com/boltdb/bolt v1.3.1 // indirect
4646
github.com/cespare/xxhash/v2 v2.3.0 // indirect

0 commit comments

Comments
 (0)