Skip to content

Commit 512edf5

Browse files
committed
sql/schemachanger: simplify validation for CTAS statements
Previously, the validation we used for CTAS statements would re-run the same query using the same AOST. This worked fine for deterministic queries, where the row counts from the re-executed query and the backfill would match. However, if the query was non-deterministic (e.g., due to non-deterministic functions/routines), this check would fail. To address this, this patch removes the AOST query validation and only uses the row count from the bulk writer. Fixes: #153363 Release note: None
1 parent 4fe5b55 commit 512edf5

File tree

2 files changed

+16
-75
lines changed

2 files changed

+16
-75
lines changed

pkg/sql/schema_changer.go

Lines changed: 5 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -312,58 +312,11 @@ const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill"
312312
// validateBackfillQueryIntoTable validates that source query matches the contents of
313313
// a backfilled table, when executing the query at queryTS.
314314
func (sc *SchemaChanger) validateBackfillQueryIntoTable(
315-
ctx context.Context,
316-
table catalog.TableDescriptor,
317-
entryCountWrittenToPrimaryIdx int64,
318-
queryTS hlc.Timestamp,
319-
query string,
320-
skipAOSTValidation bool,
315+
ctx context.Context, table catalog.TableDescriptor, entryCountWrittenToPrimaryIdx int64,
321316
) error {
322-
var aostEntryCount int64
323317
sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "validateBackfillQueryIntoTable")
324318
sd.SessionData = *sc.sessionData
325-
// First get the expected row count for the source query at the target timestamp.
326-
if err := sc.execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
327-
if skipAOSTValidation {
328-
return nil
329-
}
330-
parsedQuery, err := parser.ParseOne(query)
331-
if err != nil {
332-
return err
333-
}
334-
// If the query has an AOST clause, then we will remove it here.
335-
selectTop, ok := parsedQuery.AST.(*tree.Select)
336-
if ok {
337-
selectStmt := selectTop.Select
338-
var parenSel *tree.ParenSelect
339-
var ok bool
340-
for parenSel, ok = selectStmt.(*tree.ParenSelect); ok; parenSel, ok = selectStmt.(*tree.ParenSelect) {
341-
selectStmt = parenSel.Select.Select
342-
}
343-
sc, ok := selectStmt.(*tree.SelectClause)
344-
if ok && sc.From.AsOf.Expr != nil {
345-
sc.From.AsOf.Expr = nil
346-
query = parsedQuery.AST.String()
347-
}
348-
}
349-
// Inject the query and the time we should scan at.
350-
err = txn.KV().SetFixedTimestamp(ctx, queryTS)
351-
if err != nil {
352-
return err
353-
}
354-
countQuery := fmt.Sprintf("SELECT count(*) FROM (%s)", query)
355-
row, err := txn.QueryRow(ctx, "backfill-query-src-count", txn.KV(), countQuery)
356-
if err != nil {
357-
return err
358-
}
359-
aostEntryCount = int64(tree.MustBeDInt(row[0]))
360-
return nil
361-
}, isql.WithSessionData(sd),
362-
isql.WithPriority(admissionpb.BulkNormalPri),
363-
); err != nil {
364-
return err
365-
}
366-
// Next run validation on table that was populated using count queries.
319+
// Run validation on table that was populated using count queries.
367320
// Get rid of the table ID prefix.
368321
index := table.GetPrimaryIndex()
369322
now := sc.db.KV().Clock().Now()
@@ -390,7 +343,7 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
390343
// Testing knob that allows us to manipulate counts to fail
391344
// the validation.
392345
if sc.testingKnobs.RunDuringQueryBackfillValidation != nil {
393-
newTblEntryCount, err = sc.testingKnobs.RunDuringQueryBackfillValidation(aostEntryCount, newTblEntryCount)
346+
newTblEntryCount, err = sc.testingKnobs.RunDuringQueryBackfillValidation(entryCountWrittenToPrimaryIdx, newTblEntryCount)
394347
if err != nil {
395348
return err
396349
}
@@ -403,9 +356,6 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
403356
)
404357
}
405358
}
406-
if !skipAOSTValidation && newTblEntryCount != aostEntryCount {
407-
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), aostEntryCount, newTblEntryCount)
408-
}
409359
return nil
410360
}
411361

