Skip to content

Commit b48712f

Browse files
committed
sql: add backfill validation for entries written by BulkAdder
This patch fixes the logic for counting the number of entries written by the BulkAdder during the backfillQueryIntoTable operation, which is used by CREATE TABLE AS, CREATE MATERIALIZED VIEW, and REFRESH MATERIALIZED VIEW. We now use this count as an additional check when validating the backfill.

Release note: None
1 parent 875e04a commit b48712f

File tree

2 files changed: +48 -20 lines changed

pkg/sql/rowexec/bulk_row_writer.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,9 @@ func (sp *bulkRowWriter) Start(ctx context.Context) {
7272
ctx = sp.StartInternal(ctx, "bulkRowWriter")
7373
sp.input.Start(ctx)
7474
err := sp.work(ctx)
75-
sp.MoveToDraining(err)
75+
if err != nil {
76+
sp.MoveToDraining(err)
77+
}
7678
}
7779

7880
// Next is part of the RowSource interface.

pkg/sql/schema_changer.go

Lines changed: 45 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/cockroachdb/cockroach/pkg/base"
17+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1718
"github.com/cockroachdb/cockroach/pkg/config"
1819
"github.com/cockroachdb/cockroach/pkg/jobs"
1920
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -311,13 +312,21 @@ const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill"
311312
// validateBackfillQueryIntoTable validates that source query matches the contents of
312313
// a backfilled table, when executing the query at queryTS.
313314
func (sc *SchemaChanger) validateBackfillQueryIntoTable(
314-
ctx context.Context, table catalog.TableDescriptor, queryTS hlc.Timestamp, query string,
315+
ctx context.Context,
316+
table catalog.TableDescriptor,
317+
entryCountWrittenToPrimaryIdx int64,
318+
queryTS hlc.Timestamp,
319+
query string,
320+
skipAOSTValidation bool,
315321
) error {
316-
var entryCount int64
322+
var aostEntryCount int64
317323
sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "validateBackfillQueryIntoTable")
318324
sd.SessionData = *sc.sessionData
319325
// First get the expected row count for the source query at the target timestamp.
320326
if err := sc.execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
327+
if skipAOSTValidation {
328+
return nil
329+
}
321330
parsedQuery, err := parser.ParseOne(query)
322331
if err != nil {
323332
return err
@@ -347,9 +356,11 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
347356
if err != nil {
348357
return err
349358
}
350-
entryCount = int64(tree.MustBeDInt(row[0]))
359+
aostEntryCount = int64(tree.MustBeDInt(row[0]))
351360
return nil
352-
}, isql.WithSessionData(sd)); err != nil {
361+
}, isql.WithSessionData(sd),
362+
isql.WithPriority(admissionpb.BulkNormalPri),
363+
); err != nil {
353364
return err
354365
}
355366
// Next run validation on table that was populated using count queries.
@@ -361,7 +372,7 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
361372
// counts match.
362373
mut := builder.BuildExistingMutable().(*tabledesc.Mutable)
363374
mut.SetPublic()
364-
count, err := countIndexRowsAndMaybeCheckUniqueness(ctx, mut, index, false,
375+
newTblEntryCount, err := countIndexRowsAndMaybeCheckUniqueness(ctx, mut, index, false,
365376
descs.NewHistoricalInternalExecTxnRunner(now, func(ctx context.Context, fn descs.InternalExecFn) error {
366377
return sc.execCfg.InternalDB.DescsTxn(ctx, func(
367378
ctx context.Context, txn descs.Txn,
@@ -379,13 +390,21 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
379390
// Testing knob that allows us to manipulate counts to fail
380391
// the validation.
381392
if sc.testingKnobs.RunDuringQueryBackfillValidation != nil {
382-
count, err = sc.testingKnobs.RunDuringQueryBackfillValidation(entryCount, count)
393+
newTblEntryCount, err = sc.testingKnobs.RunDuringQueryBackfillValidation(aostEntryCount, newTblEntryCount)
383394
if err != nil {
384395
return err
385396
}
386397
}
387-
if count != entryCount {
388-
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), entryCount, count)
398+
if entryCountWrittenToPrimaryIdx > 0 {
399+
if newTblEntryCount != entryCountWrittenToPrimaryIdx {
400+
return errors.AssertionFailedf(
401+
"backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)",
402+
index.GetName(), entryCountWrittenToPrimaryIdx, newTblEntryCount,
403+
)
404+
}
405+
}
406+
if !skipAOSTValidation && newTblEntryCount != aostEntryCount {
407+
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), aostEntryCount, newTblEntryCount)
389408
}
390409
return nil
391410
}
@@ -417,9 +436,12 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
417436
}
418437
}()
419438
validationTime := ts
420-
var skipValidation bool
421-
// Get the expected entry count against the source query.
439+
var skipAOSTValidation bool
440+
bulkSummary := kvpb.BulkOpSummary{}
441+
422442
err = sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
443+
// Clear out the previous summary if the transaction gets retried.
444+
bulkSummary = kvpb.BulkOpSummary{}
423445
defer func() {
424446
isTxnRetry = true
425447
}()
@@ -515,15 +537,12 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
515537
return err
516538
}
517539
}
518-
res := kvpb.BulkOpSummary{}
519540
rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
520-
// TODO(adityamaru): Use the BulkOpSummary for either telemetry or to
521-
// return to user.
522541
var counts kvpb.BulkOpSummary
523542
if err := protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)), &counts); err != nil {
524543
return err
525544
}
526-
res.Add(counts)
545+
bulkSummary.Add(counts)
527546
return nil
528547
})
529548
recv := MakeDistSQLReceiver(
@@ -590,7 +609,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
590609
// since our AOST query might be reading data not in KV.
591610
for _, tbl := range localPlanner.curPlan.mem.Metadata().AllTables() {
592611
if tbl.Table.IsVirtualTable() {
593-
skipValidation = true
612+
skipAOSTValidation = true
594613
}
595614
}
596615
return nil
@@ -608,12 +627,19 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
608627
return err
609628
}
610629

630+
// Count how many keys were written to the primary index, as reported by the
631+
// BulkAdder. We need a version gate since pre-25.4 clusters did not count
632+
// correctly.
633+
entriesWrittenToPrimaryIdx := int64(0)
634+
if sc.execCfg.Settings.Version.IsActive(ctx, clusterversion.V25_4) {
635+
key := kvpb.BulkOpSummaryID(uint64(table.GetID()), uint64(table.GetPrimaryIndex().GetID()))
636+
entriesWrittenToPrimaryIdx = bulkSummary.EntryCounts[key]
637+
}
638+
611639
// Only the AOST row-count comparison will be skipped if the query is not AOST safe
612640
// (i.e. reading non-KV data via CRDB internal)
613-
if !skipValidation {
614-
if err := sc.validateBackfillQueryIntoTable(ctx, table, validationTime, query); err != nil {
615-
return err
616-
}
641+
if err := sc.validateBackfillQueryIntoTable(ctx, table, entriesWrittenToPrimaryIdx, validationTime, query, skipAOSTValidation); err != nil {
642+
return err
617643
}
618644
return nil
619645
}

0 commit comments

Comments
 (0)