Skip to content

Commit fd1b981

Browse files
committed
changefeedccl: deflake initial backfill checkpoint tests
This patch deflakes the initial backfill checkpoint tests, which filter out resolved spans to force gaps during a backfill. The tests however did not ensure that there would be at least one span that was not filtered out, which would result in no span-level checkpoint being created, leading the tests to time out. Release note: None
1 parent 0980cbd commit fd1b981

File tree

2 files changed

+28
-19
lines changed

2 files changed

+28
-19
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,19 +1520,27 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15201520

15211521
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
15221522
// we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing).
1523-
haveGaps := false
1524-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1523+
// We however also need to ensure there's at least one span that isn't filtered out.
1524+
var allowedOne, haveGaps bool
1525+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
15251526
rndMu.Lock()
15261527
defer rndMu.Unlock()
1528+
defer func() {
1529+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1530+
}()
15271531

15281532
if r.Span.Equal(fooTableSpan) {
15291533
return true, nil
15301534
}
1531-
if haveGaps {
1532-
return rndMu.rnd.Intn(10) > 7, nil
1535+
if !allowedOne {
1536+
allowedOne = true
1537+
return false, nil
15331538
}
1534-
haveGaps = true
1535-
return true, nil
1539+
if !haveGaps {
1540+
haveGaps = true
1541+
return true, nil
1542+
}
1543+
return rndMu.rnd.Intn(10) > 7, nil
15361544
}
15371545

15381546
// Checkpoint progress frequently, and set the checkpoint size limit.
@@ -1582,8 +1590,10 @@ WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
15821590

15831591
// Collect spans we attempt to resolve after when we resume.
15841592
var resolvedFoo []roachpb.Span
1585-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1586-
t.Logf("resolved span: %#v", r)
1593+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
1594+
defer func() {
1595+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1596+
}()
15871597
if !r.Span.Equal(fooTableSpan) {
15881598
resolvedFoo = append(resolvedFoo, r.Span)
15891599
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9117,22 +9117,21 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91179117

91189118
// Emit resolved events for majority of spans. Be extra paranoid and ensure that
91199119
// we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing).
9120-
haveGaps := false
9120+
// We however also need to ensure there's at least one span that isn't filtered out.
9121+
var allowedOne, haveGaps bool
91219122
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
91229123
if r.Span.Equal(tableSpan) {
9123-
// Do not emit resolved events for the entire table span.
9124-
// We "simulate" large table by splitting single table span into many parts, so
9125-
// we want to resolve those sub-spans instead of the entire table span.
9126-
// However, we have to emit something -- otherwise the entire changefeed
9127-
// machine would not work.
9128-
r.Span.EndKey = tableSpan.Key.Next()
9124+
return true, nil
9125+
}
9126+
if !allowedOne {
9127+
allowedOne = true
91299128
return false, nil
91309129
}
9131-
if haveGaps {
9132-
return rnd.Intn(10) > 7, nil
9130+
if !haveGaps {
9131+
haveGaps = true
9132+
return true, nil
91339133
}
9134-
haveGaps = true
9135-
return true, nil
9134+
return rnd.Intn(10) > 7, nil
91369135
}
91379136

91389137
// Checkpoint progress frequently, and set the checkpoint size limit.

0 commit comments

Comments
 (0)