Skip to content

Commit c62e1ba

Browse files
craig[bot]andyyang890
andcommitted
Merge #144004
144004: changefeedccl: fix premature shutdown due to schema change bug r=rharding6373 a=andyyang890 Fixes #144108 Test failures on master: Fixes #143976 Fixes #144045 Fixes #144219 Test failures on release branches (won't be fixed until PR is backported): Informs #144287 Informs #144291 Informs #144352 --- **changefeedccl: add more verbose logging around schema changes** This patch adds more verbose logging to the change aggregator around receiving and emitting resolved spans to help debug recurring changefeed schema change test flakes. Release note: None --- **changefeedccl: fix premature shutdown due to schema change bug** This patch fixes a bug that could potentially cause a changefeed to erroneously complete when one of its watched tables encounters a schema change has been fixed. The root cause for the bug was that if we happened to get a rangefeed checkpoint at precisely `ts.Prev()` for some schema change timestamp `ts`, the kv feed would deliver a resolved span with `ts` and a NONE boundary to the change aggregator, which would advance its frontier; then when the resolved span with `ts` and a RESTART boundary was sent to the change aggregator, the frontier would not be advanced and so would not be flushed to the change frontier. The change frontier would then read a nil row from the change aggregator and shut the changefeed down as if it had completed successfully. This bug has been fixed by modifying the resolved span frontier forwarding logic to consider forwarding to the current timestamp but with a non-NONE boundary type to be advancing the frontier. Two alternative solutions that were ruled out were: 1. Unconditionally flushing the frontier when we get a non-NONE boundary instead of only flushing when the frontier is advanced. - Problem: We would flush the frontier O(spans) number of times. 2. Making the kv feed not emit resolved events for rangefeed checkpoints that are at a schema change boundary. - Problem: We wouldn't be able to save the per-span progress at the schema change boundary. Release note (bug fix): A bug that could potentially cause a changefeed to erroneously complete when one of its watched tables encounters a schema change has been fixed. --- **changefeedccl/resolvedspan: update assertion for forwarding boundary** This patch adds an assertion that the frontier will not forward the boundary to a different type at the same time. This assertion is important because if it's violated, the changefeed processors will not shut down correctly during a schema change. One potential way this could be violated in the future is a change to how boundary types are determined on aggregators causing different boundaries to be sent from aggregators in a mixed-version cluster for the same schema change. Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 61868f0 + 656cc3d commit c62e1ba

File tree

4 files changed

+168
-97
lines changed

4 files changed

