Skip to content

Commit ce35c63

Browse files
craig[bot]mgartnerannrpom
committed
148538: sql: add index join and lookup join assertions r=mgartner a=mgartner #### sql: add fetch row count assertions in vectorized index joiner Assertions have been added to the vectorized index joiner that ensure that an index join fetches the expected number of rows. If an index join has a locking wait policy of `SKIP LOCKED`, it should fetch no more than the number of input rows. Otherwise, an index join should fetch exactly one row for each input row. If these assertions fail, the query results may be incorrect, e.g., due to index corruption, and an internal error is preferred over a successful result. Informs #135696 Release note: None #### sql: add fetch row count assertions in row-by-row index joiner Assertions have been added to the join reader that ensure that an index join fetches the expected number of rows. Informs #135696 Release note: None #### sql: add fetch row count assertions in row-by-row lookup joiner Assertions have been added to the join reader that ensure that a lookup join on key columns fetches the expected number of rows. Fixes #135696 Release note: None 148857: storage: enable value separation by default r=annrpom a=annrpom Epic: none Release note (ops change): The `storage.value_separation.enabled` cluster setting is now true by default. This enables value separation for sstables, where values exceeding a certain size threshold are stored in separate blob files rather than inline in the sstable. This helps improve write performance (write-amp) by avoiding rewriting such values during compactions. Co-authored-by: Marcus Gartner <[email protected]> Co-authored-by: Annie Pompa <[email protected]>
3 parents 9ab7ea1 + 92e2814 + 430462c commit ce35c63

File tree

3 files changed

+102
-14
lines changed

3 files changed

+102
-14
lines changed

