Skip to content

Commit a72af2c

Browse files
authored
Merge pull request #154193 from cockroachdb/blathers/backport-release-25.4-154143
release-25.4: changefeedccl: deflake tests that expect frequent span-level checkpoints
2 parents b288920 + c490602 commit a72af2c

File tree

4 files changed

+44
-33
lines changed

4 files changed

+44
-33
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,19 +1520,27 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15201520

15211521
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
15221522
// we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing).
1523-
haveGaps := false
1524-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1523+
// We however also need to ensure there's at least one span that isn't filtered out.
1524+
var allowedOne, haveGaps bool
1525+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
15251526
rndMu.Lock()
15261527
defer rndMu.Unlock()
1528+
defer func() {
1529+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1530+
}()
15271531

15281532
if r.Span.Equal(fooTableSpan) {
15291533
return true, nil
15301534
}
1531-
if haveGaps {
1532-
return rndMu.rnd.Intn(10) > 7, nil
1535+
if !allowedOne {
1536+
allowedOne = true
1537+
return false, nil
15331538
}
1534-
haveGaps = true
1535-
return true, nil
1539+
if !haveGaps {
1540+
haveGaps = true
1541+
return true, nil
1542+
}
1543+
return rndMu.rnd.Intn(10) > 7, nil
15361544
}
15371545

15381546
// Checkpoint progress frequently, and set the checkpoint size limit.
@@ -1542,7 +1550,8 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15421550
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
15431551

15441552
registry := s.Server.JobRegistry().(*jobs.Registry)
1545-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
1553+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1554+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
15461555

15471556
g := ctxgroup.WithContext(context.Background())
15481557
g.Go(func() error {
@@ -1581,8 +1590,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15811590

15821591
// Collect spans we attempt to resolve after when we resume.
15831592
var resolvedFoo []roachpb.Span
1584-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1585-
t.Logf("resolved span: %#v", r)
1593+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
1594+
defer func() {
1595+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1596+
}()
15861597
if !r.Span.Equal(fooTableSpan) {
15871598
resolvedFoo = append(resolvedFoo, r.Span)
15881599
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -932,12 +932,6 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
932932

933933
forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE
934934

935-
// NB: if we miss flush window, and the flush frequency is fairly high (minutes),
936-
// it might be a while before frontier advances again (particularly if
937-
// the number of ranges and closed timestamp settings are high).
938-
// TODO(yevgeniy): Consider doing something similar to how job checkpointing
939-
// works in the frontier where if we missed the window to checkpoint, we will attempt
940-
// the checkpoint at the next opportune moment.
941935
checkpointFrontier := (advanced && forceFlush) || ca.frontierFlushLimiter.canSave(ctx)
942936

943937
if checkpointFrontier {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2810,7 +2810,8 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
28102810

28112811
var jobID jobspb.JobID
28122812
sqlDB.QueryRow(t,
2813-
`CREATE CHANGEFEED FOR foo INTO 'null://' WITH resolved='50ms', no_initial_scan, cursor=$1`, tsStr,
2813+
`CREATE CHANGEFEED FOR foo INTO 'null://'
2814+
WITH resolved='50ms', min_checkpoint_frequency='50ms', no_initial_scan, cursor=$1`, tsStr,
28142815
).Scan(&jobID)
28152816

28162817
// Helper to read job progress
@@ -2952,7 +2953,8 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
29522953
}
29532954

29542955
// Setup changefeed job details, avoid relying on initial scan functionality
2955-
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms', min_checkpoint_frequency='100ms', no_initial_scan`)
2956+
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
2957+
WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
29562958
jobFeed := baseFeed.(cdctest.EnterpriseTestFeed)
29572959
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
29582960

@@ -9124,22 +9126,21 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91249126

91259127
// Emit resolved events for majority of spans. Be extra paranoid and ensure that
91269128
// we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing).
9127-
haveGaps := false
9129+
// We however also need to ensure there's at least one span that isn't filtered out.
9130+
var allowedOne, haveGaps bool
91289131
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
91299132
if r.Span.Equal(tableSpan) {
9130-
// Do not emit resolved events for the entire table span.
9131-
// We "simulate" large table by splitting single table span into many parts, so
9132-
// we want to resolve those sub-spans instead of the entire table span.
9133-
// However, we have to emit something -- otherwise the entire changefeed
9134-
// machine would not work.
9135-
r.Span.EndKey = tableSpan.Key.Next()
9133+
return true, nil
9134+
}
9135+
if !allowedOne {
9136+
allowedOne = true
91369137
return false, nil
91379138
}
9138-
if haveGaps {
9139-
return rnd.Intn(10) > 7, nil
9139+
if !haveGaps {
9140+
haveGaps = true
9141+
return true, nil
91409142
}
9141-
haveGaps = true
9142-
return true, nil
9143+
return rnd.Intn(10) > 7, nil
91439144
}
91449145

91459146
// Checkpoint progress frequently, and set the checkpoint size limit.
@@ -9149,7 +9150,8 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91499150
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
91509151

91519152
registry := s.Server.JobRegistry().(*jobs.Registry)
9152-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms'`)
9153+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo
9154+
WITH resolved='100ms', min_checkpoint_frequency='1ns'`)
91539155
// Some test feeds (kafka) are not buffered, so we have to consume messages.
91549156
var shouldDrain int32 = 1
91559157
g := ctxgroup.WithContext(context.Background())
@@ -12009,7 +12011,8 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1200912011
require.Equal(t, int64(0), managePTSCount)
1201012012
require.Equal(t, int64(0), managePTSErrorCount)
1201112013

12012-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12014+
createStmt := `CREATE CHANGEFEED FOR foo
12015+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1201312016
testFeed := feed(t, f, createStmt)
1201412017
defer closeFeed(t, testFeed)
1201512018

@@ -12122,7 +12125,8 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1212212125
return errors.New("test error")
1212312126
}
1212412127

12125-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12128+
createStmt := `CREATE CHANGEFEED FOR foo
12129+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1212612130
testFeed := feed(t, f, createStmt)
1212712131
defer closeFeed(t, testFeed)
1212812132

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,8 @@ func TestChangefeedProtectedTimestampUpdateForMultipleTables(t *testing.T) {
970970
require.Equal(t, int64(0), managePTSCount)
971971
require.Equal(t, int64(0), managePTSErrorCount)
972972

973-
createStmt := `CREATE CHANGEFEED FOR foo, bar WITH resolved='10ms', initial_scan='no'`
973+
createStmt := `CREATE CHANGEFEED FOR foo, bar
974+
WITH resolved='10ms', min_checkpoint_frequency='100ms', initial_scan='no'`
974975
testFeed := feed(t, f, createStmt)
975976
defer closeFeed(t, testFeed)
976977

@@ -1114,7 +1115,8 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
11141115
}
11151116
}
11161117

1117-
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3 WITH resolved='100ms'`
1118+
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3
1119+
WITH resolved='100ms', min_checkpoint_frequency='100ms'`
11181120
testFeed := feed(t, f, createStmt)
11191121
defer closeFeed(t, testFeed)
11201122

0 commit comments

Comments
 (0)