Skip to content

Commit c490602

Browse files
committed
changefeedccl: deflake initial backfill checkpoint tests
This patch deflakes the initial backfill checkpoint tests, which filter out resolved spans to force gaps during a backfill. The tests however did not ensure that there would be at least one span that was not filtered out, which would result in no span-level checkpoint being created, leading the tests to time out. Release note: None
1 parent af6b700 commit c490602

File tree

2 files changed

+28
-19
lines changed

2 files changed

+28
-19
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,19 +1520,27 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15201520

15211521
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
15221522
// we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing).
1523-
haveGaps := false
1524-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1523+
// We however also need to ensure there's at least one span that isn't filtered out.
1524+
var allowedOne, haveGaps bool
1525+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
15251526
rndMu.Lock()
15261527
defer rndMu.Unlock()
1528+
defer func() {
1529+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1530+
}()
15271531

15281532
if r.Span.Equal(fooTableSpan) {
15291533
return true, nil
15301534
}
1531-
if haveGaps {
1532-
return rndMu.rnd.Intn(10) > 7, nil
1535+
if !allowedOne {
1536+
allowedOne = true
1537+
return false, nil
15331538
}
1534-
haveGaps = true
1535-
return true, nil
1539+
if !haveGaps {
1540+
haveGaps = true
1541+
return true, nil
1542+
}
1543+
return rndMu.rnd.Intn(10) > 7, nil
15361544
}
15371545

15381546
// Checkpoint progress frequently, and set the checkpoint size limit.
@@ -1582,8 +1590,10 @@ WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
15821590

15831591
// Collect spans we attempt to resolve after when we resume.
15841592
var resolvedFoo []roachpb.Span
1585-
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1586-
t.Logf("resolved span: %#v", r)
1593+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (filter bool, _ error) {
1594+
defer func() {
1595+
t.Logf("resolved span: %s@%s, filter: %t", r.Span, r.Timestamp, filter)
1596+
}()
15871597
if !r.Span.Equal(fooTableSpan) {
15881598
resolvedFoo = append(resolvedFoo, r.Span)
15891599
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9126,22 +9126,21 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91269126

91279127
// Emit resolved events for majority of spans. Be extra paranoid and ensure that
91289128
// we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing).
9129-
haveGaps := false
9129+
// We however also need to ensure there's at least one span that isn't filtered out.
9130+
var allowedOne, haveGaps bool
91309131
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
91319132
if r.Span.Equal(tableSpan) {
9132-
// Do not emit resolved events for the entire table span.
9133-
// We "simulate" large table by splitting single table span into many parts, so
9134-
// we want to resolve those sub-spans instead of the entire table span.
9135-
// However, we have to emit something -- otherwise the entire changefeed
9136-
// machine would not work.
9137-
r.Span.EndKey = tableSpan.Key.Next()
9133+
return true, nil
9134+
}
9135+
if !allowedOne {
9136+
allowedOne = true
91389137
return false, nil
91399138
}
9140-
if haveGaps {
9141-
return rnd.Intn(10) > 7, nil
9139+
if !haveGaps {
9140+
haveGaps = true
9141+
return true, nil
91429142
}
9143-
haveGaps = true
9144-
return true, nil
9143+
return rnd.Intn(10) > 7, nil
91459144
}
91469145

91479146
// Checkpoint progress frequently, and set the checkpoint size limit.

0 commit comments

Comments
 (0)