@@ -209,6 +209,9 @@ type joinReader struct {
209
209
// curBatchInputRowCount is the number of input rows in the current batch.
210
210
curBatchInputRowCount int64
211
211
212
+ // lockingWaitPolicy is the wait policy for the underlying rowFetcher.
213
+ lockingWaitPolicy descpb.ScanLockingWaitPolicy
214
+
212
215
// State variables for each batch of input rows.
213
216
scratchInputRows rowenc.EncDatumRows
214
217
// resetScratchWhenReadingInput tracks whether scratchInputRows needs to be
@@ -377,6 +380,7 @@ func newJoinReader(
377
380
outputGroupContinuationForLeftRow : spec .OutputGroupContinuationForLeftRow ,
378
381
parallelize : parallelize ,
379
382
readerType : readerType ,
383
+ lockingWaitPolicy : spec .LockingWaitPolicy ,
380
384
txn : txn ,
381
385
usesStreamer : useStreamer ,
382
386
limitHintHelper : execinfra .MakeLimitHintHelper (spec .LimitHint , post ),
@@ -930,6 +934,13 @@ func (jr *joinReader) readInput() (
930
934
jr .resetScratchWhenReadingInput = false
931
935
}
932
936
937
+ // Assert that the correct number of rows were fetched in the last batch,
938
+ // for index joins.
939
+ if err := jr .assertIndexJoinRowCounts (); err != nil {
940
+ jr .MoveToDraining (err )
941
+ return jrStateUnknown , nil , jr .DrainHelper ()
942
+ }
943
+
933
944
// Read the next batch of input rows.
934
945
for {
935
946
var encDatumRow rowenc.EncDatumRow
@@ -1103,6 +1114,34 @@ func (jr *joinReader) readInput() (
1103
1114
return jrFetchingLookupRows , outRow , nil
1104
1115
}
1105
1116
1117
+ // assertIndexJoinRowCounts performs assertions to prevent silently returning
1118
+ // incorrect results, e.g., if an index is corrupt. In the common case, the
1119
+ // number of fetched rows in an index join should be equal to the number of
1120
+ // input rows. The only exception is when the locking wait policy is
1121
+ // SKIP LOCKED, in which case the number of fetched rows may be less than
1122
+ // the number of input rows, but never greater.
1123
+ func (jr * joinReader ) assertIndexJoinRowCounts () error {
1124
+ if jr .readerType != indexJoinReaderType {
1125
+ return nil
1126
+ }
1127
+ if jr .lockingWaitPolicy == descpb .ScanLockingWaitPolicy_SKIP_LOCKED {
1128
+ if jr .curBatchRowsRead > jr .curBatchInputRowCount {
1129
+ return errors .AssertionFailedf (
1130
+ "expected to fetch no more than %d rows, found %d" ,
1131
+ jr .curBatchInputRowCount , jr .curBatchRowsRead ,
1132
+ )
1133
+ }
1134
+ } else {
1135
+ if jr .curBatchRowsRead != jr .curBatchInputRowCount {
1136
+ return errors .AssertionFailedf (
1137
+ "expected to fetch %d rows, found %d" ,
1138
+ jr .curBatchInputRowCount , jr .curBatchRowsRead ,
1139
+ )
1140
+ }
1141
+ }
1142
+ return nil
1143
+ }
1144
+
1106
1145
var noHomeRegionError = pgerror .Newf (pgcode .QueryHasNoHomeRegion ,
1107
1146
"Query has no home region. Try using a lower LIMIT value or running the query from a different region. %s" ,
1108
1147
sqlerrors .EnforceHomeRegionFurtherInfo )
0 commit comments