Skip to content

Commit 1756cd5

Browse files
committed
kvadmission,server: use the range's tenant ID for admission control
This behavior applies only to non-admin requests, and only while the cluster setting is enabled.

Fixes #135853

Epic: none

Release note (ops change): The new cluster setting kvadmission.use_range_tenant_id_for_non_admin.enabled can be used to disable the behavior where admission control uses the range's tenantID for non-admin requests. The setting is enabled by default.
1 parent c013c36 commit 1756cd5

File tree

5 files changed

+246
-50
lines changed

5 files changed

+246
-50
lines changed

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ ALL_TESTS = [
261261
"//pkg/kv/kvserver/gc:gc_test",
262262
"//pkg/kv/kvserver/idalloc:idalloc_test",
263263
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
264+
"//pkg/kv/kvserver/kvadmission:kvadmission_test",
264265
"//pkg/kv/kvserver/kvflowcontrol/node_rac2:node_rac2_test",
265266
"//pkg/kv/kvserver/kvflowcontrol/rac2:rac2_test",
266267
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2:replica_rac2_test",
@@ -1552,6 +1553,7 @@ GO_TARGETS = [
15521553
"//pkg/kv/kvserver/intentresolver:intentresolver",
15531554
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
15541555
"//pkg/kv/kvserver/kvadmission:kvadmission",
1556+
"//pkg/kv/kvserver/kvadmission:kvadmission_test",
15551557
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
15561558
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb",
15571559
"//pkg/kv/kvserver/kvflowcontrol/node_rac2:node_rac2",

pkg/kv/kvserver/kvadmission/BUILD.bazel

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "kvadmission",
@@ -25,3 +25,17 @@ go_library(
2525
"@com_github_cockroachdb_pebble//:pebble",
2626
],
2727
)
28+
29+
go_test(
30+
name = "kvadmission_test",
31+
srcs = ["kvadmission_test.go"],
32+
embed = [":kvadmission"],
33+
deps = [
34+
"//pkg/kv/kvpb",
35+
"//pkg/roachpb",
36+
"//pkg/settings/cluster",
37+
"//pkg/util/admission",
38+
"//pkg/util/admission/admissionpb",
39+
"@com_github_stretchr_testify//require",
40+
],
41+
)

pkg/kv/kvserver/kvadmission/kvadmission.go

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,32 @@ var ConnectedStoreExpiration = settings.RegisterDurationSetting(
142142
5*time.Minute,
143143
)
144144

145+
// useRangeTenantIDForNonAdminEnabled determines whether the range's tenant ID
146+
// is used by admission control when called by the system tenant. When false,
147+
// the requester's tenant ID (i.e., the system tenant ID) is used.
148+
var useRangeTenantIDForNonAdminEnabled = settings.RegisterBoolSetting(
149+
settings.SystemOnly,
150+
"kvadmission.use_range_tenant_id_for_non_admin.enabled",
151+
"when true, and the caller is the system tenant, the tenantID used by admission control "+
152+
"for non-admin requests is overridden to the range's tenantID",
153+
true,
154+
)
155+
145156
// Controller provides admission control for the KV layer.
146157
type Controller interface {
147158
// AdmitKVWork must be called before performing KV work.
148159
// BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
149-
// populated for admission to work correctly. If err is non-nil, the
150-
// returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
151-
// called after the KV work is done executing.
152-
AdmitKVWork(context.Context, roachpb.TenantID, *kvpb.BatchRequest) (Handle, error)
160+
// populated for admission to work correctly. The requestTenantID represents
161+
// the authenticated caller and must be populated. The rangeTenantID
162+
// represents the tenant of the range on which the work is being performed
163+
// -- in rare cases it may be unpopulated.
164+
//
165+
// If err is non-nil, the returned handle can be ignored. If err is nil,
166+
// AdmittedKVWorkDone must be called after the KV work is done executing.
167+
AdmitKVWork(
168+
_ context.Context, requestTenantID roachpb.TenantID, rangeTenantID roachpb.TenantID,
169+
_ *kvpb.BatchRequest,
170+
) (Handle, error)
153171
// AdmittedKVWorkDone is called after the admitted KV work is done
154172
// executing.
155173
AdmittedKVWorkDone(Handle, *StoreWriteBytes)
@@ -269,50 +287,16 @@ func MakeController(
269287
// TODO(irfansharif): There's a fair bit happening here and there's no test
270288
// coverage. Fix that.
271289
func (n *controllerImpl) AdmitKVWork(
272-
ctx context.Context, tenantID roachpb.TenantID, ba *kvpb.BatchRequest,
273-
) (handle Handle, retErr error) {
274-
ah := Handle{tenantID: tenantID}
290+
ctx context.Context,
291+
requestTenantID roachpb.TenantID,
292+
rangeTenantID roachpb.TenantID,
293+
ba *kvpb.BatchRequest,
294+
) (_ Handle, retErr error) {
275295
if n.kvAdmissionQ == nil {
276-
return ah, nil
277-
}
278-
279-
bypassAdmission := ba.IsAdmin()
280-
source := ba.AdmissionHeader.Source
281-
if !roachpb.IsSystemTenantID(tenantID.ToUint64()) {
282-
// Request is from a SQL node.
283-
bypassAdmission = false
284-
source = kvpb.AdmissionHeader_FROM_SQL
285-
}
286-
if source == kvpb.AdmissionHeader_OTHER {
287-
bypassAdmission = true
288-
}
289-
// TODO(abaptist): Revisit and deprecate this setting in v23.1.
290-
if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) {
291-
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
292-
bypassAdmission = true
293-
}
294-
}
295-
// LeaseInfo requests are used as makeshift replica health probes by
296-
// DistSender circuit breakers, make sure they bypass AC.
297-
//
298-
// TODO(erikgrinaker): the various bypass conditions here should be moved to
299-
// one or more request flags.
300-
if ba.IsSingleLeaseInfoRequest() {
301-
bypassAdmission = true
302-
}
303-
createTime := ba.AdmissionHeader.CreateTime
304-
if !bypassAdmission && createTime == 0 {
305-
// TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
306-
// of zero CreateTime needs to be revisited. It should use high priority.
307-
createTime = timeutil.Now().UnixNano()
308-
}
309-
admissionInfo := admission.WorkInfo{
310-
TenantID: tenantID,
311-
Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority),
312-
CreateTime: createTime,
313-
BypassAdmission: bypassAdmission,
296+
return Handle{}, nil
314297
}
315-
298+
admissionInfo := workInfoForBatch(n.settings, requestTenantID, rangeTenantID, ba)
299+
ah := Handle{tenantID: admissionInfo.TenantID}
316300
admissionEnabled := true
317301
// Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
318302
// it would bypass admission, it would consume a slot. When writes are
@@ -323,13 +307,14 @@ func (n *controllerImpl) AdmitKVWork(
323307
if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
324308
var admitted bool
325309
attemptFlowControl := kvflowcontrol.Enabled.Get(&n.settings.SV)
326-
if attemptFlowControl && !bypassAdmission {
310+
if attemptFlowControl && !admissionInfo.BypassAdmission {
327311
kvflowHandle, found := n.kvflowHandles.LookupReplicationAdmissionHandle(ba.RangeID)
328312
if !found {
329313
return Handle{}, nil
330314
}
331315
var err error
332-
admitted, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime))
316+
admitted, err = kvflowHandle.Admit(
317+
ctx, admissionInfo.Priority, timeutil.FromUnixNanos(admissionInfo.CreateTime))
333318
if err != nil {
334319
return Handle{}, err
335320
} else if admitted {
@@ -654,3 +639,57 @@ func (wb *StoreWriteBytes) Release() {
654639
}
655640
storeWriteBytesPool.Put(wb)
656641
}
642+
643+
func workInfoForBatch(
644+
st *cluster.Settings,
645+
requestTenantID roachpb.TenantID,
646+
rangeTenantID roachpb.TenantID,
647+
ba *kvpb.BatchRequest,
648+
) admission.WorkInfo {
649+
bypassAdmission := ba.IsAdmin()
650+
source := ba.AdmissionHeader.Source
651+
tenantID := requestTenantID
652+
if requestTenantID.IsSystem() {
653+
if useRangeTenantIDForNonAdminEnabled.Get(&st.SV) && !bypassAdmission &&
654+
rangeTenantID.IsSet() {
655+
tenantID = rangeTenantID
656+
}
657+
// Else, either it is an admin request (common), or rangeTenantID is not
658+
// set (rare), or the cluster setting is disabled, so continue using the
659+
// SystemTenantID.
660+
} else {
661+
// Request is from a SQL node.
662+
bypassAdmission = false
663+
source = kvpb.AdmissionHeader_FROM_SQL
664+
}
665+
if source == kvpb.AdmissionHeader_OTHER {
666+
bypassAdmission = true
667+
}
668+
// TODO(abaptist): Revisit and deprecate this setting in v23.1.
669+
if admission.KVBulkOnlyAdmissionControlEnabled.Get(&st.SV) {
670+
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
671+
bypassAdmission = true
672+
}
673+
}
674+
// LeaseInfo requests are used as makeshift replica health probes by
675+
// DistSender circuit breakers, make sure they bypass AC.
676+
//
677+
// TODO(erikgrinaker): the various bypass conditions here should be moved to
678+
// one or more request flags.
679+
if ba.IsSingleLeaseInfoRequest() {
680+
bypassAdmission = true
681+
}
682+
createTime := ba.AdmissionHeader.CreateTime
683+
if !bypassAdmission && createTime == 0 {
684+
// TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
685+
// of zero CreateTime needs to be revisited. It should use high priority.
686+
createTime = timeutil.Now().UnixNano()
687+
}
688+
admissionInfo := admission.WorkInfo{
689+
TenantID: tenantID,
690+
Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority),
691+
CreateTime: createTime,
692+
BypassAdmission: bypassAdmission,
693+
}
694+
return admissionInfo
695+
}
pkg/kv/kvserver/kvadmission/kvadmission_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// Package kvadmission is the integration layer between KV and admission
7+
// control.
8+
9+
package kvadmission
10+
11+
import (
12+
"context"
13+
"testing"
14+
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
16+
"github.com/cockroachdb/cockroach/pkg/roachpb"
17+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
18+
"github.com/cockroachdb/cockroach/pkg/util/admission"
19+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
func TestWorkInfoForBatch(t *testing.T) {
24+
testCases := []struct {
25+
name string
26+
isAdmin bool
27+
isLeaseInfoRequest bool
28+
ah kvpb.AdmissionHeader
29+
requestTenant int
30+
rangeTenant int
31+
useRangeTenantIDEnabled bool
32+
bulkOnlyEnabled bool
33+
34+
expected admission.WorkInfo
35+
}{
36+
{
37+
name: "other-from-system-tenant-bypasses",
38+
ah: kvpb.AdmissionHeader{
39+
Priority: int32(admissionpb.BulkNormalPri), CreateTime: 2, Source: kvpb.AdmissionHeader_OTHER},
40+
requestTenant: 1, rangeTenant: 4,
41+
expected: admission.WorkInfo{
42+
TenantID: roachpb.MustMakeTenantID(1),
43+
Priority: admissionpb.BulkNormalPri, CreateTime: 2, BypassAdmission: true,
44+
}},
45+
{
46+
name: "other-from-non-system-tenant-does-not-bypass",
47+
ah: kvpb.AdmissionHeader{
48+
Priority: int32(admissionpb.BulkNormalPri), CreateTime: 2, Source: kvpb.AdmissionHeader_OTHER},
49+
requestTenant: 2, rangeTenant: 4,
50+
expected: admission.WorkInfo{
51+
TenantID: roachpb.MustMakeTenantID(2),
52+
Priority: admissionpb.BulkNormalPri, CreateTime: 2, BypassAdmission: false,
53+
}},
54+
{
55+
name: "is-lease-info-request-bypasses",
56+
isLeaseInfoRequest: true,
57+
ah: kvpb.AdmissionHeader{
58+
Priority: int32(admissionpb.BulkNormalPri), CreateTime: 2, Source: kvpb.AdmissionHeader_FROM_SQL},
59+
requestTenant: 2, rangeTenant: 4,
60+
expected: admission.WorkInfo{
61+
TenantID: roachpb.MustMakeTenantID(2),
62+
Priority: admissionpb.BulkNormalPri, CreateTime: 2, BypassAdmission: true,
63+
}},
64+
{
65+
name: "is-admin-from-system-tenant-bypasses",
66+
isAdmin: true,
67+
ah: kvpb.AdmissionHeader{
68+
Priority: int32(admissionpb.LowPri), CreateTime: 5, Source: kvpb.AdmissionHeader_ROOT_KV},
69+
requestTenant: 1, rangeTenant: 4,
70+
expected: admission.WorkInfo{
71+
TenantID: roachpb.MustMakeTenantID(1),
72+
Priority: admissionpb.LowPri, CreateTime: 5, BypassAdmission: true,
73+
}},
74+
{
75+
name: "is-admin-from-non-system-tenant-does-not-bypass",
76+
isAdmin: true,
77+
ah: kvpb.AdmissionHeader{
78+
Priority: int32(admissionpb.LowPri), CreateTime: 5, Source: kvpb.AdmissionHeader_ROOT_KV},
79+
requestTenant: 2, rangeTenant: 4,
80+
expected: admission.WorkInfo{
81+
TenantID: roachpb.MustMakeTenantID(2),
82+
Priority: admissionpb.LowPri, CreateTime: 5, BypassAdmission: false,
83+
}},
84+
{
85+
name: "bulk-only-enabled-from-non-system-tenant-bypasses",
86+
ah: kvpb.AdmissionHeader{
87+
Priority: int32(admissionpb.NormalPri), CreateTime: 5, Source: kvpb.AdmissionHeader_FROM_SQL},
88+
requestTenant: 2, rangeTenant: 4, bulkOnlyEnabled: true,
89+
expected: admission.WorkInfo{
90+
TenantID: roachpb.MustMakeTenantID(2),
91+
Priority: admissionpb.NormalPri, CreateTime: 5, BypassAdmission: true,
92+
}},
93+
{
94+
name: "system-tenant-uses-range-tenant",
95+
ah: kvpb.AdmissionHeader{
96+
Priority: int32(admissionpb.NormalPri), CreateTime: 5, Source: kvpb.AdmissionHeader_FROM_SQL},
97+
requestTenant: 1, rangeTenant: 4, useRangeTenantIDEnabled: true,
98+
expected: admission.WorkInfo{
99+
TenantID: roachpb.MustMakeTenantID(4),
100+
Priority: admissionpb.NormalPri, CreateTime: 5, BypassAdmission: false,
101+
}},
102+
{
103+
name: "non-system-tenant-does-not-use-range-tenant",
104+
ah: kvpb.AdmissionHeader{
105+
Priority: int32(admissionpb.NormalPri), CreateTime: 5, Source: kvpb.AdmissionHeader_FROM_SQL},
106+
requestTenant: 2, rangeTenant: 4, useRangeTenantIDEnabled: true,
107+
expected: admission.WorkInfo{
108+
TenantID: roachpb.MustMakeTenantID(2),
109+
Priority: admissionpb.NormalPri, CreateTime: 5, BypassAdmission: false,
110+
}},
111+
}
112+
for _, tc := range testCases {
113+
t.Run(tc.name, func(t *testing.T) {
114+
st := cluster.MakeTestingClusterSettings()
115+
ctx := context.Background()
116+
ba := kvpb.BatchRequest{}
117+
ba.AdmissionHeader = tc.ah
118+
if tc.isLeaseInfoRequest {
119+
ba.Add(&kvpb.LeaseInfoRequest{})
120+
} else if tc.isAdmin {
121+
ba.Add(&kvpb.AdminSplitRequest{})
122+
}
123+
useRangeTenantIDForNonAdminEnabled.Override(ctx, &st.SV, tc.useRangeTenantIDEnabled)
124+
admission.KVBulkOnlyAdmissionControlEnabled.Override(ctx, &st.SV, tc.bulkOnlyEnabled)
125+
workInfo := workInfoForBatch(st, roachpb.MustMakeTenantID(uint64(tc.requestTenant)),
126+
roachpb.MustMakeTenantID(uint64(tc.rangeTenant)), &ba)
127+
require.Equal(t, tc.expected, workInfo)
128+
})
129+
}
130+
}

pkg/server/node.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1621,8 +1621,19 @@ func (n *Node) batchInternal(
16211621
defer log.Event(ctx, "node sending response")
16221622
}
16231623

1624+
var rangeTenantIDForAC roachpb.TenantID
1625+
// The computation of rangeTenantIDForAC fails open in case of error.
1626+
if len(args.Requests) > 0 {
1627+
k, err := keys.Addr(args.Requests[0].GetInner().Header().Key)
1628+
if err == nil {
1629+
_, tid, err := keys.DecodeTenantPrefix(roachpb.Key(k))
1630+
if err == nil {
1631+
rangeTenantIDForAC = tid
1632+
}
1633+
}
1634+
}
16241635
tStart := timeutil.Now()
1625-
handle, err := n.storeCfg.KVAdmissionController.AdmitKVWork(ctx, tenID, args)
1636+
handle, err := n.storeCfg.KVAdmissionController.AdmitKVWork(ctx, tenID, rangeTenantIDForAC, args)
16261637
if err != nil {
16271638
return nil, err
16281639
}

0 commit comments

Comments
 (0)