Skip to content

Commit 7167c08

Browse files
craig[bot]fqaziandyyang890
committed
154061: sql/schemachanger: simplify validation for CTAS statements r=fqazi a=fqazi Previously, the validation we used for CTAS statements would re-run the same query using the same AOST. This worked fine for deterministic queries, where the count between the re-executed query and backfill would match. However, if the query was non-deterministic (e.g., due to non-deterministic functions/routines), this check would fail. To address this, this patch removes the AOST query validation and only uses the row count from the bulk writer Fixes: #153363 Release note: None 154143: changefeedccl: deflake tests that expect frequent span-level checkpoints r=aerfrei,asg0451 a=andyyang890 **changefeedccl: delete leftover comment** This patch deletes a leftover comment that should've been deleted in the commit that had aggregators periodically flush their progress. Release note: None --- **changefeedccl: deflake tests that expect frequent span-level checkpoints** In a recent commit, the interval at which change aggregators would flush their frontiers to the change frontier was decoupled from the span-level checkpoint interval (which is configurable via a cluster setting). This caused some tests that only set the cluster setting (and not `min_checkpoint_frequency`) to a low value to sometimes flake. This patch updates all such tests to also configure the `min_checkpoint_frequency` option. [master] Fixes #151279 [release-25.4] Informs #154066 Release note: None --- **changefeedccl: deflake initial backfill checkpoint tests** This patch deflakes the initial backfill checkpoint tests, which filter out resolved spans to force gaps during a backfill. The tests however did not ensure that there would be at least one span that was not filtered out, which would result in no span-level checkpoint being created, leading the tests to time out. Release note: None Co-authored-by: Faizan Qazi <[email protected]> Co-authored-by: Andy Yang <[email protected]>
3 parents ffca318 + 512edf5 + fd1b981 commit 7167c08

File tree

6 files changed

+60
-108
lines changed

6 files changed

+60
-108
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,19 +1520,27 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15201520

15211521
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
15221522
// we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing).
1523-
haveGaps := false
1524-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1523+
// We however also need to ensure there's at least one span that isn't filtered out.
1524+
var allowedOne, haveGaps bool
1525+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
15251526
rndMu.Lock()
15261527
defer rndMu.Unlock()
1528+
defer func() {
1529+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1530+
}()
15271531

15281532
if r.Span.Equal(fooTableSpan) {
15291533
return true, nil
15301534
}
1531-
if haveGaps {
1532-
return rndMu.rnd.Intn(10) > 7, nil
1535+
if !allowedOne {
1536+
allowedOne = true
1537+
return false, nil
15331538
}
1534-
haveGaps = true
1535-
return true, nil
1539+
if !haveGaps {
1540+
haveGaps = true
1541+
return true, nil
1542+
}
1543+
return rndMu.rnd.Intn(10) > 7, nil
15361544
}
15371545

15381546
// Checkpoint progress frequently, and set the checkpoint size limit.
@@ -1542,7 +1550,8 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15421550
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
15431551

15441552
registry := s.Server.JobRegistry().(*jobs.Registry)
1545-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
1553+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1554+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
15461555

