Skip to content

Commit 82cac4d

Browse files
committed
sql: add validation for CTAS / materialized views
Previously, no validation existed when backfilling queries for CTAS statements or when creating materialized views. This could lead to scenarios with unnoticed data loss if anything went wrong. To address this, this patch adds validation to operations that backfill queries into tables, confirming that row counts match the source. Fixes: #144957 Release note: None
1 parent 061c2ab commit 82cac4d

File tree

2 files changed

+145
-4
lines changed

2 files changed

+145
-4
lines changed

pkg/sql/schema_changer.go

Lines changed: 117 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,88 @@ func (sc *SchemaChanger) refreshMaterializedView(
308308

309309
const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill"
310310

311+
// validateBackfillQueryIntoTable validates that the source query matches the contents of
312+
// a backfilled table by comparing row counts: the count(*) of the query executed at queryTS against the row count of the table's primary index. It returns an assertion failure if the two counts differ.
313+
func (sc *SchemaChanger) validateBackfillQueryIntoTable(
314+
ctx context.Context, table catalog.TableDescriptor, queryTS hlc.Timestamp, query string,
315+
) error {
316+
var entryCount int64
317+
sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "validateBackfillQueryIntoTable")
318+
sd.SessionData = *sc.sessionData
319+
// First get the expected row count for the source query at the target timestamp.
320+
if err := sc.execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
321+
parsedQuery, err := parser.ParseOne(query)
322+
if err != nil {
323+
return err
324+
}
325+
// If the query has an AOST clause, then we will remove it here; the read time is instead pinned on the transaction via SetFixedTimestamp below. Only the SelectClause reached after unwrapping parentheses is inspected.
326+
selectTop, ok := parsedQuery.AST.(*tree.Select)
327+
if ok {
328+
selectStmt := selectTop.Select
329+
var parenSel *tree.ParenSelect
330+
var ok bool // Shadows the outer ok for the remainder of this block.
331+
for parenSel, ok = selectStmt.(*tree.ParenSelect); ok; parenSel, ok = selectStmt.(*tree.ParenSelect) {
332+
selectStmt = parenSel.Select.Select
333+
}
334+
sc, ok := selectStmt.(*tree.SelectClause) // NOTE: this sc shadows the method receiver inside this if block.
335+
if ok && sc.From.AsOf.Expr != nil {
336+
sc.From.AsOf.Expr = nil
337+
query = parsedQuery.AST.String()
338+
}
339+
}
340+
// Pin the transaction to queryTS, then count the rows the source query returns at that time.
341+
err = txn.KV().SetFixedTimestamp(ctx, queryTS)
342+
if err != nil {
343+
return err
344+
}
345+
countQuery := fmt.Sprintf("SELECT count(*) FROM (%s)", query)
346+
row, err := txn.QueryRow(ctx, "backfill-query-src-count", txn.KV(), countQuery)
347+
if err != nil {
348+
return err
349+
}
350+
entryCount = int64(tree.MustBeDInt(row[0]))
351+
return nil
352+
}, isql.WithSessionData(sd)); err != nil {
353+
return err
354+
}
355+
// Next, count the rows in the primary index of the table that was populated by the backfill.
356+
// NOTE(review): this line previously read "Get rid of the table ID prefix", but nothing below does that; presumably stale.
357+
index := table.GetPrimaryIndex()
358+
now := sc.db.KV().Clock().Now() // The table is counted as of the current clock time, not queryTS.
359+
builder := table.NewBuilder()
360+
// Make the table public so that we can validate our expected
361+
// counts match.
362+
mut := builder.BuildExistingMutable().(*tabledesc.Mutable)
363+
mut.SetPublic()
364+
count, err := countIndexRowsAndMaybeCheckUniqueness(ctx, mut, index, false,
365+
descs.NewHistoricalInternalExecTxnRunner(now, func(ctx context.Context, fn descs.InternalExecFn) error {
366+
return sc.execCfg.InternalDB.DescsTxn(ctx, func(
367+
ctx context.Context, txn descs.Txn,
368+
) error {
369+
if err := txn.KV().SetFixedTimestamp(ctx, now); err != nil {
370+
return err
371+
}
372+
return fn(ctx, txn)
373+
}, isql.WithPriority(admissionpb.BulkNormalPri))
374+
}),
375+
sessiondata.NodeUserWithBulkLowPriSessionDataOverride)
376+
if err != nil {
377+
return err
378+
}
379+
// Testing knob that allows us to manipulate counts to fail
380+
// the validation.
381+
if sc.testingKnobs.RunDuringQueryBackfillValidation != nil {
382+
count, err = sc.testingKnobs.RunDuringQueryBackfillValidation(entryCount, count)
383+
if err != nil {
384+
return err
385+
}
386+
}
387+
if count != entryCount {
388+
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), entryCount, count)
389+
}
390+
return nil
391+
}
392+
311393
func (sc *SchemaChanger) backfillQueryIntoTable(
312394
ctx context.Context,
313395
table catalog.TableDescriptor,
@@ -334,6 +416,9 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
334416
}
335417
}
336418
}()
419+
validationTime := ts
420+
var skipValidation bool
421+
// Get the expected entry count against the source query.
337422
err = sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
338423
defer func() {
339424
isTxnRetry = true
@@ -430,7 +515,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
430515
return err
431516
}
432517
}
433-
434518
res := kvpb.BulkOpSummary{}
435519
rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
436520
// TODO(adityamaru): Use the BulkOpSummary for either telemetry or to
@@ -477,7 +561,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
477561
}
478562
}
479563
}
480-
481564
planDistribution, _ := getPlanDistribution(
482565
ctx, localPlanner.Descriptors().HasUncommittedTypes(),
483566
localPlanner.extendedEvalCtx.SessionData(),
@@ -495,7 +578,22 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
495578
}
496579
})
497580

