Skip to content

Commit b982846

Browse files
authored
Merge pull request #154709 from cockroachdb/blathers/backport-release-25.4-154598
release-25.4: changefeedccl/resolvedspan: fix empty boundary handling bug
2 parents 6fb93c8 + 0907fc6 commit b982846

File tree

2 files changed

+139
-84
lines changed

2 files changed

+139
-84
lines changed

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -255,18 +255,12 @@ func (f *resolvedSpanFrontier) ForwardResolvedSpan(
255255
return false, err
256256
}
257257
f.latestTS.Forward(r.Timestamp)
258-
if r.BoundaryType != jobspb.ResolvedSpan_NONE {
259-
newBoundary := resolvedSpanBoundary{
260-
ts: r.Timestamp,
261-
typ: r.BoundaryType,
262-
}
263-
boundaryForwarded := f.boundary.Forward(newBoundary)
264-
if boundaryForwarded && !forwarded {
265-
// The frontier is considered forwarded if the boundary type
266-
// changes to non-NONE and all the spans are at the boundary
267-
// timestamp already.
268-
forwarded, _, _ = f.AtBoundary()
269-
}
258+
boundaryForwarded := f.boundary.Forward(r.Timestamp, r.BoundaryType)
259+
if boundaryForwarded && !forwarded {
260+
// The frontier is considered forwarded if the boundary type
261+
// changes to non-NONE and all the spans are at the boundary
262+
// timestamp already.
263+
forwarded, _, _ = f.AtBoundary()
270264
}
271265
return forwarded, nil
272266
}
@@ -408,27 +402,35 @@ func newResolvedSpanBoundary(
408402
// At returns whether a timestamp is equal to the boundary timestamp
409403
// and if so, the boundary type as well.
410404
func (b *resolvedSpanBoundary) At(ts hlc.Timestamp) (bool, jobspb.ResolvedSpan_BoundaryType) {
411-
if ts.Equal(b.ts) {
405+
if b.IsSet() && ts.Equal(b.ts) {
412406
return true, b.typ
413407
}
414408
return false, 0
415409
}
416410

417411
// After returns whether the boundary is after a given timestamp.
418412
func (b *resolvedSpanBoundary) After(ts hlc.Timestamp) bool {
419-
return b.ts.After(ts)
413+
return b.IsSet() && b.ts.After(ts)
420414
}
421415

422416
// Forward forwards the boundary to the new boundary if it is later.
423417
// It returns true if the boundary changed and false otherwise.
424-
func (b *resolvedSpanBoundary) Forward(newBoundary resolvedSpanBoundary) bool {
425-
if newBoundary.After(b.ts) {
426-
*b = newBoundary
418+
func (b *resolvedSpanBoundary) Forward(
419+
ts hlc.Timestamp, typ jobspb.ResolvedSpan_BoundaryType,
420+
) bool {
421+
if typ != jobspb.ResolvedSpan_NONE && ts.After(b.ts) {
422+
b.ts = ts
423+
b.typ = typ
427424
return true
428425
}
429426
return false
430427
}
431428

429+
// IsSet returns whether the boundary is set.
430+
func (b *resolvedSpanBoundary) IsSet() bool {
431+
return b.typ != jobspb.ResolvedSpan_NONE
432+
}
433+
432434
// SafeFormat implements the redact.SafeFormatter interface.
433435
func (b *resolvedSpanBoundary) SafeFormat(s redact.SafePrinter, _ rune) {
434436
s.Printf("%v boundary (%v)", b.typ, b.ts)

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 120 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,48 @@ func TestCoordinatorFrontier(t *testing.T) {
179179
}
180180

181181
type frontier interface {
182-
AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error
182+
AddSpansAt(hlc.Timestamp, ...roachpb.Span) error
183183
Frontier() hlc.Timestamp
184+
Forward(roachpb.Span, hlc.Timestamp) (bool, error)
184185
ForwardResolvedSpan(jobspb.ResolvedSpan) (bool, error)
185186
InBackfill(jobspb.ResolvedSpan) bool
186187
AtBoundary() (bool, jobspb.ResolvedSpan_BoundaryType, hlc.Timestamp)
187188
All() iter.Seq[jobspb.ResolvedSpan]
188189
Frontiers() iter.Seq2[descpb.ID, span.ReadOnlyFrontier]
189190
}
190191

192+
func newFrontier(
193+
t testing.TB,
194+
frontierType string,
195+
statementTime hlc.Timestamp,
196+
initialHighwater hlc.Timestamp,
197+
codec resolvedspan.TableCodec,
198+
perTableTracking bool,
199+
spans ...roachpb.Span,
200+
) (frontier, error) {
201+
switch frontierType {
202+
case "aggregator":
203+
return resolvedspan.NewAggregatorFrontier(
204+
statementTime,
205+
initialHighwater,
206+
codec,
207+
perTableTracking,
208+
spans...,
209+
)
210+
case "coordinator":
211+
return resolvedspan.NewCoordinatorFrontier(
212+
statementTime,
213+
initialHighwater,
214+
codec,
215+
perTableTracking,
216+
spans...,
217+
)
218+
default:
219+
t.Fatalf("unknown frontier type: %s", frontierType)
220+
}
221+
panic("unreachable")
222+
}
223+
191224
func testBackfillSpan(
192225
t *testing.T, f frontier, start, end string, ts hlc.Timestamp, frontierAfterSpan hlc.Timestamp,
193226
) {
@@ -379,29 +412,15 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
379412
initialHighWater := hlc.Timestamp{}
380413

381414
// Create frontier with multiple table spans.
382-
f, err := func() (frontier, error) {
383-
switch frontierType {
384-
case "aggregator":
385-
return resolvedspan.NewAggregatorFrontier(
386-
statementTime,
387-
initialHighWater,
388-
codec,
389-
true, /* perTableTracking */
390-
tableSpans...,
391-
)
392-
case "coordinator":
393-
return resolvedspan.NewCoordinatorFrontier(
394-
statementTime,
395-
initialHighWater,
396-
codec,
397-
true, /* perTableTracking */
398-
tableSpans...,
399-
)
400-
default:
401-
t.Fatalf("unknown frontier type: %s", frontierType)
402-
}
403-
panic("unreachable")
404-
}()
415+
f, err := newFrontier(
416+
t,
417+
frontierType,
418+
statementTime,
419+
initialHighWater,
420+
codec,
421+
true, /* perTableTracking */
422+
tableSpans...,
423+
)
405424
require.NoError(t, err)
406425
require.Equal(t, initialHighWater, f.Frontier())
407426

@@ -503,29 +522,15 @@ func TestFrontierForwardFullTableSpan(t *testing.T) {
503522
statementTime := makeTS(5)
504523
var initialHighWater hlc.Timestamp
505524

506-
f, err := func() (span.Frontier, error) {
507-
switch frontierType {
508-
case "aggregator":
509-
return resolvedspan.NewAggregatorFrontier(
510-
statementTime,
511-
initialHighWater,
512-
codec,
513-
true, /* perTableTracking */
514-
tableSpans...,
515-
)
516-
case "coordinator":
517-
return resolvedspan.NewCoordinatorFrontier(
518-
statementTime,
519-
initialHighWater,
520-
codec,
521-
true, /* perTableTracking */
522-
tableSpans...,
523-
)
524-
default:
525-
t.Fatalf("unknown frontier type: %s", frontierType)
526-
}
527-
panic("unreachable")
528-
}()
525+
f, err := newFrontier(
526+
t,
527+
frontierType,
528+
statementTime,
529+
initialHighWater,
530+
codec,
531+
true, /* perTableTracking */
532+
tableSpans...,
533+
)
529534
require.NoError(t, err)
530535
require.Equal(t, initialHighWater, f.Frontier())
531536

@@ -610,26 +615,14 @@ FROM [SHOW RANGES FROM TABLE foo WITH KEYS]`)
610615
now := makeTS(timeutil.Now().Unix())
611616

612617
// Create the frontier and add all the spans.
613-
f, err := func() (frontier, error) {
614-
switch frontierType {
615-
case "aggregator":
616-
return resolvedspan.NewAggregatorFrontier(
617-
now,
618-
now,
619-
codec,
620-
perTableTracking,
621-
)
622-
case "coordinator":
623-
return resolvedspan.NewCoordinatorFrontier(
624-
now,
625-
now,
626-
codec,
627-
perTableTracking,
628-
)
629-
default:
630-
panic("unreachable")
631-
}
632-
}()
618+
f, err := newFrontier(
619+
b,
620+
frontierType,
621+
now,
622+
now,
623+
codec,
624+
perTableTracking,
625+
)
633626
require.NoError(b, err)
634627
require.NoError(b, f.AddSpansAt(now, spans...))
635628

@@ -655,3 +648,63 @@ FROM [SHOW RANGES FROM TABLE foo WITH KEYS]`)
655648
}
656649
}
657650
}
651+
652+
func TestFrontierAtBoundary(t *testing.T) {
653+
defer leaktest.AfterTest(t)()
654+
defer log.Scope(t).Close(t)
655+
656+
rnd, _ := randutil.NewTestRand()
657+
perTableTracking := rnd.Intn(2) == 0
658+
t.Logf("per-table tracking: %t", perTableTracking)
659+
660+
testutils.RunValues(t, "frontier type", []string{"aggregator", "coordinator"},
661+
func(t *testing.T, frontierType string) {
662+
statementTime := makeTS(timeutil.Now().Unix())
663+
var initialHighWater hlc.Timestamp
664+
f, err := newFrontier(
665+
t,
666+
frontierType,
667+
statementTime,
668+
initialHighWater,
669+
mockCodec{},
670+
perTableTracking,
671+
makeSpan("a", "f"),
672+
)
673+
require.NoError(t, err)
674+
675+
// We can't be at boundary until a boundary is set.
676+
atBoundary, _, _ := f.AtBoundary()
677+
require.False(t, atBoundary)
678+
_, err = f.ForwardResolvedSpan(jobspb.ResolvedSpan{
679+
Span: makeSpan("a", "f"),
680+
Timestamp: statementTime,
681+
})
682+
require.NoError(t, err)
683+
atBoundary, _, _ = f.AtBoundary()
684+
require.False(t, atBoundary)
685+
686+
// Set a boundary by forwarding part of the span space.
687+
ts := statementTime.AddDuration(3 * time.Second)
688+
_, err = f.ForwardResolvedSpan(jobspb.ResolvedSpan{
689+
Span: makeSpan("a", "c"),
690+
Timestamp: ts,
691+
BoundaryType: jobspb.ResolvedSpan_BACKFILL,
692+
})
693+
require.NoError(t, err)
694+
atBoundary, _, _ = f.AtBoundary()
695+
require.False(t, atBoundary)
696+
697+
// Verify the boundary is reached after forwarding
698+
// the rest of the span space.
699+
_, err = f.ForwardResolvedSpan(jobspb.ResolvedSpan{
700+
Span: makeSpan("c", "f"),
701+
Timestamp: ts,
702+
BoundaryType: jobspb.ResolvedSpan_BACKFILL,
703+
})
704+
require.NoError(t, err)
705+
atBoundary, boundaryType, boundaryTS := f.AtBoundary()
706+
require.True(t, atBoundary)
707+
require.Equal(t, jobspb.ResolvedSpan_BACKFILL, boundaryType)
708+
require.Equal(t, ts, boundaryTS)
709+
})
710+
}

0 commit comments

Comments
 (0)