Skip to content

Commit 37ab502

Browse files
committed
mmaprototype: extract and unit test maxFractionPendingIncDec
Touches cockroachdb#157757. Epic: CRDB-55052
1 parent 60165a6 commit 37ab502

File tree

3 files changed

+157
-24
lines changed

3 files changed

+157
-24
lines changed

pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ go_test(
3939
name = "mmaprototype_test",
4040
srcs = [
4141
"allocator_state_test.go",
42+
"cluster_state_rebalance_stores_test.go",
4243
"cluster_state_test.go",
4344
"constraint_matcher_test.go",
4445
"constraint_test.go",

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -937,32 +937,49 @@ func (ss *storeState) computePendingChangesReflectedInLatestLoad(
937937
}
938938

939939
func (ss *storeState) computeMaxFractionPending() {
940-
fracIncrease := 0.0
941-
fracDecrease := 0.0
942-
for i := range ss.reportedLoad {
943-
if ss.reportedLoad[i] == ss.adjusted.load[i] && ss.reportedLoad[i] == 0 {
944-
// Avoid setting ss.maxFractionPendingIncrease and
945-
// ss.maxFractionPendingDecrease to 1000 when the reported load and
946-
// adjusted load are both 0 since some dimension is expected to have zero
947-
// (e.g. write bandwidth during read-only workloads).
948-
continue
949-
}
950-
if ss.reportedLoad[i] == 0 {
951-
fracIncrease = 1000
952-
fracDecrease = 1000
953-
break
954-
}
955-
f := math.Abs(float64(ss.adjusted.load[i]-ss.reportedLoad[i])) / float64(ss.reportedLoad[i])
956-
if ss.adjusted.load[i] > ss.reportedLoad[i] {
957-
if f > fracIncrease {
958-
fracIncrease = f
940+
ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease = computeMaxFractionPendingIncDec(ss.reportedLoad, ss.adjusted.load)
941+
}
942+
943+
func computeMaxFractionPendingIncDec(rep, adj LoadVector) (maxFracInc, maxFracDec float64) {
944+
for i := range rep {
945+
inc, dec := func(rep, adj LoadValue) (inc, dec float64) {
946+
// The fraction pending expresses the absolute difference of the adjusted
947+
// and reported load as a multiple of the reported load. Note that this
948+
// is the case even if the adjusted load is negative: if, say, the adjusted
949+
// load is -50 and the reported load is 100, it is still correct to say that
950+
// a "magnitude 1.5x" change is pending (from 100 to -50).
951+
diff := adj - rep
952+
953+
switch {
954+
case diff == 0:
955+
// Reported and adjusted are equal, so nothing is pending.
956+
// This also handles the case in which both are zero.
957+
// We don't need to update maxFracInc or maxFracDec because
958+
// they started at zero and only go up from there.
959+
return 0, 0
960+
case rep == 0:
961+
// The adjusted load is nonzero, but the reported one is zero. We can't
962+
// express the load change as a multiple of zero. Arbitrarily assign large
963+
// value to both increase and decrease, indicating that no more changes
964+
// should be made until either the pending change clears (and we get a
965+
// zero diff above) or we register positive reported load.
966+
return 1000, 1000
967+
case diff > 0:
968+
// Vanilla case of adjusted > reported, i.e. we have load incoming.
969+
// We don't need to update maxFracDec.
970+
return math.Abs(float64(diff) / float64(rep)), 0
971+
case diff < 0:
972+
// Vanilla case of adjusted < reported, i.e. we have load incoming.
973+
// We don't need to update maxFracInc.
974+
return 0, math.Abs(float64(diff) / float64(rep))
975+
default:
976+
panic("impossible")
959977
}
960-
} else if f > fracDecrease {
961-
fracDecrease = f
962-
}
978+
}(rep[i], adj[i])
979+
maxFracInc = max(maxFracInc, inc)
980+
maxFracDec = max(maxFracDec, dec)
963981
}
964-
ss.maxFractionPendingIncrease = fracIncrease
965-
ss.maxFractionPendingDecrease = fracDecrease
982+
return maxFracInc, maxFracDec
966983
}
967984

968985
func newStoreState() *storeState {
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package mmaprototype
7+
8+
import (
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestComputeMaxFractionPendingIncDec(t *testing.T) {
15+
tests := []struct {
16+
name string
17+
reported LoadVector
18+
adjusted LoadVector
19+
expectedMaxInc float64
20+
expectedMaxDec float64
21+
}{
22+
{
23+
name: "both_zero",
24+
reported: LoadVector{},
25+
adjusted: LoadVector{},
26+
expectedMaxInc: 0,
27+
expectedMaxDec: 0,
28+
},
29+
{
30+
name: "reported_zero_adjusted_nonzero",
31+
reported: LoadVector{},
32+
adjusted: LoadVector{CPURate: 100, WriteBandwidth: 200, ByteSize: 300},
33+
expectedMaxInc: 1000,
34+
expectedMaxDec: 1000,
35+
},
36+
{
37+
name: "reported_nonzero_adjusted_zero",
38+
reported: LoadVector{CPURate: 100, WriteBandwidth: 200, ByteSize: 300},
39+
adjusted: LoadVector{CPURate: 0, WriteBandwidth: 0, ByteSize: 0},
40+
expectedMaxInc: 0,
41+
expectedMaxDec: 1.0, // abs((0-100)/100) = 1.0, same for all dimensions
42+
},
43+
{
44+
name: "simple_increase",
45+
reported: LoadVector{CPURate: 100},
46+
adjusted: LoadVector{CPURate: 150},
47+
expectedMaxInc: 0.5,
48+
expectedMaxDec: 0,
49+
},
50+
{
51+
name: "simple_decrease",
52+
reported: LoadVector{CPURate: 100},
53+
adjusted: LoadVector{CPURate: 50},
54+
expectedMaxInc: 0,
55+
expectedMaxDec: 0.5,
56+
},
57+
{
58+
name: "multiple_dimensions_increase",
59+
reported: LoadVector{CPURate: 100, WriteBandwidth: 200, ByteSize: 300},
60+
adjusted: LoadVector{CPURate: 150, WriteBandwidth: 250, ByteSize: 400},
61+
expectedMaxInc: 0.5, // max(0.5, 0.25, 0.333...)
62+
expectedMaxDec: 0,
63+
},
64+
{
65+
name: "multiple_dimensions_decrease",
66+
reported: LoadVector{CPURate: 100, WriteBandwidth: 200, ByteSize: 300},
67+
adjusted: LoadVector{CPURate: 50, WriteBandwidth: 100, ByteSize: 150},
68+
expectedMaxInc: 0,
69+
expectedMaxDec: 0.5, // max(0.5, 0.5, 0.5)
70+
},
71+
{
72+
name: "mixed_increase_and_decrease",
73+
reported: LoadVector{CPURate: 100, WriteBandwidth: 200, ByteSize: 300},
74+
adjusted: LoadVector{CPURate: 150, WriteBandwidth: 100, ByteSize: 300},
75+
expectedMaxInc: 0.5, // from CPURate
76+
expectedMaxDec: 0.5, // from WriteBandwidth
77+
},
78+
{
79+
name: "negative_adjusted_load",
80+
reported: LoadVector{CPURate: 100},
81+
adjusted: LoadVector{CPURate: -50},
82+
expectedMaxInc: 0,
83+
expectedMaxDec: 1.5, // abs(-50 - 100) / 100 = 1.5
84+
},
85+
{
86+
name: "fractional_change",
87+
reported: LoadVector{CPURate: 1000},
88+
adjusted: LoadVector{CPURate: 1100},
89+
expectedMaxInc: 0.1,
90+
expectedMaxDec: 0,
91+
},
92+
{
93+
name: "max_across_dimensions",
94+
reported: LoadVector{CPURate: 100, WriteBandwidth: 200, ByteSize: 300},
95+
adjusted: LoadVector{CPURate: 120, WriteBandwidth: 300, ByteSize: 450},
96+
expectedMaxInc: 0.5, // max(0.2, 0.5, 0.5)
97+
expectedMaxDec: 0,
98+
},
99+
{
100+
name: "one_dimension_zero_others_change",
101+
reported: LoadVector{CPURate: 0, WriteBandwidth: 100, ByteSize: 200},
102+
adjusted: LoadVector{CPURate: 50, WriteBandwidth: 150, ByteSize: 100},
103+
expectedMaxInc: 1000, // from CPURate (reported=0)
104+
expectedMaxDec: 1000, // from CPURate (reported=0)
105+
},
106+
}
107+
108+
for _, tt := range tests {
109+
t.Run(tt.name, func(t *testing.T) {
110+
maxInc, maxDec := computeMaxFractionPendingIncDec(tt.reported, tt.adjusted)
111+
require.InDelta(t, tt.expectedMaxInc, maxInc, 1e-9, "max fraction increase")
112+
require.InDelta(t, tt.expectedMaxDec, maxDec, 1e-9, "max fraction decrease")
113+
})
114+
}
115+
}

0 commit comments

Comments
 (0)