pkg/sql/colfetcher/index_join.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
1919
"github.com/cockroachdb/cockroach/pkg/settings"
20+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2021
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2122
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan"
2223
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
@@ -63,6 +64,16 @@ type ColIndexJoin struct {
6364
// and may not correspond to batch boundaries.
6465
startIdx int
6566

67+
// scanRowCounts contains the expected and actual number of rows fetched for
68+
// the current lookup scan. The expected row count is equal to the number of
69+
// input rows that have been consumed to create the spans. These counts are
70+
// used to make assertions that prevent returning incorrect results due to
71+
// index corruption. See assertScanRowCounts() for more details.
72+
scanRowCounts struct {
73+
expected int64
74+
actual int64
75+
}
76+
6677
// limitHintHelper is used in limiting batches of input rows in the presence
6778
// of hard and soft limits.
6879
limitHintHelper execinfra.LimitHintHelper
@@ -120,6 +131,10 @@ type ColIndexJoin struct {
120131
// table because the scan might synthesize additional implicit system columns.
121132
ResultTypes []*types.T
122133

134+
// lockingWaitPolicy is the wait policy for the cFetcher's underlying
135+
// row.KVFetcher.
136+
lockingWaitPolicy descpb.ScanLockingWaitPolicy
137+
123138
// maintainOrdering is true when the index join is required to maintain its
124139
// input ordering, in which case the ordering of the spans cannot be changed.
125140
maintainOrdering bool
@@ -204,8 +219,11 @@ func (s *ColIndexJoin) Next() coldata.Batch {
204219
sort.Sort(spans)
205220
}
206221

207-
// Index joins will always return exactly one output row per input row.
222+
// For memory accounting, we assume the index join will return
223+
// exactly one output row per input row. This is true most of the
224+
// time, except when the locking wait policy is SKIP LOCKED.
208225
s.cf.setEstimatedRowCount(uint64(rowCount))
226+
s.scanRowCounts.expected = rowCount
209227
// Note that the fetcher takes ownership of the spans slice - it
210228
// will modify it and perform the memory accounting. We don't care
211229
// about the modification here, but we want to be conscious about
@@ -239,8 +257,11 @@ func (s *ColIndexJoin) Next() coldata.Batch {
239257
// still has the references to it.
240258
s.spanAssembler.AccountForSpans()
241259
s.state = indexJoinConstructingSpans
260+
s.assertScanRowCounts()
261+
s.scanRowCounts.actual = 0
242262
continue
243263
}
264+
s.scanRowCounts.actual += int64(n)
244265
s.mu.Lock()
245266
s.mu.rowsRead += int64(n)
246267
s.mu.Unlock()
@@ -312,6 +333,30 @@ func (s *ColIndexJoin) getRowSize(idx int) int64 {
312333
return rowSize
313334
}
314335

336+
// assertScanRowCounts performs assertions to prevent silently returning
337+
// incorrect results, e.g., if an index is corrupt. In the common case, the
338+
// number of fetched rows in an index join should be equal to the number of
339+
// input rows. The only exception is when the locking wait policy is
340+
// SKIP LOCKED, in which case the number of fetched rows may be less than
341+
// the number of input rows, but never greater.
342+
func (s *ColIndexJoin) assertScanRowCounts() {
343+
if s.lockingWaitPolicy == descpb.ScanLockingWaitPolicy_SKIP_LOCKED {
344+
if s.scanRowCounts.actual > s.scanRowCounts.expected {
345+
colexecerror.InternalError(errors.AssertionFailedf(
346+
"expected to fetch no more than %d rows, found %d",
347+
s.scanRowCounts.expected, s.scanRowCounts.actual,
348+
))
349+
}
350+
} else {
351+
if s.scanRowCounts.actual != s.scanRowCounts.expected {
352+
colexecerror.InternalError(errors.AssertionFailedf(
353+
"expected to fetch %d rows, found %d",
354+
s.scanRowCounts.expected, s.scanRowCounts.actual,
355+
))
356+
}
357+
}
358+
}
359+
315360
// getBatchSize calculates the size of the entire current batch. Note that it
316361
// accounts only for the size of the data itself, and ignores extra overhead
317362
// such as selection vectors or byte offsets. getBatchSize is not exactly
@@ -616,16 +661,17 @@ func NewColIndexJoin(
616661
)
617662

618663
op := &ColIndexJoin{
619-
OneInputNode: colexecop.NewOneInputNode(input),
620-
flowCtx: flowCtx,
621-
processorID: processorID,
622-
cf: fetcher,
623-
spanAssembler: spanAssembler,
624-
ResultTypes: tableArgs.typs,
625-
maintainOrdering: spec.MaintainOrdering,
626-
txn: txn,
627-
usesStreamer: useStreamer,
628-
limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post),
664+
OneInputNode: colexecop.NewOneInputNode(input),
665+
flowCtx: flowCtx,
666+
processorID: processorID,
667+
cf: fetcher,
668+
spanAssembler: spanAssembler,
669+
ResultTypes: tableArgs.typs,
670+
maintainOrdering: spec.MaintainOrdering,
671+
txn: txn,
672+
usesStreamer: useStreamer,
673+
limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post),
674+
lockingWaitPolicy: spec.LockingWaitPolicy,
629675
}
630676
op.mem.inputBatchSizeLimit = getIndexJoinBatchSize(
631677
useStreamer, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, flowCtx.EvalCtx.SessionData(),

pkg/sql/rowexec/joinreader.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ type joinReader struct {
209209
// curBatchInputRowCount is the number of input rows in the current batch.
210210
curBatchInputRowCount int64
211211

212+
// If set, the lookup columns form a key in the target table and thus each
213+
// lookup has at most one result.
214+
lookupColumnsAreKey bool
215+
216+
// lockingWaitPolicy is the wait policy for the underlying rowFetcher.
217+
lockingWaitPolicy descpb.ScanLockingWaitPolicy
218+
212219
// State variables for each batch of input rows.
213220
scratchInputRows rowenc.EncDatumRows
214221
// resetScratchWhenReadingInput tracks whether scratchInputRows needs to be
@@ -377,6 +384,8 @@ func newJoinReader(
377384
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
378385
parallelize: parallelize,
379386
readerType: readerType,
387+
lookupColumnsAreKey: spec.LookupColumnsAreKey,
388+
lockingWaitPolicy: spec.LockingWaitPolicy,
380389
txn: txn,
381390
usesStreamer: useStreamer,
382391
limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post),
@@ -930,6 +939,12 @@ func (jr *joinReader) readInput() (
930939
jr.resetScratchWhenReadingInput = false
931940
}
932941

942+
// Assert that the correct number of rows were fetched in the last batch.
943+
if err := jr.assertBatchRowCounts(); err != nil {
944+
jr.MoveToDraining(err)
945+
return jrStateUnknown, nil, jr.DrainHelper()
946+
}
947+
933948
// Read the next batch of input rows.
934949
for {
935950
var encDatumRow rowenc.EncDatumRow
@@ -1103,6 +1118,33 @@ func (jr *joinReader) readInput() (
11031118
return jrFetchingLookupRows, outRow, nil
11041119
}
11051120

1121+
// assertBatchRowCounts performs assertions to prevent silently returning
1122+
// incorrect results, e.g., if the lookup index is corrupt.
1123+
func (jr *joinReader) assertBatchRowCounts() error {
1124+
// An index join without SKIP LOCKED should fetch exactly one row for each
1125+
// input row.
1126+
nonSkippingIndexJoin := jr.readerType == indexJoinReaderType &&
1127+
jr.lockingWaitPolicy != descpb.ScanLockingWaitPolicy_SKIP_LOCKED
1128+
if nonSkippingIndexJoin && jr.curBatchRowsRead != jr.curBatchInputRowCount {
1129+
return errors.AssertionFailedf(
1130+
"expected to fetch %d rows, found %d",
1131+
jr.curBatchInputRowCount, jr.curBatchRowsRead,
1132+
)
1133+
}
1134+
// An index join with SKIP LOCKED or a lookup join where the lookup columns
1135+
// form a key should fetch at most one row for each input row.
1136+
skippingIndexJoin := jr.readerType == indexJoinReaderType &&
1137+
jr.lockingWaitPolicy == descpb.ScanLockingWaitPolicy_SKIP_LOCKED
1138+
if (skippingIndexJoin || jr.lookupColumnsAreKey) &&
1139+
jr.curBatchRowsRead > jr.curBatchInputRowCount {
1140+
return errors.AssertionFailedf(
1141+
"expected to fetch no more than %d rows, found %d",
1142+
jr.curBatchInputRowCount, jr.curBatchRowsRead,
1143+
)
1144+
}
1145+
return nil
1146+
}
1147+
11061148
var noHomeRegionError = pgerror.Newf(pgcode.QueryHasNoHomeRegion,
11071149
"Query has no home region. Try using a lower LIMIT value or running the query from a different region. %s",
11081150
sqlerrors.EnforceHomeRegionFurtherInfo)

pkg/storage/pebble.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,10 @@ var (
423423
valueSeparationEnabled = settings.RegisterBoolSetting(
424424
settings.SystemVisible,
425425
"storage.value_separation.enabled",
426-
"(experimental) whether or not values may be separated into blob files; "+
426+
"whether or not values may be separated into blob files; "+
427427
"requires columnar blocks to be enabled",
428428
metamorphic.ConstantWithTestBool(
429-
"storage.value_separation.enabled", false), /* defaultValue */
429+
"storage.value_separation.enabled", true /* defaultValue */),
430430
)
431431
valueSeparationMinimumSize = settings.RegisterIntSetting(
432432
settings.SystemVisible,
@@ -448,7 +448,7 @@ var (
448448
settings.SystemVisible,
449449
"storage.value_separation.rewrite_minimum_age",
450450
"the minimum age of a blob file before it is eligible for a rewrite compaction",
451-
5*time.Minute, // 5 minutes
451+
5*time.Minute,
452452
settings.DurationWithMinimum(0),
453453
)
454454
valueSeparationCompactionGarbageThreshold = settings.RegisterIntSetting(

0 commit comments

Comments
 (0)