
Commit 1841964

kvserver: port over relevant kvserver and allocator integration files
This commit brings in the relevant kvserver and allocator integration files from the prototype branch. To avoid merging untested code directly into production packages like kvserver, the files have been placed in a new package under pkg/kv/kvserver/allocator/mmaprototypehelpers/ and renamed to allocator_mma_integration.go and kvserver_mma_integration.go. These will be moved to their respective production packages once fully validated.

Epic: none

Release note: none
1 parent: 45f2866

7 files changed, +882 -0 lines changed


pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Lines changed: 2 additions & 0 deletions

@@ -1462,6 +1462,7 @@ GO_TARGETS = [
     "//pkg/kv/kvserver/allocator/load:load_test",
     "//pkg/kv/kvserver/allocator/mmaprototype:mmaprototype",
     "//pkg/kv/kvserver/allocator/mmaprototype:mmaprototype_test",
+    "//pkg/kv/kvserver/allocator/mmaprototypehelpers:mmaprototypehelpers",
     "//pkg/kv/kvserver/allocator/plan:plan",
     "//pkg/kv/kvserver/allocator/plan:plan_test",
     "//pkg/kv/kvserver/allocator/storepool:storepool",
@@ -1478,6 +1479,7 @@ GO_TARGETS = [
     "//pkg/kv/kvserver/asim/history:history",
     "//pkg/kv/kvserver/asim/metrics:metrics",
     "//pkg/kv/kvserver/asim/metrics:metrics_test",
+    "//pkg/kv/kvserver/asim/mmaintegration:mmaintegration",
     "//pkg/kv/kvserver/asim/op:op",
     "//pkg/kv/kvserver/asim/op:op_test",
     "//pkg/kv/kvserver/asim/queue:queue",

pkg/kv/kvserver/allocator/mmaprototypehelpers/BUILD.bazel
Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
    name = "mmaprototypehelpers",
    srcs = [
        "allocator_mma_integration.go",
        "kvserver_mma_integration.go",
    ],
    importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/kv/kvpb",
        "//pkg/kv/kvserver/allocator",
        "//pkg/kv/kvserver/allocator/mmaprototype",
        "//pkg/kv/kvserver/allocator/storepool",
        "//pkg/roachpb",
        "//pkg/util/log",
        "//pkg/util/syncutil",
        "//pkg/util/timeutil",
    ],
)

pkg/kv/kvserver/allocator/mmaprototypehelpers/allocator_mma_integration.go
Lines changed: 166 additions & 0 deletions

@@ -0,0 +1,166 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package mmaprototypehelpers

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// MakeStoreLoadMsg converts a StoreDescriptor into a
// mmaprototype.StoreLoadMsg, deriving per-store load and capacity vectors
// from the store- and node-level capacities.
func MakeStoreLoadMsg(
	desc roachpb.StoreDescriptor, origTimestampNanos int64,
) mmaprototype.StoreLoadMsg {
	var load, capacity mmaprototype.LoadVector
	load[mmaprototype.CPURate] = mmaprototype.LoadValue(desc.Capacity.CPUPerSecond)
	if desc.NodeCapacity.NodeCPURateCapacity > 0 {
		// CPU is a shared resource across all stores on a node, and furthermore
		// there are consumers that we don't track on a per-replica (and thus
		// per-store) level (anything in SQL, for example). So we generally
		// expect the NodeCPURateCapacity to be higher than the sum of all
		// StoresCPURate. We do not currently have a configured per-store
		// capacity (there is no configuration setting that says "this store
		// gets at most 4 vcpus"), so we need to derive a sensible store-level
		// capacity from the node-level capacity (roughly, vcpus*seconds).
		//
		// We have
		//
		//   cpuUtil = NodeCPURateUsage/NodeCPURateCapacity
		//
		// and we want to define StoresCPURateCapacity such that
		//
		//   StoresCPURate/StoresCPURateCapacity = cpuUtil,
		//
		// i.e. we want to give the (collective) stores a capacity that results
		// in the same utilization as reported at the process (node) level, i.e.
		//
		//   StoresCPURateCapacity = StoresCPURate/cpuUtil.
		//
		// Finally, we split StoresCPURateCapacity evenly across the stores.
		//
		// This construction ensures that there is overload as measured by node
		// CPU usage exactly when there is overload as measured by mean store
		// CPU usage:
		//
		//   mean = sum_i StoreCPURate_i / sum_i StoreCPURateCapacity_i
		//        = 1/StoresCPURateCapacity * sum_i StoreCPURate_i
		//        = StoresCPURate / StoresCPURateCapacity
		//        = cpuUtil
		//
		// and in particular, when the mean indicates overload, at least one
		// store will be above the mean, meaning it is overloaded as well and
		// will induce load shedding.
		//
		// It's worth noting that this construction assumes that all load on the
		// node is due to the stores. Take an extreme example in which there is
		// a single store using up 1 vcpu, but the node is fully utilized (at,
		// say, 16 vcpus). In this case, the node will be at 100%, and we will
		// assign a capacity of 1 to the store, i.e. the store will also be at
		// 100% utilization despite contributing only 1/16th of the node CPU
		// utilization. The effect of the construction is that the stores will
		// take on responsibility for shedding load to compensate for auxiliary
		// consumption of CPU, which is generally sensible.
		cpuUtil :=
			float64(desc.NodeCapacity.NodeCPURateUsage) / float64(desc.NodeCapacity.NodeCPURateCapacity)
		// cpuUtil can be zero or close to zero.
		almostZeroUtil := cpuUtil < 0.01
		if desc.NodeCapacity.StoresCPURate != 0 && !almostZeroUtil {
			// cpuUtil is distributed across the stores, by constructing a
			// nodeCapacity, and then splitting nodeCapacity evenly across all the
			// stores. If the cpuUtil of a node is higher than the mean across
			// nodes of the cluster, then the cpu util of at least one store on
			// that node will be higher than the mean across all stores in the
			// cluster (since the cpu util of a node is simply the mean across all
			// its stores), which will result in load shedding. Note that this can
			// cause the cpu util of a store to be > 100%, e.g. if a node is at
			// 80% cpu util and has 10 stores, and all the cpu usage is due to
			// store s1, then s1 will have 800% util.
			nodeCapacity := float64(desc.NodeCapacity.StoresCPURate) / cpuUtil
			storeCapacity := nodeCapacity / float64(desc.NodeCapacity.NumStores)
			capacity[mmaprototype.CPURate] = mmaprototype.LoadValue(storeCapacity)
		} else {
			// almostZeroUtil or StoresCPURate is zero. We assume that only 50% of
			// the usage can be accounted for in StoresCPURate, so we divide 50% of
			// the NodeCPURateCapacity among all the stores.
			capacity[mmaprototype.CPURate] = mmaprototype.LoadValue(
				float64(desc.NodeCapacity.NodeCPURateCapacity/2) / float64(desc.NodeCapacity.NumStores))
		}
	} else {
		// TODO(sumeer): remove this hack of defaulting to 50% utilization, since
		// NodeCPURateCapacity should never be 0.
		// TODO(tbg): when do we expect to hit this branch? Mixed-version cluster?
		capacity[mmaprototype.CPURate] = load[mmaprototype.CPURate] * 2
	}
	load[mmaprototype.WriteBandwidth] = mmaprototype.LoadValue(desc.Capacity.WriteBytesPerSecond)
	capacity[mmaprototype.WriteBandwidth] = mmaprototype.UnknownCapacity
	// ByteSize is based on LogicalBytes since that is how we measure the size
	// of each range.
	load[mmaprototype.ByteSize] = mmaprototype.LoadValue(desc.Capacity.LogicalBytes)
	// Available does not compensate for the ballast, so utilization will look
	// higher than actual. This is fine since the ballast is small (default is
	// 1% of capacity) and is for use in an emergency.
	byteSizeUtil :=
		float64(desc.Capacity.Capacity-desc.Capacity.Available) / float64(desc.Capacity.Capacity)
	almostZeroUtil := byteSizeUtil < 0.01
	if load[mmaprototype.ByteSize] != 0 && !almostZeroUtil {
		// Normal case. The store has some ranges, and is not almost empty.
		capacity[mmaprototype.ByteSize] = mmaprototype.LoadValue(float64(load[mmaprototype.ByteSize]) / byteSizeUtil)
	} else {
		// Has no ranges or is almost empty. This is likely a new store. Since
		// LogicalBytes are uncompressed, we start with the compressed available
		// bytes, which is desirably pessimistic.
		capacity[mmaprototype.ByteSize] = mmaprototype.LoadValue(desc.Capacity.Available)
	}
	var secondaryLoad mmaprototype.SecondaryLoadVector
	secondaryLoad[mmaprototype.LeaseCount] = mmaprototype.LoadValue(desc.Capacity.LeaseCount)
	secondaryLoad[mmaprototype.ReplicaCount] = mmaprototype.LoadValue(desc.Capacity.RangeCount)
	// TODO(tbg): this triggers early in tests, probably because we're making
	// load messages before having received the first capacity. Still, this is
	// bad and should be fixed, or handled properly by communicating an unknown
	// capacity.
	// if capacity[mmaprototype.CPURate] == 0 {
	// 	panic("ouch")
	// }
	return mmaprototype.StoreLoadMsg{
		NodeID:        desc.Node.NodeID,
		StoreID:       desc.StoreID,
		Load:          load,
		Capacity:      capacity,
		SecondaryLoad: secondaryLoad,
		LoadTime:      timeutil.FromUnixNanos(origTimestampNanos),
	}
}
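To make the capacity derivation in the comment above concrete, here is a minimal standalone sketch of the same arithmetic with invented numbers (it is not part of the commit and uses plain float64s rather than the mmaprototype types): a 16-vcpu node at 50% utilization whose two stores account for 6 vcpus of tracked usage.

package main

import "fmt"

func main() {
	// Invented inputs, in CPU-nanos per second: a 16-vcpu node at 50%
	// utilization, with 2 stores whose tracked usage sums to 6 vcpus.
	var (
		nodeCPURateCapacity = 16e9
		nodeCPURateUsage    = 8e9
		storesCPURate       = 6e9
		numStores           = 2.0
	)

	cpuUtil := nodeCPURateUsage / nodeCPURateCapacity // 0.5

	// StoresCPURateCapacity = StoresCPURate / cpuUtil, split evenly.
	nodeCapacity := storesCPURate / cpuUtil   // 12e9, i.e. 12 vcpus
	storeCapacity := nodeCapacity / numStores // 6e9 per store

	// Invariant from the comment: mean store utilization == node utilization.
	meanStoreUtil := storesCPURate / (storeCapacity * numStores) // 0.5

	fmt.Println(cpuUtil, storeCapacity, meanStoreUtil)
}

Note how the 2 vcpus of usage not attributed to any store (e.g. SQL) shrink the stores' combined capacity to 12 vcpus rather than 16: this is the "stores take on responsibility for auxiliary CPU consumption" behavior the comment describes.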
// UsageInfoToMMALoad converts a RangeUsageInfo to a mmaprototype.RangeLoad.
func UsageInfoToMMALoad(usage allocator.RangeUsageInfo) mmaprototype.RangeLoad {
	lv := mmaprototype.LoadVector{}
	lv[mmaprototype.CPURate] = mmaprototype.LoadValue(usage.RequestCPUNanosPerSecond) + mmaprototype.LoadValue(usage.RaftCPUNanosPerSecond)
	lv[mmaprototype.WriteBandwidth] = mmaprototype.LoadValue(usage.WriteBytesPerSecond)
	lv[mmaprototype.ByteSize] = mmaprototype.LoadValue(usage.LogicalBytes)
	return mmaprototype.RangeLoad{
		Load:    lv,
		RaftCPU: mmaprototype.LoadValue(usage.RaftCPUNanosPerSecond),
	}
}
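A quick illustration of the aggregation, with invented numbers: a range serving 2,000,000 request CPU nanos/sec and 500,000 raft CPU nanos/sec ends up with Load[CPURate] = 2,500,000 while RaftCPU = 500,000. The raft component is folded into the total but also carried separately; plausibly so a caller can distinguish load that would move with the lease (request CPU) from load that moves only with the replica (raft CPU), though the commit does not state this. As a fragment, written as if inside this package:

	usage := allocator.RangeUsageInfo{
		RequestCPUNanosPerSecond: 2e6, // invented
		RaftCPUNanosPerSecond:    5e5, // invented
	}
	rl := UsageInfoToMMALoad(usage)
	// rl.Load[mmaprototype.CPURate] == 2_500_000 (request + raft CPU)
	// rl.RaftCPU == 500_000 (raft-only component)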
// ReplicaDescriptorToReplicaIDAndType converts a ReplicaDescriptor to a
// StoreIDAndReplicaState. The leaseholder store is passed in as lh.
func ReplicaDescriptorToReplicaIDAndType(
	desc roachpb.ReplicaDescriptor, lh roachpb.StoreID,
) mmaprototype.StoreIDAndReplicaState {
	return mmaprototype.StoreIDAndReplicaState{
		StoreID: desc.StoreID,
		ReplicaState: mmaprototype.ReplicaState{
			ReplicaIDAndType: mmaprototype.ReplicaIDAndType{
				ReplicaID: desc.ReplicaID,
				ReplicaType: mmaprototype.ReplicaType{
					ReplicaType:   desc.Type,
					IsLeaseholder: desc.StoreID == lh,
				},
			},
		},
	}
}
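To show how these helpers fit together, here is a hedged usage sketch, not from the commit: all field values are invented, only the descriptor fields this file actually reads are populated, and the real wiring presumably lives in kvserver_mma_integration.go (not shown in this excerpt).

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func main() {
	var desc roachpb.StoreDescriptor
	desc.StoreID = 1
	desc.Node.NodeID = 1
	// Store-level load (invented values).
	desc.Capacity.CPUPerSecond = 2e9 // 2 vcpus of tracked store CPU
	desc.Capacity.WriteBytesPerSecond = 32 << 20
	desc.Capacity.LogicalBytes = 10 << 30
	desc.Capacity.Capacity = 500 << 30
	desc.Capacity.Available = 400 << 30
	desc.Capacity.LeaseCount = 100
	desc.Capacity.RangeCount = 300
	// Node-level CPU picture: 8 vcpus, 50% utilized, a single store.
	desc.NodeCapacity.NodeCPURateUsage = 4e9
	desc.NodeCapacity.NodeCPURateCapacity = 8e9
	desc.NodeCapacity.StoresCPURate = 2e9
	desc.NodeCapacity.NumStores = 1

	msg := mmaprototypehelpers.MakeStoreLoadMsg(desc, timeutil.Now().UnixNano())
	fmt.Printf("%+v\n", msg)
}

With these numbers, cpuUtil is 0.5 and the store's CPU capacity comes out at 2e9/0.5 = 4e9 nanos/sec, half the node's capacity, mirroring the 50% node utilization as the derivation in MakeStoreLoadMsg intends.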
