@@ -142,14 +142,32 @@ var ConnectedStoreExpiration = settings.RegisterDurationSetting(
142
142
5 * time .Minute ,
143
143
)
144
144
145
+ // useRangeTenantIDForNonAdminEnabled determines whether the range's tenant ID
146
+ // is used by admission control when called by the system tenant. When false,
147
+ // the requester's tenant ID (i.e., the system tenant ID) is used.
148
+ var useRangeTenantIDForNonAdminEnabled = settings .RegisterBoolSetting (
149
+ settings .SystemOnly ,
150
+ "kvadmission.use_range_tenant_id_for_non_admin.enabled" ,
151
+ "when true, and the caller is the system tenant, the tenantID used by admission control " +
152
+ "for non-admin requests is overridden to the range's tenantID" ,
153
+ true ,
154
+ )
155
+
145
156
// Controller provides admission control for the KV layer.
146
157
type Controller interface {
147
158
// AdmitKVWork must be called before performing KV work.
148
159
// BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
149
- // populated for admission to work correctly. If err is non-nil, the
150
- // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
151
- // called after the KV work is done executing.
152
- AdmitKVWork (context.Context , roachpb.TenantID , * kvpb.BatchRequest ) (Handle , error )
160
+ // populated for admission to work correctly. The requestTenantID represents
161
+ // the authenticated caller and must be populated. The rangeTenantID
162
+ // represents the tenant of the range on which the work is being performed
163
+ // -- in rare cases it may be unpopulated.
164
+ //
165
+ // If err is non-nil, the returned handle can be ignored. If err is nil,
166
+ // AdmittedKVWorkDone must be called after the KV work is done executing.
167
+ AdmitKVWork (
168
+ _ context.Context , requestTenantID roachpb.TenantID , rangeTenantID roachpb.TenantID ,
169
+ _ * kvpb.BatchRequest ,
170
+ ) (Handle , error )
153
171
// AdmittedKVWorkDone is called after the admitted KV work is done
154
172
// executing.
155
173
AdmittedKVWorkDone (Handle , * StoreWriteBytes )
@@ -269,50 +287,16 @@ func MakeController(
269
287
// TODO(irfansharif): There's a fair bit happening here and there's no test
270
288
// coverage. Fix that.
271
289
func (n * controllerImpl ) AdmitKVWork (
272
- ctx context.Context , tenantID roachpb.TenantID , ba * kvpb.BatchRequest ,
273
- ) (handle Handle , retErr error ) {
274
- ah := Handle {tenantID : tenantID }
290
+ ctx context.Context ,
291
+ requestTenantID roachpb.TenantID ,
292
+ rangeTenantID roachpb.TenantID ,
293
+ ba * kvpb.BatchRequest ,
294
+ ) (_ Handle , retErr error ) {
275
295
if n .kvAdmissionQ == nil {
276
- return ah , nil
277
- }
278
-
279
- bypassAdmission := ba .IsAdmin ()
280
- source := ba .AdmissionHeader .Source
281
- if ! roachpb .IsSystemTenantID (tenantID .ToUint64 ()) {
282
- // Request is from a SQL node.
283
- bypassAdmission = false
284
- source = kvpb .AdmissionHeader_FROM_SQL
285
- }
286
- if source == kvpb .AdmissionHeader_OTHER {
287
- bypassAdmission = true
288
- }
289
- // TODO(abaptist): Revisit and deprecate this setting in v23.1.
290
- if admission .KVBulkOnlyAdmissionControlEnabled .Get (& n .settings .SV ) {
291
- if admissionpb .WorkPriority (ba .AdmissionHeader .Priority ) >= admissionpb .NormalPri {
292
- bypassAdmission = true
293
- }
294
- }
295
- // LeaseInfo requests are used as makeshift replica health probes by
296
- // DistSender circuit breakers, make sure they bypass AC.
297
- //
298
- // TODO(erikgrinaker): the various bypass conditions here should be moved to
299
- // one or more request flags.
300
- if ba .IsSingleLeaseInfoRequest () {
301
- bypassAdmission = true
302
- }
303
- createTime := ba .AdmissionHeader .CreateTime
304
- if ! bypassAdmission && createTime == 0 {
305
- // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
306
- // of zero CreateTime needs to be revisited. It should use high priority.
307
- createTime = timeutil .Now ().UnixNano ()
308
- }
309
- admissionInfo := admission.WorkInfo {
310
- TenantID : tenantID ,
311
- Priority : admissionpb .WorkPriority (ba .AdmissionHeader .Priority ),
312
- CreateTime : createTime ,
313
- BypassAdmission : bypassAdmission ,
296
+ return Handle {}, nil
314
297
}
315
-
298
+ admissionInfo := workInfoForBatch (n .settings , requestTenantID , rangeTenantID , ba )
299
+ ah := Handle {tenantID : admissionInfo .TenantID }
316
300
admissionEnabled := true
317
301
// Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
318
302
// it would bypass admission, it would consume a slot. When writes are
@@ -323,13 +307,14 @@ func (n *controllerImpl) AdmitKVWork(
323
307
if ba .IsWrite () && ! ba .IsSingleHeartbeatTxnRequest () {
324
308
var admitted bool
325
309
attemptFlowControl := kvflowcontrol .Enabled .Get (& n .settings .SV )
326
- if attemptFlowControl && ! bypassAdmission {
310
+ if attemptFlowControl && ! admissionInfo . BypassAdmission {
327
311
kvflowHandle , found := n .kvflowHandles .LookupReplicationAdmissionHandle (ba .RangeID )
328
312
if ! found {
329
313
return Handle {}, nil
330
314
}
331
315
var err error
332
- admitted , err = kvflowHandle .Admit (ctx , admissionInfo .Priority , timeutil .FromUnixNanos (createTime ))
316
+ admitted , err = kvflowHandle .Admit (
317
+ ctx , admissionInfo .Priority , timeutil .FromUnixNanos (admissionInfo .CreateTime ))
333
318
if err != nil {
334
319
return Handle {}, err
335
320
} else if admitted {
@@ -654,3 +639,57 @@ func (wb *StoreWriteBytes) Release() {
654
639
}
655
640
storeWriteBytesPool .Put (wb )
656
641
}
642
+
643
+ func workInfoForBatch (
644
+ st * cluster.Settings ,
645
+ requestTenantID roachpb.TenantID ,
646
+ rangeTenantID roachpb.TenantID ,
647
+ ba * kvpb.BatchRequest ,
648
+ ) admission.WorkInfo {
649
+ bypassAdmission := ba .IsAdmin ()
650
+ source := ba .AdmissionHeader .Source
651
+ tenantID := requestTenantID
652
+ if requestTenantID .IsSystem () {
653
+ if useRangeTenantIDForNonAdminEnabled .Get (& st .SV ) && ! bypassAdmission &&
654
+ rangeTenantID .IsSet () {
655
+ tenantID = rangeTenantID
656
+ }
657
+ // Else, either it is an admin request (common), or rangeTenantID is not
658
+ // set (rare), or the cluster setting is disabled, so continue using the
659
+ // SystemTenantID.
660
+ } else {
661
+ // Request is from a SQL node.
662
+ bypassAdmission = false
663
+ source = kvpb .AdmissionHeader_FROM_SQL
664
+ }
665
+ if source == kvpb .AdmissionHeader_OTHER {
666
+ bypassAdmission = true
667
+ }
668
+ // TODO(abaptist): Revisit and deprecate this setting in v23.1.
669
+ if admission .KVBulkOnlyAdmissionControlEnabled .Get (& st .SV ) {
670
+ if admissionpb .WorkPriority (ba .AdmissionHeader .Priority ) >= admissionpb .NormalPri {
671
+ bypassAdmission = true
672
+ }
673
+ }
674
+ // LeaseInfo requests are used as makeshift replica health probes by
675
+ // DistSender circuit breakers, make sure they bypass AC.
676
+ //
677
+ // TODO(erikgrinaker): the various bypass conditions here should be moved to
678
+ // one or more request flags.
679
+ if ba .IsSingleLeaseInfoRequest () {
680
+ bypassAdmission = true
681
+ }
682
+ createTime := ba .AdmissionHeader .CreateTime
683
+ if ! bypassAdmission && createTime == 0 {
684
+ // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
685
+ // of zero CreateTime needs to be revisited. It should use high priority.
686
+ createTime = timeutil .Now ().UnixNano ()
687
+ }
688
+ admissionInfo := admission.WorkInfo {
689
+ TenantID : tenantID ,
690
+ Priority : admissionpb .WorkPriority (ba .AdmissionHeader .Priority ),
691
+ CreateTime : createTime ,
692
+ BypassAdmission : bypassAdmission ,
693
+ }
694
+ return admissionInfo
695
+ }
0 commit comments