@@ -435,8 +385,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
435385
}
436386
}
437387
}()
438-
validationTime := ts
439-
var skipAOSTValidation bool
440388
bulkSummary := kvpb.BulkOpSummary{}
441389

442390
err = sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
@@ -596,23 +544,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
596544
return
597545
}
598546
})
599-
600-
if planAndRunErr != nil {
601-
return planAndRunErr
602-
}
603-
// Otherwise, the select statement had no fixed timestamp. For
604-
// validating counts we will start with the read timestamp.
605-
if ts.IsEmpty() {
606-
validationTime = txn.KV().ReadTimestamp()
607-
}
608-
// If a virtual table was used then we can't conduct any kind of validation,
609-
// since our AOST query might be reading data not in KV.
610-
for _, tbl := range localPlanner.curPlan.mem.Metadata().AllTables() {
611-
if tbl.Table.IsVirtualTable() {
612-
skipAOSTValidation = true
613-
}
614-
}
615-
return nil
547+
return planAndRunErr
616548
})
617549

618550
// BatchTimestampBeforeGCError is retryable for the schema changer, but we
@@ -638,15 +570,14 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
638570

639571
// Validation will be skipped if the query is not AOST safe.
640572
// (i.e. reading non-KV data via CRDB internal)
641-
if err := sc.validateBackfillQueryIntoTable(ctx, table, entriesWrittenToPrimaryIdx, validationTime, query, skipAOSTValidation); err != nil {
573+
if err := sc.validateBackfillQueryIntoTable(ctx, table, entriesWrittenToPrimaryIdx); err != nil {
642574
return err
643575
}
644576
return nil
645577
}
646578

647579
// maybe backfill a created table by executing the AS query. Return nil if
648580
// successfully backfilled.
649-
//
650581
// Note that this does not connect to the tracing settings of the
651582
// surrounding SQL transaction. This should be OK as (at the time of
652583
// this writing) this code path is only used for standalone CREATE

pkg/sql/schema_changer_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8033,11 +8033,15 @@ func TestCreateTableAsValidationFailure(t *testing.T) {
80338033
defer leaktest.AfterTest(t)()
80348034
defer log.Scope(t).Close(t)
80358035

8036+
var hookEnabled atomic.Bool
80368037
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
80378038
Knobs: base.TestingKnobs{
80388039
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
80398040
RunDuringQueryBackfillValidation: func(expectedCount int64, currentCount int64) (newCurrentCount int64, err error) {
8040-
return 0, nil
8041+
if hookEnabled.Load() {
8042+
return 0, nil
8043+
}
8044+
return currentCount, nil
80418045
},
80428046
},
80438047
},
@@ -8050,7 +8054,13 @@ func TestCreateTableAsValidationFailure(t *testing.T) {
80508054
runner.Exec(t, "INSERT INTO t1 VALUES (1)")
80518055
runner.Exec(t, "INSERT INTO t1 VALUES (2)")
80528056
runner.Exec(t, "INSERT INTO t1 VALUES (3)")
8057+
// Validate cases that should work
8058+
runner.Exec(t, "CREATE TABLE simple_copy AS (SELECT * FROM t1)")
8059+
// Row count is can change.
8060+
runner.Exec(t, "CREATE TABLE t_random AS (SELECT * FROM t1 WHERE random() > 0.5)")
8061+
runner.Exec(t, " CREATE TABLE t_random2 AS (SELECT * FROM generate_series(0, CAST((100 * random()) AS INT)));")
80538062
// Execute a CTAS and CREATE MATERIALIZED VIEW statements that should fail.
8063+
hookEnabled.Store(true)
80548064
runner.ExpectErr(t, "backfill query did not populate index \"t2_pkey\" with expected number of rows", "CREATE TABLE t2 AS (SELECT * FROM t1)")
80558065
runner.ExpectErr(t, "backfill query did not populate index \"t2_pkey\" with expected number of rows", "CREATE MATERIALIZED VIEW t2 AS (SELECT n FROM t1)")
80568066
}

0 commit comments

Comments
 (0)