@@ -209,6 +209,10 @@ type joinReader struct {
209
209
// curBatchInputRowCount is the number of input rows in the current batch.
210
210
curBatchInputRowCount int64
211
211
212
+ // If set, the lookup columns form a key in the target table and thus each
213
+ // lookup has at most one result.
214
+ lookupColumnsAreKey bool
215
+
212
216
// lockingWaitPolicy is the wait policy for the underlying rowFetcher.
213
217
lockingWaitPolicy descpb.ScanLockingWaitPolicy
214
218
@@ -380,6 +384,7 @@ func newJoinReader(
380
384
outputGroupContinuationForLeftRow : spec .OutputGroupContinuationForLeftRow ,
381
385
parallelize : parallelize ,
382
386
readerType : readerType ,
387
+ lookupColumnsAreKey : spec .LookupColumnsAreKey ,
383
388
lockingWaitPolicy : spec .LockingWaitPolicy ,
384
389
txn : txn ,
385
390
usesStreamer : useStreamer ,
@@ -934,9 +939,8 @@ func (jr *joinReader) readInput() (
934
939
jr .resetScratchWhenReadingInput = false
935
940
}
936
941
937
- // Assert that the correct number of rows were fetched in the last batch,
938
- // for index joins.
939
- if err := jr .assertIndexJoinRowCounts (); err != nil {
942
+ // Assert that the correct number of rows were fetched in the last batch.
943
+ if err := jr .assertBatchRowCounts (); err != nil {
940
944
jr .MoveToDraining (err )
941
945
return jrStateUnknown , nil , jr .DrainHelper ()
942
946
}
@@ -1114,30 +1118,29 @@ func (jr *joinReader) readInput() (
1114
1118
return jrFetchingLookupRows , outRow , nil
1115
1119
}
1116
1120
1117
- // assertIndexJoinRowCounts performs assertions to prevent silently returning
1118
- // incorrect results, e.g., if an index is corrupt. In the common case, the
1119
- // number of fetched rows in an index join should be equal to the number of
1120
- // input rows. The only exception is when the locking wait policy is
1121
- // SKIP LOCKED, in which case the number of fetched rows may be less than
1122
- // the number of input rows, but never greater.
1123
- func (jr * joinReader ) assertIndexJoinRowCounts () error {
1124
- if jr .readerType != indexJoinReaderType {
1125
- return nil
1121
+ // assertBatchRowCounts performs assertions to prevent silently returning
1122
+ // incorrect results, e.g., if the lookup index is corrupt.
1123
+ func (jr * joinReader ) assertBatchRowCounts () error {
1124
+ // An index join without SKIP LOCKED should fetch exactly one row for each
1125
+ // input row.
1126
+ nonSkippingIndexJoin := jr .readerType == indexJoinReaderType &&
1127
+ jr .lockingWaitPolicy != descpb .ScanLockingWaitPolicy_SKIP_LOCKED
1128
+ if nonSkippingIndexJoin && jr .curBatchRowsRead != jr .curBatchInputRowCount {
1129
+ return errors .AssertionFailedf (
1130
+ "expected to fetch %d rows, found %d" ,
1131
+ jr .curBatchInputRowCount , jr .curBatchRowsRead ,
1132
+ )
1126
1133
}
1127
- if jr .lockingWaitPolicy == descpb .ScanLockingWaitPolicy_SKIP_LOCKED {
1128
- if jr .curBatchRowsRead > jr .curBatchInputRowCount {
1129
- return errors .AssertionFailedf (
1130
- "expected to fetch no more than %d rows, found %d" ,
1131
- jr .curBatchInputRowCount , jr .curBatchRowsRead ,
1132
- )
1133
- }
1134
- } else {
1135
- if jr .curBatchRowsRead != jr .curBatchInputRowCount {
1136
- return errors .AssertionFailedf (
1137
- "expected to fetch %d rows, found %d" ,
1138
- jr .curBatchInputRowCount , jr .curBatchRowsRead ,
1139
- )
1140
- }
1134
+ // An index join with SKIP LOCKED or a lookup join where the lookup columns
1135
+ // form a key should fetch at most one row for each input row.
1136
+ skippingIndexJoin := jr .readerType == indexJoinReaderType &&
1137
+ jr .lockingWaitPolicy == descpb .ScanLockingWaitPolicy_SKIP_LOCKED
1138
+ if (skippingIndexJoin || jr .lookupColumnsAreKey ) &&
1139
+ jr .curBatchRowsRead > jr .curBatchInputRowCount {
1140
+ return errors .AssertionFailedf (
1141
+ "expected to fetch no more than %d rows, found %d" ,
1142
+ jr .curBatchInputRowCount , jr .curBatchRowsRead ,
1143
+ )
1141
1144
}
1142
1145
return nil
1143
1146
}
0 commit comments