Skip to content

Commit be8c8a3

Browse files
committed
sql: clarify fetcher parallelization
I find the "limit batches" terminology a bit confusing, so this commit replaces it with "parallelize" everywhere it was used. "parallelize" indicates that the DistSender-level cross-range parallelism should be used, which in turn means that the TargetBytes limit cannot be set (i.e. "should _not_ limit batches"). Release note: None
1 parent 7cf8e92 commit be8c8a3

File tree

5 files changed

+36
-33
lines changed

5 files changed

+36
-33
lines changed

pkg/sql/colfetcher/cfetcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -586,15 +586,15 @@ func cFetcherFirstBatchLimit(limitHint rowinfra.RowLimit, maxKeysPerRow uint32)
586586
func (cf *cFetcher) StartScan(
587587
ctx context.Context,
588588
spans roachpb.Spans,
589-
limitBatches bool,
589+
parallelize bool,
590590
batchBytesLimit rowinfra.BytesLimit,
591591
limitHint rowinfra.RowLimit,
592592
) error {
593593
if len(spans) == 0 {
594594
return errors.AssertionFailedf("no spans")
595595
}
596-
if !limitBatches && batchBytesLimit != rowinfra.NoBytesLimit {
597-
return errors.AssertionFailedf("batchBytesLimit set without limitBatches")
596+
if parallelize && batchBytesLimit != rowinfra.NoBytesLimit {
597+
return errors.AssertionFailedf("TargetBytes limit requested with parallelize=true")
598598
}
599599

600600
firstBatchLimit := cFetcherFirstBatchLimit(limitHint, cf.table.spec.MaxKeysPerRow)

pkg/sql/colfetcher/colbatch_scan.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,10 @@ func (s *ColBatchScan) Init(ctx context.Context) {
221221
s.Ctx, s.flowCtx, "colbatchscan", s.processorID,
222222
&s.ContentionEventsListener, &s.ScanStatsListener, &s.TenantConsumptionListener,
223223
)
224-
limitBatches := !s.parallelize
225224
if err := s.cf.StartScan(
226225
s.Ctx,
227226
s.Spans,
228-
limitBatches,
227+
s.parallelize,
229228
s.batchBytesLimit,
230229
s.limitHint,
231230
); err != nil {

pkg/sql/colfetcher/index_join.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func (s *ColIndexJoin) Next() coldata.Batch {
215215
if err := s.cf.StartScan(
216216
s.Ctx,
217217
spans,
218-
false, /* limitBatches */
218+
true, /* parallelize */
219219
rowinfra.NoBytesLimit,
220220
rowinfra.NoRowLimit,
221221
); err != nil {

pkg/sql/rowexec/joinreader.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,15 @@ type joinReader struct {
115115

116116
// fetcher wraps the row.Fetcher used to perform lookups. This enables the
117117
// joinReader to wrap the fetcher with a stat collector when necessary.
118-
fetcher rowFetcher
119-
alloc tree.DatumAlloc
120-
rowAlloc rowenc.EncDatumRowAlloc
121-
shouldLimitBatches bool
122-
readerType joinReaderType
118+
fetcher rowFetcher
119+
alloc tree.DatumAlloc
120+
rowAlloc rowenc.EncDatumRowAlloc
121+
// parallelize, if true, indicates that the KV lookups will be parallelized
122+
// across ranges when using the DistSender API. It has no influence on the
123+
// behavior when using the Streamer API (when the lookups are always
124+
// parallelized).
125+
parallelize bool
126+
readerType joinReaderType
123127

124128
// txn is the transaction used by the join reader.
125129
txn *kv.Txn
@@ -326,18 +330,19 @@ func newJoinReader(
326330
// in case of indexJoinReaderType, we know that there's exactly one lookup
327331
// row for each input row. Similarly, in case of spec.LookupColumnsAreKey,
328332
// we know that there's at most one lookup row per input row. In other
329-
// cases, we use limits.
330-
shouldLimitBatches := !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType
333+
// cases, we disable parallelism and use the TargetBytes limit.
334+
parallelize := spec.LookupColumnsAreKey || readerType == indexJoinReaderType
331335
if flowCtx.EvalCtx.SessionData().ParallelizeMultiKeyLookupJoinsEnabled {
332-
shouldLimitBatches = false
336+
parallelize = true
333337
}
334338
if spec.MaintainLookupOrdering {
335-
// MaintainLookupOrdering indicates the output of the lookup joiner should
336-
// be sorted by <inputCols>, <lookupCols>. It doesn't make sense for
337-
// MaintainLookupOrdering to be true when MaintainOrdering is not.
338-
// Additionally, we need to disable parallelism for the traditional fetcher
339-
// in order to ensure the lookups are ordered, so set shouldLimitBatches.
340-
spec.MaintainOrdering, shouldLimitBatches = true, true
339+
// MaintainLookupOrdering indicates the output of the lookup joiner
340+
// should be sorted by <inputCols>, <lookupCols>. It doesn't make sense
341+
// for MaintainLookupOrdering to be true when MaintainOrdering is not.
342+
//
343+
// Additionally, we need to disable parallelism for the traditional
344+
// fetcher in order to ensure the lookups are ordered.
345+
spec.MaintainOrdering, parallelize = true, false
341346
}
342347
useStreamer, txn, err := flowCtx.UseStreamer(ctx)
343348
if err != nil {
@@ -354,7 +359,7 @@ func newJoinReader(
354359
input: input,
355360
lookupCols: lookupCols,
356361
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
357-
shouldLimitBatches: shouldLimitBatches,
362+
parallelize: parallelize,
358363
readerType: readerType,
359364
txn: txn,
360365
usesStreamer: useStreamer,
@@ -862,8 +867,8 @@ func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit {
862867
// BatchRequests.
863868
return rowinfra.NoBytesLimit
864869
}
865-
if !jr.shouldLimitBatches {
866-
// We deem it safe to not limit the batches in order to get the
870+
if jr.parallelize {
871+
// We deem it safe to not use the TargetBytes limit in order to get the
867872
// DistSender-level parallelism.
868873
return rowinfra.NoBytesLimit
869874
}
@@ -1047,11 +1052,13 @@ func (jr *joinReader) readInput() (
10471052
// fetcher only accepts a limit if the spans are sorted), and
10481053
// b) Pebble has various optimizations for Seeks in sorted order.
10491054
if jr.readerType == indexJoinReaderType && jr.maintainOrdering {
1050-
// Assert that the index join doesn't have shouldLimitBatches set. Since we
1051-
// didn't sort above, the fetcher doesn't support a limit.
1052-
if jr.shouldLimitBatches {
1055+
// Assert that the index join has 'parallelize=true' set. Since we
1056+
// didn't sort above, the fetcher doesn't support the TargetBytes limit
1057+
// (which would be set via getBatchBytesLimit() if 'parallelize' was
1058+
// false).
1059+
if !jr.parallelize {
10531060
err := errors.AssertionFailedf("index join configured with both maintainOrdering and " +
1054-
"shouldLimitBatched; this shouldn't have happened as the implementation doesn't support it")
1061+
"parallelize=false; this shouldn't have happened as the implementation doesn't support it")
10551062
jr.MoveToDraining(err)
10561063
return jrStateUnknown, nil, jr.DrainHelper()
10571064
}

pkg/sql/rowexec/tablereader.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -199,17 +199,14 @@ func (tr *tableReader) startScan(ctx context.Context) error {
199199
if cb := tr.FlowCtx.Cfg.TestingKnobs.TableReaderStartScanCb; cb != nil {
200200
cb()
201201
}
202-
limitBatches := !tr.parallelize
203-
var bytesLimit rowinfra.BytesLimit
204-
if !limitBatches {
205-
bytesLimit = rowinfra.NoBytesLimit
206-
} else {
202+
bytesLimit := rowinfra.NoBytesLimit
203+
if !tr.parallelize {
207204
bytesLimit = rowinfra.BytesLimit(tr.FlowCtx.Cfg.TestingKnobs.TableReaderBatchBytesLimit)
208205
if bytesLimit == 0 {
209206
bytesLimit = rowinfra.GetDefaultBatchBytesLimit(tr.FlowCtx.EvalCtx.TestingKnobs.ForceProductionValues)
210207
}
211208
}
212-
log.VEventf(ctx, 1, "starting scan with limitBatches %t", limitBatches)
209+
log.VEventf(ctx, 1, "starting scan with parallelize=%t", tr.parallelize)
213210
var err error
214211
if tr.maxTimestampAge == 0 {
215212
err = tr.fetcher.StartScan(

0 commit comments

Comments
 (0)