Skip to content

Commit e3ffe03

Browse files
committed
server: plumb AC pacer support into UA servers
This change threads the AdmissionPacerFactory from the top-level server down to shared process tenant servers, enabling elastic admission control for tenant workloads. For separate process tenants, we continue to use a noop coordinator until full SQL admission control is implemented. Release note (sql change): clusters utilizing cluster virtualization, such as those running PCR, now apply the same admission control pacing to various bulk operations that is used in by clusters not using cluster virtualization. Epic: none.
1 parent 73f993e commit e3ffe03

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

pkg/server/server_controller_new_server.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2424
"github.com/cockroachdb/cockroach/pkg/storage/fs"
2525
"github.com/cockroachdb/cockroach/pkg/util"
26+
"github.com/cockroachdb/cockroach/pkg/util/admission"
2627
"github.com/cockroachdb/cockroach/pkg/util/log"
2728
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
2829
"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -78,7 +79,7 @@ func (s *topLevelServer) newTenantServer(
7879
// Apply the TestTenantArgs, if any.
7980
baseCfg.TestingKnobs = testArgs.Knobs
8081

81-
tenantServer, err := newTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer)
82+
tenantServer, err := newTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer, s.db.AdmissionPacerFactory)
8283
if err != nil {
8384
return nil, err
8485
}
@@ -130,6 +131,7 @@ func newTenantServerInternal(
130131
sqlCfg SQLConfig,
131132
stopper *stop.Stopper,
132133
tenantNameContainer *roachpb.TenantNameContainer,
134+
elastic admission.PacerFactory,
133135
) (*SQLServerWrapper, error) {
134136
ambientCtx := baseCfg.AmbientCtx
135137
stopper.SetTracer(baseCfg.Tracer)
@@ -141,7 +143,7 @@ func newTenantServerInternal(
141143
log.Infof(newCtx, "creating tenant server")
142144

143145
// Now instantiate the tenant server proper.
144-
return newSharedProcessTenantServer(newCtx, stopper, baseCfg, sqlCfg, tenantNameContainer)
146+
return newSharedProcessTenantServer(newCtx, stopper, baseCfg, sqlCfg, tenantNameContainer, elastic)
145147
}
146148

147149
func (s *topLevelServer) makeSharedProcessTenantConfig(

pkg/server/tenant.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,9 @@ func NewSeparateProcessTenantServer(
216216
return spanconfiglimiter.New(ie, st, knobs)
217217
},
218218
}
219-
220-
return newTenantServer(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps, mtinfopb.ServiceModeExternal)
219+
// TODO(irfansharif): hook up NewGrantCoordinatorSQL.
220+
var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil
221+
return newTenantServer(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps, mtinfopb.ServiceModeExternal, noopElasticCPUGrantCoord)
221222
}
222223

223224
// newSharedProcessTenantServer creates a tenant-specific, SQL-only
@@ -232,6 +233,7 @@ func newSharedProcessTenantServer(
232233
baseCfg BaseConfig,
233234
sqlCfg SQLConfig,
234235
tenantNameContainer *roachpb.TenantNameContainer,
236+
elastic admission.PacerFactory,
235237
) (*SQLServerWrapper, error) {
236238
if baseCfg.IDContainer.Get() == 0 {
237239
return nil, errors.AssertionFailedf("programming error: NewSharedProcessTenantServer called before NodeID was assigned.")
@@ -255,7 +257,7 @@ func newSharedProcessTenantServer(
255257
return spanconfiglimiter.NoopLimiter{}
256258
},
257259
}
258-
return newTenantServer(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps, mtinfopb.ServiceModeShared)
260+
return newTenantServer(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps, mtinfopb.ServiceModeShared, elastic)
259261
}
260262