498-
return planAndRunErr
581+
if planAndRunErr != nil {
582+
return planAndRunErr
583+
}
584+
// If the select statement had no fixed timestamp, then for
585+
// validating counts we will use the transaction's read timestamp.
586+
if ts.IsEmpty() {
587+
validationTime = txn.KV().ReadTimestamp()
588+
}
589+
// If a virtual table was used then we can't conduct any kind of validation,
590+
// since our AOST query might be reading data not in KV.
591+
for _, tbl := range localPlanner.curPlan.mem.Metadata().AllTables() {
592+
if tbl.Table.IsVirtualTable() {
593+
skipValidation = true
594+
}
595+
}
596+
return nil
499597
})
500598

501599
// BatchTimestampBeforeGCError is retryable for the schema changer, but we
@@ -506,7 +604,18 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
506604
errors.Wrap(err, "unable to retry backfill since fixed timestamp is before the GC timestamp"),
507605
)
508606
}
509-
return err
607+
if err != nil {
608+
return err
609+
}
610+
611+
// Validation will be skipped if the query is not AOST safe.
612+
// (i.e. reading non-KV data via CRDB internal)
613+
if !skipValidation {
614+
if err := sc.validateBackfillQueryIntoTable(ctx, table, validationTime, query); err != nil {
615+
return err
616+
}
617+
}
618+
return nil
510619
}
511620

512621
// maybe backfill a created table by executing the AS query. Return nil if
@@ -2773,6 +2882,10 @@ type SchemaChangerTestingKnobs struct {
27732882

27742883
// RunBeforeModifyRowLevelTTL is called just before the modify row level TTL is committed.
27752884
RunBeforeModifyRowLevelTTL func() error
2885+
2886+
// RunDuringQueryBackfillValidation is called during validation of a query
2887+
// backfill with the expected and observed row counts; the count it returns replaces the observed count.
2888+
RunDuringQueryBackfillValidation func(expectedCount int64, currentCount int64) (newCurrentCount int64, err error)
27762889
}
27772890

27782891
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

pkg/sql/schema_changer_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7883,3 +7883,31 @@ func TestLeaseGenerationBumpWithSchemaChange(t *testing.T) {
78837883
runner.Exec(t, "ALTER TABLE t1 ALTER PRIMARY KEY USING COLUMNS(n, j)")
78847884
require.NoError(t, grp.Wait())
78857885
}
7886+
7887+
// TestCreateTableAsValidationFailure simulates a synthetic validation
7888+
// failure for CREATE TABLE AS and CREATE MATERIALIZED VIEW by using a testing knob to override the validated row count.
7889+
func TestCreateTableAsValidationFailure(t *testing.T) {
7890+
defer leaktest.AfterTest(t)()
7891+
defer log.Scope(t).Close(t)
7892+
7893+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
7894+
Knobs: base.TestingKnobs{
7895+
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
7896+
RunDuringQueryBackfillValidation: func(expectedCount int64, currentCount int64) (newCurrentCount int64, err error) {
7897+
return 0, nil // Always report zero rows so the count comparison fails for the three-row source table.
7898+
},
7899+
},
7900+
},
7901+
})
7902+
7903+
defer s.Stopper().Stop(context.Background())
7904+
runner := sqlutils.MakeSQLRunner(sqlDB)
7905+
// Create a table and populate it.
7906+
runner.Exec(t, "CREATE TABLE t1(n int PRIMARY KEY)")
7907+
runner.Exec(t, "INSERT INTO t1 VALUES (1)")
7908+
runner.Exec(t, "INSERT INTO t1 VALUES (2)")
7909+
runner.Exec(t, "INSERT INTO t1 VALUES (3)")
7910+
// Execute CTAS and CREATE MATERIALIZED VIEW statements that should both fail validation.
7911+
runner.ExpectErr(t, "backfill query did not populate index \"t2_pkey\" with expected number of rows", "CREATE TABLE t2 AS (SELECT * FROM t1)")
7912+
runner.ExpectErr(t, "backfill query did not populate index \"t2_pkey\" with expected number of rows", "CREATE MATERIALIZED VIEW t2 AS (SELECT n FROM t1)")
7913+
}

0 commit comments

Comments
 (0)