@@ -17,6 +17,7 @@ import (
17
17
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
18
18
"github.com/cockroachdb/cockroach/pkg/roachpb"
19
19
"github.com/cockroachdb/cockroach/pkg/settings"
20
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
20
21
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
21
22
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan"
22
23
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
@@ -63,6 +64,16 @@ type ColIndexJoin struct {
63
64
// and may not correspond to batch boundaries.
64
65
startIdx int
65
66
67
+ // scanRowCounts contains the expected and actual number of rows fetched for
68
+ // the current lookup scan. The expected row count is equal to the number of
69
+ // input rows that have been consumed to create the spans. These counts are
70
+ // used to make assertions that prevent returning incorrect results due to
71
+ // index corruption. See assertScanRowCounts() for more details.
72
+ scanRowCounts struct {
73
+ expected int64
74
+ actual int64
75
+ }
76
+
66
77
// limitHintHelper is used in limiting batches of input rows in the presence
67
78
// of hard and soft limits.
68
79
limitHintHelper execinfra.LimitHintHelper
@@ -120,6 +131,10 @@ type ColIndexJoin struct {
120
131
// table because the scan might synthesize additional implicit system columns.
121
132
ResultTypes []* types.T
122
133
134
+ // lockingWaitPolicy is the wait policy for the cFetcher's underlying
135
+ // row.KVFetcher.
136
+ lockingWaitPolicy descpb.ScanLockingWaitPolicy
137
+
123
138
// maintainOrdering is true when the index join is required to maintain its
124
139
// input ordering, in which case the ordering of the spans cannot be changed.
125
140
maintainOrdering bool
@@ -204,8 +219,11 @@ func (s *ColIndexJoin) Next() coldata.Batch {
204
219
sort .Sort (spans )
205
220
}
206
221
207
- // Index joins will always return exactly one output row per input row.
222
+ // For memory accounting, we assume the index join returns exactly
223
+ // one output row per input row. This holds unless the locking wait
224
+ // policy is SKIP LOCKED, in which case fewer rows may be returned.
208
225
s .cf .setEstimatedRowCount (uint64 (rowCount ))
226
+ s .scanRowCounts .expected = rowCount
209
227
// Note that the fetcher takes ownership of the spans slice - it
210
228
// will modify it and perform the memory accounting. We don't care
211
229
// about the modification here, but we want to be conscious about
@@ -239,8 +257,11 @@ func (s *ColIndexJoin) Next() coldata.Batch {
239
257
// still has the references to it.
240
258
s .spanAssembler .AccountForSpans ()
241
259
s .state = indexJoinConstructingSpans
260
+ s .assertScanRowCounts ()
261
+ s .scanRowCounts .actual = 0
242
262
continue
243
263
}
264
+ s .scanRowCounts .actual += int64 (n )
244
265
s .mu .Lock ()
245
266
s .mu .rowsRead += int64 (n )
246
267
s .mu .Unlock ()
@@ -312,6 +333,30 @@ func (s *ColIndexJoin) getRowSize(idx int) int64 {
312
333
return rowSize
313
334
}
314
335
336
+ // assertScanRowCounts performs assertions to prevent silently returning
337
+ // incorrect results, e.g., if an index is corrupt. In the common case, the
338
+ // number of fetched rows in an index join should be equal to the number of
339
+ // input rows. The only exception is when the locking wait policy is
340
+ // SKIP LOCKED, in which case the number of fetched rows may be less than
341
+ // the number of input rows, but never greater.
342
+ func (s * ColIndexJoin ) assertScanRowCounts () {
343
+ if s .lockingWaitPolicy == descpb .ScanLockingWaitPolicy_SKIP_LOCKED {
344
+ if s .scanRowCounts .actual > s .scanRowCounts .expected {
345
+ colexecerror .InternalError (errors .AssertionFailedf (
346
+ "expected to fetch no more than %d rows, found %d" ,
347
+ s .scanRowCounts .expected , s .scanRowCounts .actual ,
348
+ ))
349
+ }
350
+ } else {
351
+ if s .scanRowCounts .actual != s .scanRowCounts .expected {
352
+ colexecerror .InternalError (errors .AssertionFailedf (
353
+ "expected to fetch %d rows, found %d" ,
354
+ s .scanRowCounts .expected , s .scanRowCounts .actual ,
355
+ ))
356
+ }
357
+ }
358
+ }
359
+
315
360
// getBatchSize calculates the size of the entire current batch. Note that it
316
361
// accounts only for the size of the data itself, and ignores extra overhead
317
362
// such as selection vectors or byte offsets. getBatchSize is not exactly
@@ -616,16 +661,17 @@ func NewColIndexJoin(
616
661
)
617
662
618
663
op := & ColIndexJoin {
619
- OneInputNode : colexecop .NewOneInputNode (input ),
620
- flowCtx : flowCtx ,
621
- processorID : processorID ,
622
- cf : fetcher ,
623
- spanAssembler : spanAssembler ,
624
- ResultTypes : tableArgs .typs ,
625
- maintainOrdering : spec .MaintainOrdering ,
626
- txn : txn ,
627
- usesStreamer : useStreamer ,
628
- limitHintHelper : execinfra .MakeLimitHintHelper (spec .LimitHint , post ),
664
+ OneInputNode : colexecop .NewOneInputNode (input ),
665
+ flowCtx : flowCtx ,
666
+ processorID : processorID ,
667
+ cf : fetcher ,
668
+ spanAssembler : spanAssembler ,
669
+ ResultTypes : tableArgs .typs ,
670
+ maintainOrdering : spec .MaintainOrdering ,
671
+ txn : txn ,
672
+ usesStreamer : useStreamer ,
673
+ limitHintHelper : execinfra .MakeLimitHintHelper (spec .LimitHint , post ),
674
+ lockingWaitPolicy : spec .LockingWaitPolicy ,
629
675
}
630
676
op .mem .inputBatchSizeLimit = getIndexJoinBatchSize (
631
677
useStreamer , flowCtx .EvalCtx .TestingKnobs .ForceProductionValues , flowCtx .EvalCtx .SessionData (),
0 commit comments