
Commit 695d4ec

craig[bot] and yuzefovich committed
108208: rowexec: fix remote lookups when streamer is used r=yuzefovich a=yuzefovich

Previously, we could incorrectly pass a non-zero batch bytes limit when performing remote lookups when the join reader is powered by the streamer. This would lead to an internal error and is now fixed.

Fixes: cockroachdb#108206.

Release note (bug fix): CockroachDB previously could encounter an internal error `unexpected non-zero bytes limit for txnKVStreamer` when evaluating locality-optimized lookup joins when it had to perform the remote regions' lookup. The bug was introduced in 22.2 and is now fixed. A temporary workaround without upgrading is to run `SET streamer_enabled = false;`.

Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents 5cfe9fe + c2b6d75 · commit 695d4ec

File tree

2 files changed: +99 -17 lines


pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior

Lines changed: 79 additions & 0 deletions
@@ -3475,3 +3475,82 @@ SELECT pk FROM regional_by_row_table_virt GROUP BY v;
 # can be proven to be monotonically increasing or decreasing.
 statement error pq: column "pk" must appear in the GROUP BY clause or be used in an aggregate function
 SELECT pk FROM regional_by_row_table_virt GROUP BY (a+10);
+
+# Regression test for incorrectly setting bytes limit in the streamer on remote
+# lookups (#108206).
+statement ok
+CREATE TABLE t108206_p (
+  id INT PRIMARY KEY,
+  p_id INT,
+  INDEX (p_id),
+  FAMILY (id, p_id)
+) LOCALITY REGIONAL BY ROW;
+CREATE TABLE t108206_c (
+  c_id INT PRIMARY KEY,
+  c_p_id INT,
+  INDEX (c_p_id),
+  FAMILY (c_id, c_p_id)
+) LOCALITY REGIONAL BY ROW;
+INSERT INTO t108206_p (crdb_region, id, p_id) VALUES ('ap-southeast-2', 1, 10), ('ca-central-1', 2, 20), ('us-east-1', 3, 30);
+INSERT INTO t108206_c (crdb_region, c_id, c_p_id) VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30)
+
+statement ok
+SET tracing = on,kv,results; SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off
+
+# If the row is not found in the local region, the other regions are searched in
+# parallel.
+query T
+SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
+  WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
+    OR message LIKE 'Scan%'
+  ORDER BY ordinality ASC
+----
+Scan /Table/138/1/"@"/20/0
+Scan /Table/138/1/"\x80"/20/0, /Table/138/1/"\xc0"/20/0
+fetched: /t108206_c/t108206_c_pkey/?/20/c_p_id -> /20
+Scan /Table/137/2/"@"/2{0-1}
+Scan /Table/137/2/"\x80"/2{0-1}, /Table/137/2/"\xc0"/2{0-1}
+fetched: /t108206_p/t108206_p_p_id_idx/'ca-central-1'/20/2 -> <undecoded>
+output row: [20 20]
+
+# Left join with locality optimized search enabled.
+query T retry
+SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM t108206_c WHERE EXISTS (SELECT * FROM t108206_p WHERE p_id = c_p_id) AND c_id = 20] OFFSET 2
+----
+·
+• lookup join (semi)
+│ table: t108206_p@t108206_p_p_id_idx
+│ lookup condition: (crdb_region = 'ap-southeast-2') AND (c_p_id = p_id)
+│ remote lookup condition: (crdb_region IN ('ca-central-1', 'us-east-1')) AND (c_p_id = p_id)
+│
+└── • union all
+    │ limit: 1
+    │
+    ├── • scan
+    │     missing stats
+    │     table: t108206_c@t108206_c_pkey
+    │     spans: [/'ap-southeast-2'/20 - /'ap-southeast-2'/20]
+    │
+    └── • scan
+          missing stats
+          table: t108206_c@t108206_c_pkey
+          spans: [/'ca-central-1'/20 - /'ca-central-1'/20] [/'us-east-1'/20 - /'us-east-1'/20]
+·
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJysk1Fr6koQx9_vpxjmRb1scRPLRRYKlhq5KVZ7jXALVSRNpnZPYzZnd0Mtxe9-2Oix6tFyWk4eQjI7-9v__Gf2Dc33DAVGQT-4GsPf0BsNb-A-uLvtX4YDqHfDaBz912_AfoL1eNvn_8wS-P_fYBRAcOfyoH48q9hkFTOZwgUkM_fRgMtBF-rJOubzxhSGvV4UjMFHhrlKaRAvyKC4Rw-nDAutEjJGaRd6qxLCdImCM5R5UVoXnjJMlCYUb2ilzQgFjuOHjEYUp6SbHBmmZGOZVdhtEZ3t16x4pldkeKWycpEbAU4e2yhGhlERu2hzgpPJss0n2PR5k0Ocp-CBsk-kcbpiqEr7rsjYeE4ovJ0Swi4KvmJfq8L7g1V0NhWcVO0fqPZOqn4Xa0jLOIMyVzolTeme3unqSHkDdaaKpr9fWF8upAXvpDR-IM3_jKHXSuYbP1v7x45fCxLQD3pjiIKbEK6H4QDZ1uZia3NR2TmT6RIZ9pV6Lgv4pmQOKhdQ75zDBSxr57wmhOh4nHu8vRn5jg8X0Gk1kOGIFsoSZEd2u9u3rLV39zNY1pI94K_Ebc-Ldc91-jDTNJcqP2lk68DI1meMHJEpVG7ooMm_17Izz00DpXNaT5BRpU7oVqukyl3_DitQFUjJ2PWqv_4J82rJcydoihfbEd0leV8l8UOS_yGptUfiuyT_kNT6kHR-msSdY4-ZenFXWSDfPGdHXj8fdBviuXFti57US4V1Y25QPMaZIYY38TN1yZJeyFwaKxMUVpe0Wv31IwAA__-d1uWE
+
+statement ok
+SET vectorize=on
+
+query T
+EXPLAIN (VEC) SELECT * FROM child LEFT JOIN parent ON p_id = c_p_id WHERE c_id = 10
+----
+│
+└ Node 1
+  └ *rowexec.joinReader
+    └ *colexec.limitOp
+      └ *colexec.SerialUnorderedSynchronizer
+        ├ *colfetcher.ColBatchScan
+        └ *colfetcher.ColBatchScan
+
+statement ok
+RESET vectorize

pkg/sql/rowexec/joinreader.go

Lines changed: 20 additions & 17 deletions
@@ -819,6 +819,24 @@ func sortSpans(spans roachpb.Spans, spanIDs []int) {
 	}
 }
 
+func (jr *joinReader) getBatchBytesLimit() rowinfra.BytesLimit {
+	if jr.usesStreamer {
+		// The streamer itself sets the correct TargetBytes parameter on the
+		// BatchRequests.
+		return rowinfra.NoBytesLimit
+	}
+	if !jr.shouldLimitBatches {
+		// We deem it safe to not limit the batches in order to get the
+		// DistSender-level parallelism.
+		return rowinfra.NoBytesLimit
+	}
+	bytesLimit := jr.lookupBatchBytesLimit
+	if bytesLimit == 0 {
+		bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
+	}
+	return bytesLimit
+}
+
 // readInput reads the next batch of input rows and starts an index scan, which
 // for lookup join is the lookup of matching KVs for a batch of input rows.
 // It can sometimes emit a single row on behalf of the previous batch.
@@ -1016,19 +1034,8 @@ func (jr *joinReader) readInput() (
 	// modification here, but we want to be conscious about the memory
 	// accounting - we don't double count for any memory of spans because the
 	// joinReaderStrategy doesn't account for any memory used by the spans.
-	var bytesLimit rowinfra.BytesLimit
-	if !jr.usesStreamer {
-		if !jr.shouldLimitBatches {
-			bytesLimit = rowinfra.NoBytesLimit
-		} else {
-			bytesLimit = jr.lookupBatchBytesLimit
-			if jr.lookupBatchBytesLimit == 0 {
-				bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
-			}
-		}
-	}
 	if err = jr.fetcher.StartScan(
-		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, nil, jr.DrainHelper()
@@ -1094,12 +1101,8 @@ func (jr *joinReader) fetchLookupRow() (joinReaderState, *execinfrapb.ProducerMe
 	}
 
 	log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans))
-	bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
-	if !jr.shouldLimitBatches {
-		bytesLimit = rowinfra.NoBytesLimit
-	}
 	if err := jr.fetcher.StartScan(
-		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, jr.getBatchBytesLimit(), rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, jr.DrainHelper()
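
The net effect of the Go change is that both the local and the remote lookup paths now take their bytes limit from the same helper, and the streamer case always resolves to rowinfra.NoBytesLimit; previously the remote path in fetchLookupRow computed its own limit without checking usesStreamer, which is how a non-zero limit could reach the txnKVStreamer. Below is a minimal standalone Go sketch of that precedence for readers who want to experiment with the decision ladder outside the CockroachDB tree; lookupConfig, bytesLimit, and defaultBatchBytesLimit are illustrative stand-ins, not the real joinReader or rowinfra types.

package main

import "fmt"

// bytesLimit mirrors the idea of rowinfra.BytesLimit: zero means "no limit".
type bytesLimit uint64

const (
	noBytesLimit           bytesLimit = 0
	defaultBatchBytesLimit bytesLimit = 10 << 20 // stand-in for the production default
)

// lookupConfig is a simplified stand-in for the joinReader fields consulted by
// the new getBatchBytesLimit helper.
type lookupConfig struct {
	usesStreamer          bool       // the streamer sets TargetBytes on its own BatchRequests
	shouldLimitBatches    bool       // true when batches must be limited, giving up DistSender parallelism
	lookupBatchBytesLimit bytesLimit // optional testing/session override, 0 = unset
}

// batchBytesLimit reproduces the precedence of the helper added in this commit:
// streamer first, then the "no limiting needed" case, then the explicit
// override, then the default.
func (c lookupConfig) batchBytesLimit() bytesLimit {
	if c.usesStreamer {
		// The streamer manages its own per-request byte budget, so the fetcher
		// must not impose one. This is the branch the remote-lookup path used
		// to skip.
		return noBytesLimit
	}
	if !c.shouldLimitBatches {
		return noBytesLimit
	}
	if c.lookupBatchBytesLimit != 0 {
		return c.lookupBatchBytesLimit
	}
	return defaultBatchBytesLimit
}

func main() {
	cases := []lookupConfig{
		{usesStreamer: true, shouldLimitBatches: true},             // streamer wins: no limit
		{shouldLimitBatches: false},                                // keep DistSender parallelism: no limit
		{shouldLimitBatches: true, lookupBatchBytesLimit: 1 << 20}, // explicit override
		{shouldLimitBatches: true},                                 // fall back to the default
	}
	for _, c := range cases {
		fmt.Printf("%+v -> %d bytes\n", c, c.batchBytesLimit())
	}
}

Centralizing the ladder in one method is what keeps the streamer check from being skipped at either StartScan call site.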

0 commit comments