15471556
g := ctxgroup.WithContext(context.Background())
15481557
g.Go(func() error {
@@ -1581,8 +1590,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15811590

15821591
// Collect spans we attempt to resolve after when we resume.
15831592
var resolvedFoo []roachpb.Span
1584-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1585-
t.Logf("resolved span: %#v", r)
1593+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
1594+
defer func() {
1595+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1596+
}()
15861597
if !r.Span.Equal(fooTableSpan) {
15871598
resolvedFoo = append(resolvedFoo, r.Span)
15881599
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -932,12 +932,6 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
932932

933933
forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE
934934

935-
// NB: if we miss flush window, and the flush frequency is fairly high (minutes),
936-
// it might be a while before frontier advances again (particularly if
937-
// the number of ranges and closed timestamp settings are high).
938-
// TODO(yevgeniy): Consider doing something similar to how job checkpointing
939-
// works in the frontier where if we missed the window to checkpoint, we will attempt
940-
// the checkpoint at the next opportune moment.
941935
checkpointFrontier := (advanced && forceFlush) || ca.frontierFlushLimiter.canSave(ctx)
942936

943937
if checkpointFrontier {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2861,7 +2861,8 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
28612861

28622862
var jobID jobspb.JobID
28632863
sqlDB.QueryRow(t,
2864-
`CREATE CHANGEFEED FOR foo INTO 'null://' WITH resolved='50ms', no_initial_scan, cursor=$1`, tsStr,
2864+
`CREATE CHANGEFEED FOR foo INTO 'null://'
2865+
WITH resolved='50ms', min_checkpoint_frequency='50ms', no_initial_scan, cursor=$1`, tsStr,
28652866
).Scan(&jobID)
28662867

28672868
// Helper to read job progress
@@ -3003,7 +3004,8 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
30033004
}
30043005

30053006
// Setup changefeed job details, avoid relying on initial scan functionality
3006-
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms', min_checkpoint_frequency='100ms', no_initial_scan`)
3007+
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
3008+
WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
30073009
jobFeed := baseFeed.(cdctest.EnterpriseTestFeed)
30083010
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
30093011

@@ -9174,22 +9176,21 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91749176

91759177
// Emit resolved events for majority of spans. Be extra paranoid and ensure that
91769178
// we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing).
9177-
haveGaps := false
9179+
// We however also need to ensure there's at least one span that isn't filtered out.
9180+
var allowedOne, haveGaps bool
91789181
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
91799182
if r.Span.Equal(tableSpan) {
9180-
// Do not emit resolved events for the entire table span.
9181-
// We "simulate" large table by splitting single table span into many parts, so
9182-
// we want to resolve those sub-spans instead of the entire table span.
9183-
// However, we have to emit something -- otherwise the entire changefeed
9184-
// machine would not work.
9185-
r.Span.EndKey = tableSpan.Key.Next()
9183+
return true, nil
9184+
}
9185+
if !allowedOne {
9186+
allowedOne = true
91869187
return false, nil
91879188
}
9188-
if haveGaps {
9189-
return rnd.Intn(10) > 7, nil
9189+
if !haveGaps {
9190+
haveGaps = true
9191+
return true, nil
91909192
}
9191-
haveGaps = true
9192-
return true, nil
9193+
return rnd.Intn(10) > 7, nil
91939194
}
91949195

91959196
// Checkpoint progress frequently, and set the checkpoint size limit.
@@ -9199,7 +9200,8 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91999200
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
92009201

92019202
registry := s.Server.JobRegistry().(*jobs.Registry)
9202-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms'`)
9203+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo
9204+
WITH resolved='100ms', min_checkpoint_frequency='1ns'`)
92039205
// Some test feeds (kafka) are not buffered, so we have to consume messages.
92049206
var shouldDrain int32 = 1
92059207
g := ctxgroup.WithContext(context.Background())
@@ -12059,7 +12061,8 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1205912061
require.Equal(t, int64(0), managePTSCount)
1206012062
require.Equal(t, int64(0), managePTSErrorCount)
1206112063

12062-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12064+
createStmt := `CREATE CHANGEFEED FOR foo
12065+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1206312066
testFeed := feed(t, f, createStmt)
1206412067
defer closeFeed(t, testFeed)
1206512068

@@ -12172,7 +12175,8 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1217212175
return errors.New("test error")
1217312176
}
1217412177

12175-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12178+
createStmt := `CREATE CHANGEFEED FOR foo
12179+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1217612180
testFeed := feed(t, f, createStmt)
1217712181
defer closeFeed(t, testFeed)
1217812182

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,8 @@ func TestChangefeedProtectedTimestampUpdateForMultipleTables(t *testing.T) {
970970
require.Equal(t, int64(0), managePTSCount)
971971
require.Equal(t, int64(0), managePTSErrorCount)
972972

973-
createStmt := `CREATE CHANGEFEED FOR foo, bar WITH resolved='10ms', initial_scan='no'`
973+
createStmt := `CREATE CHANGEFEED FOR foo, bar
974+
WITH resolved='10ms', min_checkpoint_frequency='100ms', initial_scan='no'`
974975
testFeed := feed(t, f, createStmt)
975976
defer closeFeed(t, testFeed)
976977

@@ -1114,7 +1115,8 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
11141115
}
11151116
}
11161117

1117-
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3 WITH resolved='100ms'`
1118+
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3
1119+
WITH resolved='100ms', min_checkpoint_frequency='100ms'`
11181120
testFeed := feed(t, f, createStmt)
11191121
defer closeFeed(t, testFeed)
11201122

pkg/sql/schema_changer.go

Lines changed: 5 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -312,58 +312,11 @@ const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill"
312312
// validateBackfillQueryIntoTable validates that source query matches the contents of
313313
// a backfilled table, when executing the query at queryTS.
314314
func (sc *SchemaChanger) validateBackfillQueryIntoTable(
315-
ctx context.Context,
316-
table catalog.TableDescriptor,
317-
entryCountWrittenToPrimaryIdx int64,
318-
queryTS hlc.Timestamp,
319-
query string,
320-
skipAOSTValidation bool,
315+
ctx context.Context, table catalog.TableDescriptor, entryCountWrittenToPrimaryIdx int64,
321316
) error {
322-
var aostEntryCount int64
323317
sd := NewInternalSessionData(ctx, sc.execCfg.Settings, "validateBackfillQueryIntoTable")
324318
sd.SessionData = *sc.sessionData
325-
// First get the expected row count for the source query at the target timestamp.
326-
if err := sc.execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
327-
if skipAOSTValidation {
328-
return nil
329-
}
330-
parsedQuery, err := parser.ParseOne(query)
331-
if err != nil {
332-
return err
333-
}
334-
// If the query has an AOST clause, then we will remove it here.
335-
selectTop, ok := parsedQuery.AST.(*tree.Select)
336-
if ok {
337-
selectStmt := selectTop.Select
338-
var parenSel *tree.ParenSelect
339-
var ok bool
340-
for parenSel, ok = selectStmt.(*tree.ParenSelect); ok; parenSel, ok = selectStmt.(*tree.ParenSelect) {
341-
selectStmt = parenSel.Select.Select
342-
}
343-
sc, ok := selectStmt.(*tree.SelectClause)
344-
if ok && sc.From.AsOf.Expr != nil {
345-
sc.From.AsOf.Expr = nil
346-
query = parsedQuery.AST.String()
347-
}
348-
}
349-
// Inject the query and the time we should scan at.
350-
err = txn.KV().SetFixedTimestamp(ctx, queryTS)
351-
if err != nil {
352-
return err
353-
}
354-
countQuery := fmt.Sprintf("SELECT count(*) FROM (%s)", query)
355-
row, err := txn.QueryRow(ctx, "backfill-query-src-count", txn.KV(), countQuery)
356-
if err != nil {
357-
return err
358-
}
359-
aostEntryCount = int64(tree.MustBeDInt(row[0]))
360-
return nil
361-
}, isql.WithSessionData(sd),
362-
isql.WithPriority(admissionpb.BulkNormalPri),
363-
); err != nil {
364-
return err
365-
}
366-
// Next run validation on table that was populated using count queries.
319+
// Run validation on table that was populated using count queries.
367320
// Get rid of the table ID prefix.
368321
index := table.GetPrimaryIndex()
369322
now := sc.db.KV().Clock().Now()
@@ -390,7 +343,7 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
390343
// Testing knob that allows us to manipulate counts to fail
391344
// the validation.
392345
if sc.testingKnobs.RunDuringQueryBackfillValidation != nil {
393-
newTblEntryCount, err = sc.testingKnobs.RunDuringQueryBackfillValidation(aostEntryCount, newTblEntryCount)
346+
newTblEntryCount, err = sc.testingKnobs.RunDuringQueryBackfillValidation(entryCountWrittenToPrimaryIdx, newTblEntryCount)
394347
if err != nil {
395348
return err
396349
}
@@ -403,9 +356,6 @@ func (sc *SchemaChanger) validateBackfillQueryIntoTable(
403356
)
404357
}
405358
}
406-
if !skipAOSTValidation && newTblEntryCount != aostEntryCount {
407-
return errors.AssertionFailedf("backfill query did not populate index %q with expected number of rows (expected: %d, got: %d)", index.GetName(), aostEntryCount, newTblEntryCount)
408-
}
409359
return nil
410360
}
411361

@@ -435,8 +385,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
435385
}
436386
}
437387
}()
438-
validationTime := ts
439-
var skipAOSTValidation bool
440388
bulkSummary := kvpb.BulkOpSummary{}
441389

442390
err = sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
@@ -596,23 +544,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
596544
return
597545
}
598546
})
599-
600-
if planAndRunErr != nil {
601-
return planAndRunErr
602-
}
603-
// Otherwise, the select statement had no fixed timestamp. For
604-
// validating counts we will start with the read timestamp.
605-
if ts.IsEmpty() {
606-
validationTime = txn.KV().ReadTimestamp()
607-
}
608-
// If a virtual table was used then we can't conduct any kind of validation,
609-
// since our AOST query might be reading data not in KV.
610-
for _, tbl := range localPlanner.curPlan.mem.Metadata().AllTables() {
611-
if tbl.Table.IsVirtualTable() {
612-
skipAOSTValidation = true
613-
}
614-
}
615-
return nil
547+
return planAndRunErr
616548
})
617549

618550
// BatchTimestampBeforeGCError is retryable for the schema changer, but we
@@ -638,15 +570,14 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
638570

639571
// Validation will be skipped if the query is not AOST safe.
640572
// (i.e. reading non-KV data via CRDB internal)
641-
if err := sc.validateBackfillQueryIntoTable(ctx, table, entriesWrittenToPrimaryIdx, validationTime, query, skipAOSTValidation); err != nil {
573+
if err := sc.validateBackfillQueryIntoTable(ctx, table, entriesWrittenToPrimaryIdx); err != nil {
642574
return err
643575
}
644576
return nil
645577
}
646578

647579
// maybe backfill a created table by executing the AS query. Return nil if
648580
// successfully backfilled.
649-
//
650581
// Note that this does not connect to the tracing settings of the
651582
// surrounding SQL transaction. This should be OK as (at the time of
652583
// this writing) this code path is only used for standalone CREATE

pkg/sql/schema_changer_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8033,11 +8033,15 @@ func TestCreateTableAsValidationFailure(t *testing.T) {
80338033
defer leaktest.AfterTest(t)()
80348034
defer log.Scope(t).Close(t)
80358035

8036+
var hookEnabled atomic.Bool
80368037
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
80378038
Knobs: base.TestingKnobs{
80388039
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
80398040
RunDuringQueryBackfillValidation: func(expectedCount int64, currentCount int64) (newCurrentCount int64, err error) {
8040-
return 0, nil
8041+
if hookEnabled.Load() {
8042+
return 0, nil
8043+
}
8044+
return currentCount, nil
80418045
},
80428046
},
80438047
},
@@ -8050,7 +8054,13 @@ func TestCreateTableAsValidationFailure(t *testing.T) {
80508054
runner.Exec(t, "INSERT INTO t1 VALUES (1)")
80518055
runner.Exec(t, "INSERT INTO t1 VALUES (2)")
80528056
runner.Exec(t, "INSERT INTO t1 VALUES (3)")
8057+
// Validate cases that should work
8058+
runner.Exec(t, "CREATE TABLE simple_copy AS (SELECT * FROM t1)")
8059+
// Row count is can change.
8060+
runner.Exec(t, "CREATE TABLE t_random AS (SELECT * FROM t1 WHERE random() > 0.5)")
8061+
runner.Exec(t, " CREATE TABLE t_random2 AS (SELECT * FROM generate_series(0, CAST((100 * random()) AS INT)));")
80538062
// Execute a CTAS and CREATE MATERIALIZED VIEW statements that should fail.
8063+
hookEnabled.Store(true)
80548064
runner.ExpectErr(t, "backfill query did not populate index \"t2_pkey\" with expected number of rows", "CREATE TABLE t2 AS (SELECT * FROM t1)")
80558065
runner.ExpectErr(t, "backfill query did not populate index \"t2_pkey\" with expected number of rows", "CREATE MATERIALIZED VIEW t2 AS (SELECT n FROM t1)")
80568066
}

0 commit comments

Comments
 (0)