+168
-97
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -868,14 +868,18 @@ func (ca *changeAggregator) flushBufferedEvents() error {
868868
// changeAggregator node to the changeFrontier node to allow the changeFrontier
869869
// to persist the overall changefeed's progress
870870
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (returnErr error) {
871+
if log.V(2) {
872+
log.Infof(ca.Ctx(), "resolved span from kv feed: %#v", resolved)
873+
}
874+
871875
if resolved.Timestamp.IsEmpty() {
872876
// @0.0 resolved timestamps could come in from rangefeed checkpoint.
873877
// When rangefeed starts running, it emits @0.0 resolved timestamp.
874878
// We don't care about those as far as checkpointing concerned.
875879
return nil
876880
}
877881

878-
advanced, err := ca.frontier.ForwardResolvedSpan(ca.Ctx(), resolved)
882+
advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
879883
if err != nil {
880884
return err
881885
}
@@ -951,6 +955,9 @@ func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error {
951955
RecentKvCount: ca.recentKVCount,
952956
},
953957
}
958+
if log.V(2) {
959+
log.Infof(ca.Ctx(), "progress update to be sent to change frontier: %#v", progressUpdate)
960+
}
954961
updateBytes, err := protoutil.Marshal(&progressUpdate)
955962
if err != nil {
956963
return err
@@ -1654,7 +1661,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
16541661
}
16551662

16561663
func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
1657-
frontierChanged, err := cf.frontier.ForwardResolvedSpan(cf.Ctx(), resolved)
1664+
frontierChanged, err := cf.frontier.ForwardResolvedSpan(resolved)
16581665
if err != nil {
16591666
return err
16601667
}

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_library(
1515
"//pkg/roachpb",
1616
"//pkg/settings",
1717
"//pkg/util/hlc",
18-
"//pkg/util/log",
1918
"//pkg/util/span",
2019
"@com_github_cockroachdb_errors//:errors",
2120
"@com_github_cockroachdb_redact//:redact",

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 57 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package resolvedspan
77

88
import (
9-
"context"
109
"iter"
1110
"slices"
1211

@@ -16,7 +15,6 @@ import (
1615
"github.com/cockroachdb/cockroach/pkg/roachpb"
1716
"github.com/cockroachdb/cockroach/pkg/settings"
1817
"github.com/cockroachdb/cockroach/pkg/util/hlc"
19-
"github.com/cockroachdb/cockroach/pkg/util/log"
2018
"github.com/cockroachdb/cockroach/pkg/util/span"
2119
"github.com/cockroachdb/errors"
2220
"github.com/cockroachdb/redact"
@@ -44,15 +42,15 @@ func NewAggregatorFrontier(
4442
// ForwardResolvedSpan forwards the progress of a resolved span and also does
4543
// some boundary validation.
4644
func (f *AggregatorFrontier) ForwardResolvedSpan(
47-
ctx context.Context, r jobspb.ResolvedSpan,
48-
) (bool, error) {
45+
r jobspb.ResolvedSpan,
46+
) (forwarded bool, err error) {
4947
switch boundaryType := r.BoundaryType; boundaryType {
5048
case jobspb.ResolvedSpan_NONE:
5149
case jobspb.ResolvedSpan_BACKFILL, jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART:
5250
// Boundary resolved events should be ingested from the schema feed
5351
// serially, where the changefeed won't ever observe a new schema change
5452
// boundary until it has progressed past the current boundary.
55-
if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
53+
if err := f.assertBoundaryNotEarlierOrDifferent(r); err != nil {
5654
return false, err
5755
}
5856
default:
@@ -89,8 +87,8 @@ func NewCoordinatorFrontier(
8987
// ForwardResolvedSpan forwards the progress of a resolved span and also does
9088
// some boundary validation.
9189
func (f *CoordinatorFrontier) ForwardResolvedSpan(
92-
ctx context.Context, r jobspb.ResolvedSpan,
93-
) (bool, error) {
90+
r jobspb.ResolvedSpan,
91+
) (forwarded bool, err error) {
9492
switch boundaryType := r.BoundaryType; boundaryType {
9593
case jobspb.ResolvedSpan_NONE:
9694
case jobspb.ResolvedSpan_BACKFILL:
@@ -103,21 +101,22 @@ func (f *CoordinatorFrontier) ForwardResolvedSpan(
103101
// it is a BACKFILL we have already seen, then it is fine for it to be
104102
// an earlier timestamp than the latest boundary.
105103
boundaryTS := r.Timestamp
106-
_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
107-
return elem.Compare(ts)
108-
})
109-
if ok {
104+
if _, ok := slices.BinarySearchFunc(f.backfills, boundaryTS,
105+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
106+
return elem.Compare(ts)
107+
},
108+
); ok {
110109
break
111110
}
112-
if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
111+
if err := f.assertBoundaryNotEarlierOrDifferent(r); err != nil {
113112
return false, err
114113
}
115114
f.backfills = append(f.backfills, boundaryTS)
116115
case jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART:
117116
// EXIT and RESTART are final boundaries that cause the changefeed
118117
// processors to all move to draining and so should not be followed
119118
// by any other boundaries.
120-
if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
119+
if err := f.assertBoundaryNotEarlierOrDifferent(r); err != nil {
121120
return false, err
122121
}
123122
default:
@@ -130,9 +129,10 @@ func (f *CoordinatorFrontier) ForwardResolvedSpan(
130129
// If the frontier changed, we check if the frontier has advanced past any known backfills.
131130
if frontierChanged {
132131
frontier := f.Frontier()
133-
i, _ := slices.BinarySearchFunc(f.backfills, frontier, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
134-
return elem.Compare(ts)
135-
})
132+
i, _ := slices.BinarySearchFunc(f.backfills, frontier,
133+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
134+
return elem.Compare(ts)
135+
})
136136
f.backfills = f.backfills[i:]
137137
}
138138
return frontierChanged, nil
@@ -145,10 +145,11 @@ func (f *CoordinatorFrontier) ForwardResolvedSpan(
145145
// happening at different timestamps.
146146
func (f *CoordinatorFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
147147
boundaryTS := r.Timestamp
148-
_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
149-
return elem.Next().Compare(ts)
150-
})
151-
if ok {
148+
if _, ok := slices.BinarySearchFunc(f.backfills, boundaryTS,
149+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
150+
return elem.Next().Compare(ts)
151+
},
152+
); ok {
152153
return true
153154
}
154155

@@ -161,11 +162,12 @@ func (f *CoordinatorFrontier) All() iter.Seq[jobspb.ResolvedSpan] {
161162
for resolvedSpan := range f.resolvedSpanFrontier.All() {
162163
// Check if it's at an earlier backfill boundary.
163164
if resolvedSpan.BoundaryType == jobspb.ResolvedSpan_NONE {
164-
for _, b := range f.backfills {
165-
if b.Equal(resolvedSpan.Timestamp) {
166-
resolvedSpan.BoundaryType = jobspb.ResolvedSpan_BACKFILL
167-
break
168-
}
165+
if _, ok := slices.BinarySearchFunc(f.backfills, resolvedSpan.Timestamp,
166+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
167+
return elem.Compare(ts)
168+
},
169+
); ok {
170+
resolvedSpan.BoundaryType = jobspb.ResolvedSpan_BACKFILL
169171
}
170172
}
171173
if !yield(resolvedSpan) {
@@ -224,8 +226,14 @@ func newResolvedSpanFrontier(
224226
}
225227

226228
// ForwardResolvedSpan forwards the progress of a resolved span.
227-
func (f *resolvedSpanFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool, error) {
228-
forwarded, err := f.Forward(r.Span, r.Timestamp)
229+
// The frontier is considered forwarded if either the frontier
230+
// timestamp advances or the current frontier timestamp becomes
231+
// a boundary timestamp (for some non-NONE boundary type)
232+
// and all the spans are at the boundary timestamp already.
233+
func (f *resolvedSpanFrontier) ForwardResolvedSpan(
234+
r jobspb.ResolvedSpan,
235+
) (forwarded bool, err error) {
236+
forwarded, err = f.Forward(r.Span, r.Timestamp)
229237
if err != nil {
230238
return false, err
231239
}
@@ -235,7 +243,13 @@ func (f *resolvedSpanFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool,
235243
ts: r.Timestamp,
236244
typ: r.BoundaryType,
237245
}
238-
f.boundary.Forward(newBoundary)
246+
boundaryForwarded := f.boundary.Forward(newBoundary)
247+
if boundaryForwarded && !forwarded {
248+
// The frontier is considered forwarded if the boundary type
249+
// changes to non-NONE and all the spans are at the boundary
250+
// timestamp already.
251+
forwarded, _, _ = f.AtBoundary()
252+
}
239253
}
240254
return forwarded, nil
241255
}
@@ -284,23 +298,26 @@ func (f *resolvedSpanFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
284298
return false
285299
}
286300

287-
// assertBoundaryNotEarlier is a helper method provided to assert that a
288-
// resolved span does not have an earlier boundary than the existing one.
289-
func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(
290-
ctx context.Context, r jobspb.ResolvedSpan,
291-
) error {
301+
// assertBoundaryNotEarlierOrDifferent is a helper method that asserts that a
302+
// resolved span does not have an earlier boundary than the existing one
303+
// nor is it at the same time as the existing one with a different type.
304+
func (f *resolvedSpanFrontier) assertBoundaryNotEarlierOrDifferent(r jobspb.ResolvedSpan) error {
292305
boundaryType := r.BoundaryType
293306
if boundaryType == jobspb.ResolvedSpan_NONE {
294-
return errors.AssertionFailedf("assertBoundaryNotEarlier should not be called for NONE boundary")
307+
return errors.AssertionFailedf(
308+
"assertBoundaryNotEarlierOrDifferent should not be called for NONE boundary")
295309
}
296310
boundaryTS := r.Timestamp
311+
newBoundary := newResolvedSpanBoundary(boundaryTS, boundaryType)
297312
if f.boundary.After(boundaryTS) {
298-
newBoundary := newResolvedSpanBoundary(boundaryTS, boundaryType)
299-
err := errors.AssertionFailedf("received resolved span for %s "+
313+
return errors.AssertionFailedf("received resolved span for %s "+
300314
"with %v, which is earlier than previously received %v",
301315
r.Span, newBoundary, f.boundary)
302-
log.Errorf(ctx, "error while forwarding boundary resolved span: %v", err)
303-
return err
316+
}
317+
if atBoundary, typ := f.boundary.At(boundaryTS); atBoundary && boundaryType != typ {
318+
return errors.AssertionFailedf("received resolved span for %s "+
319+
"with %v, which has a different type from previously received %v with same timestamp",
320+
r.Span, newBoundary, f.boundary)
304321
}
305322
return nil
306323
}
@@ -375,9 +392,9 @@ func (b *resolvedSpanBoundary) At(ts hlc.Timestamp) (bool, jobspb.ResolvedSpan_B
375392
return false, 0
376393
}
377394

378-
// After returns whether a timestamp is later than the boundary timestamp.
395+
// After returns whether the boundary is after a given timestamp.
379396
func (b *resolvedSpanBoundary) After(ts hlc.Timestamp) bool {
380-
return ts.Less(b.ts)
397+
return b.ts.After(ts)
381398
}
382399

383400
// Forward forwards the boundary to the new boundary if it is later.

0 commit comments

Comments
 (0)