Skip to content

Commit 4b81e1f

Browse files
committed
changefeedccl: fix premature shutdown due to schema change bug
This patch fixes a bug that could potentially cause a changefeed to erroneously complete when one of its watched tables encounters a schema change has been fixed. The root cause for the bug was that if we happened to get a rangefeed checkpoint at precisely `ts.Prev()` for some schema change timestamp `ts`, the kv feed would deliver a resolved span with `ts` and a NONE boundary to the change aggregator, which would advance its frontier; then when the resolved span with `ts` and a RESTART boundary was sent to the change aggregator, the frontier would not be advanced and so would not be flushed to the change frontier. The change frontier would then read a nil row from the change aggregator and shut the changefeed down as if it had completed successfully. This bug has been fixed by modifying the resolved span frontier forwarding logic to consider forwarding to the current timestamp but with a non-NONE boundary type to be advancing the frontier. Two alternative solutions that were ruled out were: 1. Unconditionally flushing the frontier when we get a non-NONE boundary instead of only flushing when the frontier is advanced. - Problem: We would flush the frontier O(spans) number of times. 2. Making the kv feed not emit resolved events for rangefeed checkpoints that are at a schema change boundary. - Problem: We wouldn't be able to save the per-span progress at the schema change boundary. Release note (bug fix): A bug that could potentially cause a changefeed to erroneously complete when one of its watched tables encounters a schema change has been fixed.
1 parent ff553d0 commit 4b81e1f

File tree

3 files changed

+77
-16
lines changed

3 files changed

+77
-16
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
878878
return nil
879879
}
880880

881-
advanced, err := ca.frontier.ForwardResolvedSpan(ca.Ctx(), resolved)
881+
advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
882882
if err != nil {
883883
return err
884884
}
@@ -1660,7 +1660,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
16601660
}
16611661

16621662
func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
1663-
frontierChanged, err := cf.frontier.ForwardResolvedSpan(cf.Ctx(), resolved)
1663+
frontierChanged, err := cf.frontier.ForwardResolvedSpan(resolved)
16641664
if err != nil {
16651665
return err
16661666
}

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package resolvedspan
77