261263
// newTenantServer constructs a SQLServerWrapper.
@@ -269,6 +271,7 @@ func newTenantServer(
269271
tenantNameContainer *roachpb.TenantNameContainer,
270272
deps tenantServerDeps,
271273
serviceMode mtinfopb.TenantServiceMode,
274+
elastic admission.PacerFactory,
272275
) (*SQLServerWrapper, error) {
273276
// TODO(knz): Make the license application a per-server thing
274277
// instead of a global thing.
@@ -316,7 +319,7 @@ func newTenantServer(
316319
// then rely on some mechanism to retrieve the ID from the name to
317320
// initialize the rest of the server.
318321
baseCfg.idProvider.SetTenantID(sqlCfg.TenantID)
319-
args, err := makeTenantSQLServerArgs(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps, serviceMode)
322+
args, err := makeTenantSQLServerArgs(ctx, stopper, baseCfg, sqlCfg, tenantNameContainer, deps, serviceMode, elastic)
320323
if err != nil {
321324
return nil, err
322325
}
@@ -1093,6 +1096,7 @@ func makeTenantSQLServerArgs(
10931096
tenantNameContainer *roachpb.TenantNameContainer,
10941097
deps tenantServerDeps,
10951098
serviceMode mtinfopb.TenantServiceMode,
1099+
elastic admission.PacerFactory,
10961100
) (sqlServerArgs, error) {
10971101
st := baseCfg.Settings
10981102

@@ -1248,7 +1252,7 @@ func makeTenantSQLServerArgs(
12481252
dbCtx := kv.DefaultDBContext(st, stopper)
12491253
dbCtx.NodeID = deps.instanceIDContainer
12501254
db := kv.NewDBWithContext(baseCfg.AmbientCtx, tcsFactory, clock, dbCtx)
1251-
1255+
db.AdmissionPacerFactory = elastic
12521256
rangeFeedKnobs, _ := baseCfg.TestingKnobs.RangeFeed.(*rangefeed.TestingKnobs)
12531257
rangeFeedFactory, err := rangefeed.NewFactory(stopper, db, st, rangeFeedKnobs)
12541258
if err != nil {
@@ -1325,8 +1329,6 @@ func makeTenantSQLServerArgs(
13251329
remoteFlowRunnerAcc := monitorAndMetrics.rootSQLMemoryMonitor.MakeBoundAccount()
13261330
remoteFlowRunner := flowinfra.NewRemoteFlowRunner(baseCfg.AmbientCtx, stopper, &remoteFlowRunnerAcc)
13271331

1328-
// TODO(irfansharif): hook up NewGrantCoordinatorSQL.
1329-
var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil
13301332
return sqlServerArgs{
13311333
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
13321334
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil),
@@ -1381,7 +1383,7 @@ func makeTenantSQLServerArgs(
13811383
grpc: grpcServer,
13821384
drpc: drpcServer,
13831385
externalStorageBuilder: esb,
1384-
admissionPacerFactory: noopElasticCPUGrantCoord,
1386+
admissionPacerFactory: elastic,
13851387
rangeDescIteratorFactory: tenantConnect,
13861388
tenantTimeSeriesServer: sTS,
13871389
tenantCapabilitiesReader: sql.EmptySystemTenantOnly[tenantcapabilities.Reader](),

pkg/sql/row/kv_batch_fetcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ var logAdmissionPacerErr = log.Every(100 * time.Millisecond)
6161
// each time we seek admission for response handling during internally submitted
6262
// low priority reads (like row-level TTL selects).
6363
var elasticCPUDurationPerLowPriReadResponse = settings.RegisterDurationSetting(
64-
settings.SystemOnly,
64+
settings.ApplicationLevel,
6565
"sqladmission.elastic_cpu.duration_per_low_pri_read_response",
6666
"controls how many CPU tokens are allotted for handling responses for internally submitted low priority reads",
6767
// NB: Experimentally, during TTL reads, we observed cumulative on-CPU time
@@ -75,7 +75,7 @@ var elasticCPUDurationPerLowPriReadResponse = settings.RegisterDurationSetting(
7575
// internally submitted low-priority reads (like row-level TTL selects)
7676
// integrate with elastic CPU control.
7777
var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
78-
settings.SystemOnly,
78+
settings.ApplicationLevel,
7979
"sqladmission.low_pri_read_response_elastic_control.enabled",
8080
"determines whether the sql portion of internally submitted reads integrate with elastic CPU controller",
8181
true,

0 commit comments

Comments
 (0)