Skip to content

Commit 2c99066

Browse files
craig[bot]yuzefovich
andcommitted
Merge #148182
148182: sql: minor cleanup of readers r=yuzefovich a=yuzefovich **rowexec: remove a couple testing knobs from processor protos** This commit removes TableReaderSpec.BatchBytesLimit and JoinReaderSpec.LookupBatchBytesLimit that are only used in tests. Given that we have access to the testing knobs on each node that creates the necessary processor, we can just consult that directly. This removal avoids the possible confusion for how and when these fields are used. **sql: clarify fetcher parallelization** I find "limit batches" terminology a bit confusing, so this commit switches places where we use to "parallelize" which indicates that the DistSender-level cross-range parallelism should be used, which in turn means that TargetBytes limit cannot be set (i.e. "should _not_ limit batches"). Epic: None Release note: None Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents db7f5f4 + be8c8a3 commit 2c99066

File tree

7 files changed

+51
-85
lines changed

7 files changed

+51
-85
lines changed

pkg/sql/colfetcher/cfetcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -586,15 +586,15 @@ func cFetcherFirstBatchLimit(limitHint rowinfra.RowLimit, maxKeysPerRow uint32)
586586
func (cf *cFetcher) StartScan(
587587
ctx context.Context,
588588
spans roachpb.Spans,
589-
limitBatches bool,
589+
parallelize bool,
590590
batchBytesLimit rowinfra.BytesLimit,
591591
limitHint rowinfra.RowLimit,
592592
) error {
593593
if len(spans) == 0 {
594594
return errors.AssertionFailedf("no spans")
595595
}
596-
if !limitBatches && batchBytesLimit != rowinfra.NoBytesLimit {
597-
return errors.AssertionFailedf("batchBytesLimit set without limitBatches")
596+
if parallelize && batchBytesLimit != rowinfra.NoBytesLimit {
597+
return errors.AssertionFailedf("TargetBytes limit requested with parallelize=true")
598598
}
599599

600600
firstBatchLimit := cFetcherFirstBatchLimit(limitHint, cf.table.spec.MaxKeysPerRow)

pkg/sql/colfetcher/colbatch_scan.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,14 @@ func newColBatchScanBase(
169169
s.MakeSpansCopy()
170170
}
171171

172-
if spec.LimitHint > 0 || spec.BatchBytesLimit > 0 {
172+
if spec.LimitHint > 0 {
173173
// Parallelize shouldn't be set when there's a limit hint, but double-check
174174
// just in case.
175175
spec.Parallelize = false
176176
}
177177
var batchBytesLimit rowinfra.BytesLimit
178178
if !spec.Parallelize {
179-
batchBytesLimit = rowinfra.BytesLimit(spec.BatchBytesLimit)
179+
batchBytesLimit = rowinfra.BytesLimit(flowCtx.Cfg.TestingKnobs.TableReaderBatchBytesLimit)
180180
if batchBytesLimit == 0 {
181181
batchBytesLimit = rowinfra.GetDefaultBatchBytesLimit(flowCtx.EvalCtx.TestingKnobs.ForceProductionValues)
182182
}
@@ -221,11 +221,10 @@ func (s *ColBatchScan) Init(ctx context.Context) {
221221
s.Ctx, s.flowCtx, "colbatchscan", s.processorID,
222222
&s.ContentionEventsListener, &s.ScanStatsListener, &s.TenantConsumptionListener,
223223
)
224-
limitBatches := !s.parallelize
225224
if err := s.cf.StartScan(
226225
s.Ctx,
227226
s.Spans,
228-
limitBatches,
227+
s.parallelize,
229228
s.batchBytesLimit,
230229
s.limitHint,
231230
); err != nil {

pkg/sql/colfetcher/index_join.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func (s *ColIndexJoin) Next() coldata.Batch {
215215
if err := s.cf.StartScan(
216216
s.Ctx,
217217
spans,
218-
false, /* limitBatches */
218+
true, /* parallelize */
219219
rowinfra.NoBytesLimit,
220220
rowinfra.NoRowLimit,
221221
); err != nil {

pkg/sql/distsql_physical_planner.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2467,9 +2467,6 @@ func (dsp *DistSQLPlanner) planTableReaders(
24672467
}
24682468

24692469
tr.Parallelize = info.parallelize
2470-
if !tr.Parallelize {
2471-
tr.BatchBytesLimit = dsp.distSQLSrv.TestingKnobs.TableReaderBatchBytesLimit
2472-
}
24732470
tr.IgnoreMisplannedRanges = ignoreMisplannedRanges
24742471
p.TotalEstimatedScannedRows += info.estimatedRowCount
24752472

@@ -3444,7 +3441,6 @@ func (dsp *DistSQLPlanner) planLookupJoin(
34443441
MaintainLookupOrdering: maintainLookupOrdering,
34453442
LeftJoinWithPairedJoiner: planInfo.isSecondJoinInPairedJoiner,
34463443
OutputGroupContinuationForLeftRow: planInfo.isFirstJoinInPairedJoiner,
3447-
LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit,
34483444
LimitHint: planInfo.limitHint,
34493445
RemoteOnlyLookups: planInfo.remoteOnlyLookups,
34503446
ReverseScans: planInfo.reverseScans,

pkg/sql/execinfrapb/processors_sql.proto

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,6 @@ message TableReaderSpec {
7474
// limit hint.
7575
optional bool parallelize = 12 [(gogoproto.nullable) = false];
7676

77-
// batch_bytes_limit, if non-zero, controls the TargetBytes limits that the
78-
// TableReader will use for its scans. If zero, then the server-side default
79-
// is used. If parallelize is set, this cannot be set.
80-
optional int64 batch_bytes_limit = 17 [(gogoproto.nullable) = false];
81-
8277
// If non-zero, this enables inconsistent historical scanning where different
8378
// batches can be read with different timestamps. This is used for
8479
// long-running table statistics which may outlive the TTL. Using this setting
@@ -124,7 +119,7 @@ message TableReaderSpec {
124119
// leaseholder of the beginning of the key spans to be scanned).
125120
optional bool ignore_misplanned_ranges = 22 [(gogoproto.nullable) = false];
126121

127-
reserved 1, 2, 4, 6, 7, 8, 13, 14, 15, 16, 19;
122+
reserved 1, 2, 4, 6, 7, 8, 13, 14, 15, 16, 17, 19;
128123
}
129124

130125
// FiltererSpec is the specification for a processor that filters input rows
@@ -303,18 +298,10 @@ message JoinReaderSpec {
303298
// variables @(N+1) to @(N+M) refer to fetched columns.
304299
optional Expression on_expr = 4 [(gogoproto.nullable) = false];
305300

306-
// This used to be used for an extra index filter expression. It was removed
307-
// in DistSQL version 24.
308-
reserved 5;
309-
310301
// For lookup joins. Only JoinType_INNER and JoinType_LEFT_OUTER are
311302
// supported.
312303
optional sqlbase.JoinType type = 6 [(gogoproto.nullable) = false];
313304

314-
// This field used to be a visibility level of the columns that should be
315-
// produced. We now produce the columns in the FetchSpec.
316-
reserved 7;
317-
318305
// Indicates the row-level locking strength to be used by the join. If set to
319306
// FOR_NONE, no row-level locking should be performed.
320307
optional sqlbase.ScanLockingStrength locking_strength = 9 [(gogoproto.nullable) = false];
@@ -335,8 +322,6 @@ message JoinReaderSpec {
335322
// optimizations.
336323
optional bool maintain_ordering = 11 [(gogoproto.nullable) = false];
337324

338-
reserved 12, 13;
339-
340325
// LeftJoinWithPairedJoiner is used when a left {outer,anti,semi} join is
341326
// being achieved by pairing two joins, and this is the second join. See
342327
// the comment above.
@@ -349,13 +334,6 @@ message JoinReaderSpec {
349334
// be true.
350335
optional bool output_group_continuation_for_left_row = 15 [(gogoproto.nullable) = false];
351336

352-
// lookup_batch_bytes_limit, if non-zero, controls the TargetBytes limits that
353-
// the joiner will use for its lookups. If zero, then the server-side default
354-
// is used. Note that, regardless of this setting, bytes limits are not always
355-
// used for lookups - it depends on whether the joiner decides it wants
356-
// DistSender-parallelism or not.
357-
optional int64 lookup_batch_bytes_limit = 18 [(gogoproto.nullable) = false];
358-
359337
// A hint for how many rows the consumer of the join reader output might
360338
// need. This is used to size the initial batches of input rows to try to
361339
// avoid reading many more rows than needed by the processor receiving the
@@ -384,6 +362,8 @@ message JoinReaderSpec {
384362
// reverse order. This is only useful if a lookup can return more than one
385363
// row.
386364
optional bool reverse_scans = 25 [(gogoproto.nullable) = false];
365+
366+
reserved 5, 7, 12, 13, 18;
387367
}
388368

389369
// SorterSpec is the specification for a "sorting aggregator". A sorting

pkg/sql/rowexec/joinreader.go

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,15 @@ type joinReader struct {
115115

116116
// fetcher wraps the row.Fetcher used to perform lookups. This enables the
117117
// joinReader to wrap the fetcher with a stat collector when necessary.
118-
fetcher rowFetcher
119-
alloc tree.DatumAlloc
120-
rowAlloc rowenc.EncDatumRowAlloc
121-
shouldLimitBatches bool
122-
readerType joinReaderType
118+
fetcher rowFetcher
119+
alloc tree.DatumAlloc
120+
rowAlloc rowenc.EncDatumRowAlloc
121+
// parallelize, if true, indicates that the KV lookups will be parallelized
122+
// across ranges when using the DistSender API. It has no influence on the
123+
// behavior when using the Streamer API (when the lookups are always
124+
// parallelized).
125+
parallelize bool
126+
readerType joinReaderType
123127

124128
// txn is the transaction used by the join reader.
125129
txn *kv.Txn
@@ -238,11 +242,6 @@ type joinReader struct {
238242
// and requires that the spec has MaintainOrdering set to true.
239243
outputGroupContinuationForLeftRow bool
240244

241-
// lookupBatchBytesLimit controls the TargetBytes of lookup requests. If 0, a
242-
// default will be used. Regardless of this value, bytes limits aren't always
243-
// used.
244-
lookupBatchBytesLimit rowinfra.BytesLimit
245-
246245
// limitHintHelper is used in limiting batches of input rows in the presence
247246
// of hard and soft limits.
248247
limitHintHelper execinfra.LimitHintHelper
@@ -331,18 +330,19 @@ func newJoinReader(
331330
// in case of indexJoinReaderType, we know that there's exactly one lookup
332331
// row for each input row. Similarly, in case of spec.LookupColumnsAreKey,
333332
// we know that there's at most one lookup row per input row. In other
334-
// cases, we use limits.
335-
shouldLimitBatches := !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType
333+
// cases, we disable parallelism and use the TargetBytes limit.
334+
parallelize := spec.LookupColumnsAreKey || readerType == indexJoinReaderType
336335
if flowCtx.EvalCtx.SessionData().ParallelizeMultiKeyLookupJoinsEnabled {
337-
shouldLimitBatches = false
336+
parallelize = true
338337
}
339338
if spec.MaintainLookupOrdering {
340-
// MaintainLookupOrdering indicates the output of the lookup joiner should
341-
// be sorted by <inputCols>, <lookupCols>. It doesn't make sense for
342-
// MaintainLookupOrdering to be true when MaintainOrdering is not.
343-
// Additionally, we need to disable parallelism for the traditional fetcher
344-
// in order to ensure the lookups are ordered, so set shouldLimitBatches.
345-
spec.MaintainOrdering, shouldLimitBatches = true, true
339+
// MaintainLookupOrdering indicates the output of the lookup joiner
340+
// should be sorted by <inputCols>, <lookupCols>. It doesn't make sense
341+
// for MaintainLookupOrdering to be true when MaintainOrdering is not.
342+
//
343+
// Additionally, we need to disable parallelism for the traditional
344+
// fetcher in order to ensure the lookups are ordered.
345+
spec.MaintainOrdering, parallelize = true, false
346346
}
347347
useStreamer, txn, err := flowCtx.UseStreamer(ctx)
348348
if err != nil {
@@ -359,11 +359,10 @@ func newJoinReader(
359359
input: input,
360360
lookupCols: lookupCols,
361361
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
362-
shouldLimitBatches: shouldLimitBatches,
362+
parallelize: parallelize,
363363
readerType: readerType,
364364
txn: txn,
365365
usesStreamer: useStreamer,
366-
lookupBatchBytesLimit: rowinfra.BytesLimit(spec.LookupBatchBytesLimit),
367366
limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post),
368367
errorOnLookup: errorOnLookup,
369368
allowEnforceHomeRegionFollowerReads: flowCtx.EvalCtx.SessionData().EnforceHomeRegionFollowerReadsEnabled,
@@ -868,16 +867,15 @@ func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit {
868867
// BatchRequests.
869868
return rowinfra.NoBytesLimit
870869
}
871-
if !jr.shouldLimitBatches {
872-
// We deem it safe to not limit the batches in order to get the
870+
if jr.parallelize {
871+
// We deem it safe to not use the TargetBytes limit in order to get the
873872
// DistSender-level parallelism.
874873
return rowinfra.NoBytesLimit
875874
}
876-
bytesLimit := jr.lookupBatchBytesLimit
877-
if bytesLimit == 0 {
878-
bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.FlowCtx.EvalCtx.TestingKnobs.ForceProductionValues)
875+
if testingLimit := jr.FlowCtx.Cfg.TestingKnobs.JoinReaderBatchBytesLimit; testingLimit != 0 {
876+
return rowinfra.BytesLimit(testingLimit)
879877
}
880-
return bytesLimit
878+
return rowinfra.GetDefaultBatchBytesLimit(jr.FlowCtx.EvalCtx.TestingKnobs.ForceProductionValues)
881879
}
882880

883881
// readInput reads the next batch of input rows and starts an index scan, which
@@ -1054,11 +1052,13 @@ func (jr *joinReader) readInput() (
10541052
// fetcher only accepts a limit if the spans are sorted), and
10551053
// b) Pebble has various optimizations for Seeks in sorted order.
10561054
if jr.readerType == indexJoinReaderType && jr.maintainOrdering {
1057-
// Assert that the index join doesn't have shouldLimitBatches set. Since we
1058-
// didn't sort above, the fetcher doesn't support a limit.
1059-
if jr.shouldLimitBatches {
1055+
// Assert that the index join has 'parallelize=true' set. Since we
1056+
// didn't sort above, the fetcher doesn't support the TargetBytes limit
1057+
// (which would be set via getBatchBytesLimit() if 'parallelize' was
1058+
// false).
1059+
if !jr.parallelize {
10601060
err := errors.AssertionFailedf("index join configured with both maintainOrdering and " +
1061-
"shouldLimitBatched; this shouldn't have happened as the implementation doesn't support it")
1061+
"parallelize=false; this shouldn't have happened as the implementation doesn't support it")
10621062
jr.MoveToDraining(err)
10631063
return jrStateUnknown, nil, jr.DrainHelper()
10641064
}

pkg/sql/rowexec/tablereader.go

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@ type tableReader struct {
3434
execinfra.ProcessorBase
3535
execinfra.SpansWithCopy
3636

37-
limitHint rowinfra.RowLimit
38-
parallelize bool
39-
batchBytesLimit rowinfra.BytesLimit
37+
limitHint rowinfra.RowLimit
38+
parallelize bool
4039

4140
scanStarted bool
4241

@@ -83,24 +82,16 @@ func newTableReader(
8382
return nil, errors.AssertionFailedf("attempting to create a tableReader with uninitialized NodeID")
8483
}
8584

86-
if spec.LimitHint > 0 || spec.BatchBytesLimit > 0 {
85+
if spec.LimitHint > 0 {
8786
// Parallelize shouldn't be set when there's a limit hint, but double-check
8887
// just in case.
8988
spec.Parallelize = false
9089
}
91-
var batchBytesLimit rowinfra.BytesLimit
92-
if !spec.Parallelize {
93-
batchBytesLimit = rowinfra.BytesLimit(spec.BatchBytesLimit)
94-
if batchBytesLimit == 0 {
95-
batchBytesLimit = rowinfra.GetDefaultBatchBytesLimit(flowCtx.EvalCtx.TestingKnobs.ForceProductionValues)
96-
}
97-
}
9890

9991
tr := trPool.Get().(*tableReader)
10092

10193
tr.limitHint = rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post))
10294
tr.parallelize = spec.Parallelize
103-
tr.batchBytesLimit = batchBytesLimit
10495
tr.maxTimestampAge = time.Duration(spec.MaxTimestampAgeNanos)
10596

10697
// Make sure the key column types are hydrated. The fetched column types
@@ -208,14 +199,14 @@ func (tr *tableReader) startScan(ctx context.Context) error {
208199
if cb := tr.FlowCtx.Cfg.TestingKnobs.TableReaderStartScanCb; cb != nil {
209200
cb()
210201
}
211-
limitBatches := !tr.parallelize
212-
var bytesLimit rowinfra.BytesLimit
213-
if !limitBatches {
214-
bytesLimit = rowinfra.NoBytesLimit
215-
} else {
216-
bytesLimit = tr.batchBytesLimit
202+
bytesLimit := rowinfra.NoBytesLimit
203+
if !tr.parallelize {
204+
bytesLimit = rowinfra.BytesLimit(tr.FlowCtx.Cfg.TestingKnobs.TableReaderBatchBytesLimit)
205+
if bytesLimit == 0 {
206+
bytesLimit = rowinfra.GetDefaultBatchBytesLimit(tr.FlowCtx.EvalCtx.TestingKnobs.ForceProductionValues)
207+
}
217208
}
218-
log.VEventf(ctx, 1, "starting scan with limitBatches %t", limitBatches)
209+
log.VEventf(ctx, 1, "starting scan with parallelize=%t", tr.parallelize)
219210
var err error
220211
if tr.maxTimestampAge == 0 {
221212
err = tr.fetcher.StartScan(

0 commit comments

Comments
 (0)