88
import (
9-
"context"
109
"iter"
1110
"slices"
1211

@@ -44,8 +43,8 @@ func NewAggregatorFrontier(
4443
// ForwardResolvedSpan forwards the progress of a resolved span and also does
4544
// some boundary validation.
4645
func (f *AggregatorFrontier) ForwardResolvedSpan(
47-
ctx context.Context, r jobspb.ResolvedSpan,
48-
) (bool, error) {
46+
r jobspb.ResolvedSpan,
47+
) (forwarded bool, err error) {
4948
switch boundaryType := r.BoundaryType; boundaryType {
5049
case jobspb.ResolvedSpan_NONE:
5150
case jobspb.ResolvedSpan_BACKFILL, jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART:
@@ -89,8 +88,8 @@ func NewCoordinatorFrontier(
8988
// ForwardResolvedSpan forwards the progress of a resolved span and also does
9089
// some boundary validation.
9190
func (f *CoordinatorFrontier) ForwardResolvedSpan(
92-
ctx context.Context, r jobspb.ResolvedSpan,
93-
) (bool, error) {
91+
r jobspb.ResolvedSpan,
92+
) (forwarded bool, err error) {
9493
switch boundaryType := r.BoundaryType; boundaryType {
9594
case jobspb.ResolvedSpan_NONE:
9695
case jobspb.ResolvedSpan_BACKFILL:
@@ -224,8 +223,14 @@ func newResolvedSpanFrontier(
224223
}
225224

226225
// ForwardResolvedSpan forwards the progress of a resolved span.
227-
func (f *resolvedSpanFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool, error) {
228-
forwarded, err := f.Forward(r.Span, r.Timestamp)
226+
// The frontier is considered forwarded if either the frontier
227+
// timestamp advances or the current frontier timestamp becomes
228+
// a boundary timestamp (for some non-NONE boundary type)
229+
// and all the spans are at the boundary timestamp already.
230+
func (f *resolvedSpanFrontier) ForwardResolvedSpan(
231+
r jobspb.ResolvedSpan,
232+
) (forwarded bool, err error) {
233+
forwarded, err = f.Forward(r.Span, r.Timestamp)
229234
if err != nil {
230235
return false, err
231236
}
@@ -235,7 +240,13 @@ func (f *resolvedSpanFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool,
235240
ts: r.Timestamp,
236241
typ: r.BoundaryType,
237242
}
238-
f.boundary.Forward(newBoundary)
243+
boundaryForwarded := f.boundary.Forward(newBoundary)
244+
if boundaryForwarded && !forwarded {
245+
// The frontier is considered forwarded if the boundary type
246+
// changes to non-NONE and all the spans are at the boundary
247+
// timestamp already.
248+
forwarded, _, _ = f.AtBoundary()
249+
}
239250
}
240251
return forwarded, nil
241252
}
@@ -375,9 +386,9 @@ func (b *resolvedSpanBoundary) At(ts hlc.Timestamp) (bool, jobspb.ResolvedSpan_B
375386
return false, 0
376387
}
377388

378-
// After returns whether a timestamp is later than the boundary timestamp.
389+
// After returns whether the boundary is after a given timestamp.
379390
func (b *resolvedSpanBoundary) After(ts hlc.Timestamp) bool {
380-
return ts.Less(b.ts)
391+
return b.ts.After(ts)
381392
}
382393

383394
// Forward forwards the boundary to the new boundary if it is later.

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func TestCoordinatorFrontier(t *testing.T) {
155155

156156
type frontier interface {
157157
Frontier() hlc.Timestamp
158-
ForwardResolvedSpan(context.Context, jobspb.ResolvedSpan) (bool, error)
158+
ForwardResolvedSpan(jobspb.ResolvedSpan) (bool, error)
159159
InBackfill(jobspb.ResolvedSpan) bool
160160
AtBoundary() (bool, jobspb.ResolvedSpan_BoundaryType, hlc.Timestamp)
161161
All() iter.Seq[jobspb.ResolvedSpan]
@@ -171,7 +171,7 @@ func testBackfillSpan(
171171
) {
172172
backfillSpan := makeResolvedSpan(start, end, ts, jobspb.ResolvedSpan_NONE)
173173
require.True(t, f.InBackfill(backfillSpan))
174-
_, err := f.ForwardResolvedSpan(ctx, backfillSpan)
174+
_, err := f.ForwardResolvedSpan(backfillSpan)
175175
require.NoError(t, err)
176176
require.Equal(t, frontierAfterSpan, f.Frontier())
177177
}
@@ -186,7 +186,7 @@ func testBoundarySpan(
186186
frontierAfterSpan hlc.Timestamp,
187187
) {
188188
boundarySpan := makeResolvedSpan(start, end, boundaryTS, boundaryType)
189-
_, err := f.ForwardResolvedSpan(ctx, boundarySpan)
189+
_, err := f.ForwardResolvedSpan(boundarySpan)
190190
require.NoError(t, err)
191191

192192
if finalBoundarySpan := frontierAfterSpan.Equal(boundaryTS); finalBoundarySpan {
@@ -220,7 +220,7 @@ func testIllegalBoundarySpan(
220220
boundaryType jobspb.ResolvedSpan_BoundaryType,
221221
) {
222222
boundarySpan := makeResolvedSpan(start, end, boundaryTS, boundaryType)
223-
_, err := f.ForwardResolvedSpan(ctx, boundarySpan)
223+
_, err := f.ForwardResolvedSpan(boundarySpan)
224224
require.True(t, errors.HasAssertionFailure(err))
225225
}
226226

@@ -244,3 +244,53 @@ func makeSpan(start, end string) roachpb.Span {
244244
EndKey: roachpb.Key(end),
245245
}
246246
}
247+
248+
func TestAggregatorFrontier_ForwardResolvedSpan(t *testing.T) {
249+
defer leaktest.AfterTest(t)()
250+
defer log.Scope(t).Close(t)
251+
252+
// Create a fresh frontier with no progress.
253+
f, err := resolvedspan.NewAggregatorFrontier(
254+
hlc.Timestamp{},
255+
hlc.Timestamp{},
256+
makeSpan("a", "f"),
257+
)
258+
require.NoError(t, err)
259+
require.Zero(t, f.Frontier())
260+
261+
t.Run("advance frontier with no boundary", func(t *testing.T) {
262+
// Forwarding part of the span space to 10 should not advance the frontier.
263+
forwarded, err := f.ForwardResolvedSpan(
264+
makeResolvedSpan("a", "b", makeTS(10), jobspb.ResolvedSpan_NONE))
265+
require.NoError(t, err)
266+
require.False(t, forwarded)
267+
require.Zero(t, f.Frontier())
268+
269+
// Forwarding the rest of the span space to 10 should advance the frontier.
270+
forwarded, err = f.ForwardResolvedSpan(
271+
makeResolvedSpan("b", "f", makeTS(10), jobspb.ResolvedSpan_NONE))
272+
require.NoError(t, err)
273+
require.True(t, forwarded)
274+
require.Equal(t, makeTS(10), f.Frontier())
275+
})
276+
277+
t.Run("advance frontier with same timestamp and new boundary", func(t *testing.T) {
278+
// Forwarding part of the span space to 10 again with a non-NONE boundary
279+
// should be considered forwarding the frontier because we're learning
280+
// about a new boundary.
281+
forwarded, err := f.ForwardResolvedSpan(
282+
makeResolvedSpan("c", "f", makeTS(10), jobspb.ResolvedSpan_RESTART))
283+
require.NoError(t, err)
284+
require.True(t, forwarded)
285+
require.Equal(t, makeTS(10), f.Frontier())
286+
287+
// Forwarding the rest of the span space to 10 again with a non-NONE boundary
288+
// should not be considered forwarding the frontier because we already
289+
// know about the new boundary.
290+
forwarded, err = f.ForwardResolvedSpan(
291+
makeResolvedSpan("a", "c", makeTS(10), jobspb.ResolvedSpan_RESTART))
292+
require.NoError(t, err)
293+
require.False(t, forwarded)
294+
require.Equal(t, makeTS(10), f.Frontier())
295+
})
296+
}

0 commit comments

Comments
 (0)