Skip to content

Commit 308bd8a

Browse files
craig[bot]dt
andcommitted
Merge #153172
153172: colfetcher: elastic AC pace low-pri bulk reads r=dt a=dt Some bulk low pri reads can read large amounts of data. If KV admission slots are not contended these scans scan end up doing non-trivial chunks of client-side work in a tight loop, potentially remaining on-core for some time. Having background/bulk work remain on-core for long, uninterrupted blocks of time can push scheduling delays for foreground work higher, so we typically aim to add scheduling-latency-driven pacer preemption points to these sorts of loops. Release note: none. Epic: none. Co-authored-by: David Taylor <[email protected]>
2 parents b26650a + aba1838 commit 308bd8a

File tree

6 files changed

+37
-2
lines changed

6 files changed

+37
-2
lines changed

pkg/sql/colfetcher/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ go_library(
5656
"//pkg/sql/sessiondata",
5757
"//pkg/sql/types",
5858
"//pkg/storage",
59+
"//pkg/util/admission",
60+
"//pkg/util/admission/admissionpb",
5961
"//pkg/util/buildutil",
6062
"//pkg/util/encoding",
6163
"//pkg/util/hlc",

pkg/sql/colfetcher/cfetcher.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"sort"
1515
"strings"
1616
"sync"
17+
"time"
1718

1819
"github.com/cockroachdb/apd/v3"
1920
"github.com/cockroachdb/cockroach/pkg/col/coldata"
@@ -38,6 +39,8 @@ import (
3839
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3940
"github.com/cockroachdb/cockroach/pkg/sql/types"
4041
"github.com/cockroachdb/cockroach/pkg/storage"
42+
"github.com/cockroachdb/cockroach/pkg/util/admission"
43+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
4144
"github.com/cockroachdb/cockroach/pkg/util/encoding"
4245
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4346
"github.com/cockroachdb/cockroach/pkg/util/intsets"
@@ -218,6 +221,9 @@ type cFetcherArgs struct {
218221
alwaysReallocate bool
219222
// Txn is the txn for the fetch. It might be nil.
220223
txn *kv.Txn
224+
225+
// tenantID is required in some places for AC/bookkeeping.
226+
tenantID roachpb.TenantID
221227
}
222228

223229
// noOutputColumn is a sentinel value to denote that a system column is not
@@ -279,6 +285,7 @@ type cFetcher struct {
279285
// cpuStopWatch tracks the CPU time spent by this cFetcher while fulfilling KV
280286
// requests *in the current goroutine*.
281287
cpuStopWatch *timeutil.CPUStopWatch
288+
pacer *admission.Pacer
282289

283290
// machine contains fields that get updated during the run of the fetcher.
284291
machine struct {
@@ -544,6 +551,17 @@ func (cf *cFetcher) Init(
544551
if cf.cFetcherArgs.collectStats {
545552
cf.cpuStopWatch = timeutil.NewCPUStopWatch()
546553
}
554+
if cf.txn != nil {
555+
if pri := admissionpb.WorkPriority(cf.txn.AdmissionHeader().Priority); pri <= admissionpb.BulkNormalPri {
556+
cf.pacer = cf.txn.DB().AdmissionPacerFactory.NewPacer(
557+
50*time.Millisecond, // Request a realistic per-batch amount.
558+
admission.WorkInfo{
559+
TenantID: cf.tenantID, Priority: pri, CreateTime: timeutil.Now().UnixNano(),
560+
},
561+
)
562+
}
563+
}
564+
547565
cf.machine.state[0] = stateResetBatch
548566
cf.machine.state[1] = stateInitFetch
549567

@@ -724,6 +742,9 @@ func (cf *cFetcher) setNextKV(kv roachpb.KeyValue) {
724742
// rows, the Batch.Length is 0.
725743
func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
726744
for {
745+
if err := cf.pacer.Pace(ctx); err != nil {
746+
return nil, err
747+
}
727748
if debugState {
728749
log.Dev.Infof(ctx, "State %s", cf.machine.state[0])
729750
}
@@ -1480,6 +1501,8 @@ func (cf *cFetcher) Release() {
14801501

14811502
func (cf *cFetcher) Close(ctx context.Context) {
14821503
if cf != nil {
1504+
cf.pacer.Close()
1505+
cf.pacer = nil
14831506
cf.nextKVer = nil
14841507
if cf.fetcher != nil {
14851508
cf.bytesRead = cf.fetcher.GetBytesRead()

pkg/sql/colfetcher/cfetcher_wrapper.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ func newCFetcherWrapper(
223223
const collectStats = false
224224
// We cannot reuse batches if we're not serializing the response.
225225
alwaysReallocate := !mustSerialize
226+
227+
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
228+
if !ok {
229+
tenantID = roachpb.SystemTenantID
230+
}
226231
// TODO(yuzefovich, 23.1): think through estimatedRowCount (#94850) and
227232
// traceKV arguments.
228233
fetcher.cFetcherArgs = cFetcherArgs{
@@ -232,7 +237,8 @@ func newCFetcherWrapper(
232237
true, /* singleUse */
233238
collectStats,
234239
alwaysReallocate,
235-
nil, /* txn */
240+
nil, /* txn; TODO(dt): this means no AC priority info is passed. */
241+
tenantID,
236242
}
237243

238244
// This memory monitor is not connected to the memory accounting system

pkg/sql/colfetcher/colbatch_scan.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ func NewColBatchScan(
347347
shouldCollectStats,
348348
false, /* alwaysReallocate */
349349
flowCtx.Txn,
350+
flowCtx.Codec().TenantID,
350351
}
351352
if err = fetcher.Init(fetcherAllocator, kvFetcher, tableArgs); err != nil {
352353
fetcher.Release()

pkg/sql/colfetcher/index_join.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@ func NewColIndexJoin(
648648
shouldCollectStats,
649649
false, /* alwaysReallocate */
650650
txn,
651+
flowCtx.Codec().TenantID,
651652
}
652653
if err = fetcher.Init(
653654
fetcherAllocator, kvFetcher, tableArgs,

pkg/sql/sessiondatapb/local_only_session_data.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,8 @@ func ParseQoSLevelFromString(val string) (_ QoSLevel, ok bool) {
387387
return UserLow, true
388388
case NormalName:
389389
return Normal, true
390+
case BulkLowName:
391+
return BulkLow, true
390392
default:
391393
return 0, false
392394
}
@@ -414,7 +416,7 @@ func ToQoSLevelString(value int32) string {
414416
// Validate checks for a valid user QoSLevel setting before returning it.
415417
func (e QoSLevel) Validate() QoSLevel {
416418
switch e {
417-
case Normal, UserHigh, UserLow:
419+
case Normal, UserHigh, UserLow, BulkLow:
418420
return e
419421
default:
420422
panic(errors.AssertionFailedf("use of illegal user QoSLevel: %s", e.String()))

0 commit comments

Comments
 (0)