Skip to content

Commit 656cc3d

Browse files
committed
changefeedccl/resolvedspan: update assertion for forwarding boundary
This patch adds an assertion that the frontier will not forward the boundary to a different type at the same time. This assertion is important because if it's violated, the changefeed processors will not shut down correctly during a schema change. One potential way this could be violated in the future is a change to how boundary types are determined on aggregators causing different boundaries to be sent from aggregators in a mixed-version cluster for the same schema change. Release note: None
1 parent 4b81e1f commit 656cc3d

File tree

3 files changed

+84
-81
lines changed

3 files changed

+84
-81
lines changed

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_library(
1515
"//pkg/roachpb",
1616
"//pkg/settings",
1717
"//pkg/util/hlc",
18-
"//pkg/util/log",
1918
"//pkg/util/span",
2019
"@com_github_cockroachdb_errors//:errors",
2120
"@com_github_cockroachdb_redact//:redact",

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
1616
"github.com/cockroachdb/cockroach/pkg/settings"
1717
"github.com/cockroachdb/cockroach/pkg/util/hlc"
18-
"github.com/cockroachdb/cockroach/pkg/util/log"
1918
"github.com/cockroachdb/cockroach/pkg/util/span"
2019
"github.com/cockroachdb/errors"
2120
"github.com/cockroachdb/redact"
@@ -51,7 +50,7 @@ func (f *AggregatorFrontier) ForwardResolvedSpan(
5150
// Boundary resolved events should be ingested from the schema feed
5251
// serially, where the changefeed won't ever observe a new schema change
5352
// boundary until it has progressed past the current boundary.
54-
if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
53+
if err := f.assertBoundaryNotEarlierOrDifferent(r); err != nil {
5554
return false, err
5655
}
5756
default:
@@ -102,21 +101,22 @@ func (f *CoordinatorFrontier) ForwardResolvedSpan(
102101
// it is a BACKFILL we have already seen, then it is fine for it to be
103102
// an earlier timestamp than the latest boundary.
104103
boundaryTS := r.Timestamp
105-
_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
106-
return elem.Compare(ts)
107-
})
108-
if ok {
104+
if _, ok := slices.BinarySearchFunc(f.backfills, boundaryTS,
105+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
106+
return elem.Compare(ts)
107+
},
108+
); ok {
109109
break
110110
}
111-
if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
111+
if err := f.assertBoundaryNotEarlierOrDifferent(r); err != nil {
112112
return false, err
113113
}
114114
f.backfills = append(f.backfills, boundaryTS)
115115
case jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART:
116116
// EXIT and RESTART are final boundaries that cause the changefeed
117117
// processors to all move to draining and so should not be followed
118118
// by any other boundaries.
119-
if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
119+
if err := f.assertBoundaryNotEarlierOrDifferent(r); err != nil {
120120
return false, err
121121
}
122122
default:
@@ -129,9 +129,10 @@ func (f *CoordinatorFrontier) ForwardResolvedSpan(
129129
// If the frontier changed, we check if the frontier has advanced past any known backfills.
130130
if frontierChanged {
131131
frontier := f.Frontier()
132-
i, _ := slices.BinarySearchFunc(f.backfills, frontier, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
133-
return elem.Compare(ts)
134-
})
132+
i, _ := slices.BinarySearchFunc(f.backfills, frontier,
133+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
134+
return elem.Compare(ts)
135+
})
135136
f.backfills = f.backfills[i:]
136137
}
137138
return frontierChanged, nil
@@ -144,10 +145,11 @@ func (f *CoordinatorFrontier) ForwardResolvedSpan(
144145
// happening at different timestamps.
145146
func (f *CoordinatorFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
146147
boundaryTS := r.Timestamp
147-
_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
148-
return elem.Next().Compare(ts)
149-
})
150-
if ok {
148+
if _, ok := slices.BinarySearchFunc(f.backfills, boundaryTS,
149+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
150+
return elem.Next().Compare(ts)
151+
},
152+
); ok {
151153
return true
152154
}
153155

@@ -160,11 +162,12 @@ func (f *CoordinatorFrontier) All() iter.Seq[jobspb.ResolvedSpan] {
160162
for resolvedSpan := range f.resolvedSpanFrontier.All() {
161163
// Check if it's at an earlier backfill boundary.
162164
if resolvedSpan.BoundaryType == jobspb.ResolvedSpan_NONE {
163-
for _, b := range f.backfills {
164-
if b.Equal(resolvedSpan.Timestamp) {
165-
resolvedSpan.BoundaryType = jobspb.ResolvedSpan_BACKFILL
166-
break
167-
}
165+
if _, ok := slices.BinarySearchFunc(f.backfills, resolvedSpan.Timestamp,
166+
func(elem hlc.Timestamp, ts hlc.Timestamp) int {
167+
return elem.Compare(ts)
168+
},
169+
); ok {
170+
resolvedSpan.BoundaryType = jobspb.ResolvedSpan_BACKFILL
168171
}
169172
}
170173
if !yield(resolvedSpan) {
@@ -295,23 +298,26 @@ func (f *resolvedSpanFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
295298
return false
296299
}
297300

298-
// assertBoundaryNotEarlier is a helper method provided to assert that a
299-
// resolved span does not have an earlier boundary than the existing one.
300-
func (f *resolvedSpanFrontier) assertBoundaryNotEarlier(
301-
ctx context.Context, r jobspb.ResolvedSpan,
302-
) error {
301+
// assertBoundaryNotEarlierOrDifferent is a helper method that asserts that a
302+
// resolved span does not have an earlier boundary than the existing one
303+
// nor is it at the same time as the existing one with a different type.
304+
func (f *resolvedSpanFrontier) assertBoundaryNotEarlierOrDifferent(r jobspb.ResolvedSpan) error {
303305
boundaryType := r.BoundaryType
304306
if boundaryType == jobspb.ResolvedSpan_NONE {
305-
return errors.AssertionFailedf("assertBoundaryNotEarlier should not be called for NONE boundary")
307+
return errors.AssertionFailedf(
308+
"assertBoundaryNotEarlierOrDifferent should not be called for NONE boundary")
306309
}
307310
boundaryTS := r.Timestamp
311+
newBoundary := newResolvedSpanBoundary(boundaryTS, boundaryType)
308312
if f.boundary.After(boundaryTS) {
309-
newBoundary := newResolvedSpanBoundary(boundaryTS, boundaryType)
310-
err := errors.AssertionFailedf("received resolved span for %s "+
313+
return errors.AssertionFailedf("received resolved span for %s "+
311314
"with %v, which is earlier than previously received %v",
312315
r.Span, newBoundary, f.boundary)
313-
log.Errorf(ctx, "error while forwarding boundary resolved span: %v", err)
314-
return err
316+
}
317+
if atBoundary, typ := f.boundary.At(boundaryTS); atBoundary && boundaryType != typ {
318+
return errors.AssertionFailedf("received resolved span for %s "+
319+
"with %v, which has a different type from previously received %v with same timestamp",
320+
r.Span, newBoundary, f.boundary)
315321
}
316322
return nil
317323
}

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package resolvedspan_test
77

88
import (
9-
"context"
109
"iter"
1110
"testing"
1211

@@ -24,8 +23,6 @@ func TestAggregatorFrontier(t *testing.T) {
2423
defer leaktest.AfterTest(t)()
2524
defer log.Scope(t).Close(t)
2625

27-
ctx := context.Background()
28-
2926
// Create a fresh frontier with no progress.
3027
statementTime := makeTS(10)
3128
var initialHighwater hlc.Timestamp
@@ -38,32 +35,37 @@ func TestAggregatorFrontier(t *testing.T) {
3835
require.Equal(t, initialHighwater, f.Frontier())
3936

4037
// Forward spans representing initial scan.
41-
testBackfillSpan(t, ctx, f, "a", "b", statementTime, initialHighwater)
42-
testBackfillSpan(t, ctx, f, "b", "f", statementTime, statementTime)
38+
testBackfillSpan(t, f, "a", "b", statementTime, initialHighwater)
39+
testBackfillSpan(t, f, "b", "f", statementTime, statementTime)
4340

4441
// Forward spans signalling a backfill is required.
4542
backfillTS := makeTS(20)
46-
testBoundarySpan(t, ctx, f, "a", "b", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
47-
testBoundarySpan(t, ctx, f, "b", "c", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
48-
testBoundarySpan(t, ctx, f, "c", "d", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
49-
testBoundarySpan(t, ctx, f, "d", "e", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
50-
testBoundarySpan(t, ctx, f, "e", "f", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, backfillTS.Prev())
43+
testBoundarySpan(t, f, "a", "b", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
44+
testBoundarySpan(t, f, "b", "c", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
45+
testBoundarySpan(t, f, "c", "d", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
46+
testBoundarySpan(t, f, "d", "e", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
47+
testBoundarySpan(t, f, "e", "f", backfillTS.Prev(), jobspb.ResolvedSpan_BACKFILL, backfillTS.Prev())
5148

52-
// Confirm that attempting to signal an earlier boundary causes an assertion error.
49+
// Verify that attempting to signal an earlier boundary causes an assertion error.
5350
illegalBoundaryTS := makeTS(15)
54-
testIllegalBoundarySpan(t, ctx, f, "a", "f", illegalBoundaryTS, jobspb.ResolvedSpan_BACKFILL)
55-
testIllegalBoundarySpan(t, ctx, f, "a", "f", illegalBoundaryTS, jobspb.ResolvedSpan_RESTART)
56-
testIllegalBoundarySpan(t, ctx, f, "a", "f", illegalBoundaryTS, jobspb.ResolvedSpan_EXIT)
51+
testIllegalBoundarySpan(t, f, "a", "f", illegalBoundaryTS, jobspb.ResolvedSpan_BACKFILL)
52+
testIllegalBoundarySpan(t, f, "a", "f", illegalBoundaryTS, jobspb.ResolvedSpan_RESTART)
53+
testIllegalBoundarySpan(t, f, "a", "f", illegalBoundaryTS, jobspb.ResolvedSpan_EXIT)
54+
55+
// Verify that attempting to signal a boundary at the latest boundary time with a different
56+
// boundary type causes an assertion error.
57+
testIllegalBoundarySpan(t, f, "a", "f", backfillTS.Prev(), jobspb.ResolvedSpan_RESTART)
58+
testIllegalBoundarySpan(t, f, "a", "f", backfillTS.Prev(), jobspb.ResolvedSpan_EXIT)
5759

5860
// Forward spans representing actual backfill.
59-
testBackfillSpan(t, ctx, f, "d", "e", backfillTS, backfillTS.Prev())
60-
testBackfillSpan(t, ctx, f, "e", "f", backfillTS, backfillTS.Prev())
61-
testBackfillSpan(t, ctx, f, "a", "d", backfillTS, backfillTS)
61+
testBackfillSpan(t, f, "d", "e", backfillTS, backfillTS.Prev())
62+
testBackfillSpan(t, f, "e", "f", backfillTS, backfillTS.Prev())
63+
testBackfillSpan(t, f, "a", "d", backfillTS, backfillTS)
6264

6365
// Forward spans signalling a restart is required.
6466
restartTS := makeTS(30)
65-
testBoundarySpan(t, ctx, f, "a", "b", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, backfillTS)
66-
testBoundarySpan(t, ctx, f, "b", "f", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, restartTS.Prev())
67+
testBoundarySpan(t, f, "a", "b", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, backfillTS)
68+
testBoundarySpan(t, f, "b", "f", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, restartTS.Prev())
6769

6870
// Simulate restarting by creating a new frontier with the initial highwater
6971
// set to the previous frontier timestamp.
@@ -76,21 +78,19 @@ func TestAggregatorFrontier(t *testing.T) {
7678
require.NoError(t, err)
7779

7880
// Forward spans representing post-restart backfill.
79-
testBackfillSpan(t, ctx, f, "a", "b", restartTS, initialHighwater)
80-
testBackfillSpan(t, ctx, f, "e", "f", restartTS, initialHighwater)
81-
testBackfillSpan(t, ctx, f, "b", "e", restartTS, restartTS)
81+
testBackfillSpan(t, f, "a", "b", restartTS, initialHighwater)
82+
testBackfillSpan(t, f, "e", "f", restartTS, initialHighwater)
83+
testBackfillSpan(t, f, "b", "e", restartTS, restartTS)
8284

8385
// Forward spans signalling an exit is required.
8486
exitTS := makeTS(40)
85-
testBoundarySpan(t, ctx, f, "a", "f", exitTS.Prev(), jobspb.ResolvedSpan_EXIT, exitTS.Prev())
87+
testBoundarySpan(t, f, "a", "f", exitTS.Prev(), jobspb.ResolvedSpan_EXIT, exitTS.Prev())
8688
}
8789

8890
func TestCoordinatorFrontier(t *testing.T) {
8991
defer leaktest.AfterTest(t)()
9092
defer log.Scope(t).Close(t)
9193

92-
ctx := context.Background()
93-
9494
// Create a fresh frontier with no progress.
9595
statementTime := makeTS(10)
9696
var initialHighwater hlc.Timestamp
@@ -103,35 +103,40 @@ func TestCoordinatorFrontier(t *testing.T) {
103103
require.Equal(t, initialHighwater, f.Frontier())
104104

105105
// Forward spans representing initial scan.
106-
testBackfillSpan(t, ctx, f, "a", "b", statementTime, initialHighwater)
107-
testBackfillSpan(t, ctx, f, "b", "f", statementTime, statementTime)
106+
testBackfillSpan(t, f, "a", "b", statementTime, initialHighwater)
107+
testBackfillSpan(t, f, "b", "f", statementTime, statementTime)
108108

109109
// Forward span signalling a backfill is required.
110110
backfillTS1 := makeTS(15)
111-
testBoundarySpan(t, ctx, f, "a", "b", backfillTS1.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
111+
testBoundarySpan(t, f, "a", "b", backfillTS1.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
112112

113113
// Forward span signalling another backfill is required (simulates multiple
114114
// aggregators progressing at different speeds).
115115
backfillTS2 := makeTS(20)
116-
testBackfillSpan(t, ctx, f, "a", "b", backfillTS1, statementTime)
117-
testBoundarySpan(t, ctx, f, "a", "b", backfillTS2.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
116+
testBackfillSpan(t, f, "a", "b", backfillTS1, statementTime)
117+
testBoundarySpan(t, f, "a", "b", backfillTS2.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
118118

119119
// Verify that spans signalling backfills at earlier timestamp are allowed.
120-
testBoundarySpan(t, ctx, f, "b", "c", backfillTS1.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
120+
testBoundarySpan(t, f, "b", "c", backfillTS1.Prev(), jobspb.ResolvedSpan_BACKFILL, statementTime)
121121

122122
// Verify that no other boundary spans at earlier timestamp are allowed.
123-
testIllegalBoundarySpan(t, ctx, f, "a", "f", backfillTS1.Prev(), jobspb.ResolvedSpan_RESTART)
124-
testIllegalBoundarySpan(t, ctx, f, "a", "f", backfillTS1.Prev(), jobspb.ResolvedSpan_EXIT)
123+
testIllegalBoundarySpan(t, f, "a", "f", backfillTS1.Prev(), jobspb.ResolvedSpan_RESTART)
124+
testIllegalBoundarySpan(t, f, "a", "f", backfillTS1.Prev(), jobspb.ResolvedSpan_EXIT)
125+
126+
// Verify that attempting to signal a boundary at the latest boundary time with a different
127+
// boundary type causes an assertion error.
128+
testIllegalBoundarySpan(t, f, "a", "f", backfillTS2.Prev(), jobspb.ResolvedSpan_RESTART)
129+
testIllegalBoundarySpan(t, f, "a", "f", backfillTS2.Prev(), jobspb.ResolvedSpan_EXIT)
125130

126131
// Forward spans completing first backfill and signalling and completing second backfill.
127-
testBackfillSpan(t, ctx, f, "b", "f", backfillTS1, backfillTS1)
128-
testBoundarySpan(t, ctx, f, "b", "f", backfillTS2.Prev(), jobspb.ResolvedSpan_BACKFILL, backfillTS2.Prev())
129-
testBackfillSpan(t, ctx, f, "a", "f", backfillTS2, backfillTS2)
132+
testBackfillSpan(t, f, "b", "f", backfillTS1, backfillTS1)
133+
testBoundarySpan(t, f, "b", "f", backfillTS2.Prev(), jobspb.ResolvedSpan_BACKFILL, backfillTS2.Prev())
134+
testBackfillSpan(t, f, "a", "f", backfillTS2, backfillTS2)
130135

131136
// Forward spans signalling a restart is required.
132137
restartTS := makeTS(30)
133-
testBoundarySpan(t, ctx, f, "a", "b", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, backfillTS2)
134-
testBoundarySpan(t, ctx, f, "b", "f", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, restartTS.Prev())
138+
testBoundarySpan(t, f, "a", "b", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, backfillTS2)
139+
testBoundarySpan(t, f, "b", "f", restartTS.Prev(), jobspb.ResolvedSpan_RESTART, restartTS.Prev())
135140

136141
// Simulate restarting by creating a new frontier with the initial highwater
137142
// set to the previous frontier timestamp.
@@ -144,13 +149,13 @@ func TestCoordinatorFrontier(t *testing.T) {
144149
require.NoError(t, err)
145150

146151
// Forward spans representing post-restart backfill.
147-
testBackfillSpan(t, ctx, f, "a", "b", restartTS, initialHighwater)
148-
testBackfillSpan(t, ctx, f, "e", "f", restartTS, initialHighwater)
149-
testBackfillSpan(t, ctx, f, "b", "e", restartTS, restartTS)
152+
testBackfillSpan(t, f, "a", "b", restartTS, initialHighwater)
153+
testBackfillSpan(t, f, "e", "f", restartTS, initialHighwater)
154+
testBackfillSpan(t, f, "b", "e", restartTS, restartTS)
150155

151156
// Forward spans signalling an exit is required.
152157
exitTS := makeTS(40)
153-
testBoundarySpan(t, ctx, f, "a", "f", exitTS.Prev(), jobspb.ResolvedSpan_EXIT, exitTS.Prev())
158+
testBoundarySpan(t, f, "a", "f", exitTS.Prev(), jobspb.ResolvedSpan_EXIT, exitTS.Prev())
154159
}
155160

156161
type frontier interface {
@@ -162,12 +167,7 @@ type frontier interface {
162167
}
163168

164169
func testBackfillSpan(
165-
t *testing.T,
166-
ctx context.Context,
167-
f frontier,
168-
start, end string,
169-
ts hlc.Timestamp,
170-
frontierAfterSpan hlc.Timestamp,
170+
t *testing.T, f frontier, start, end string, ts hlc.Timestamp, frontierAfterSpan hlc.Timestamp,
171171
) {
172172
backfillSpan := makeResolvedSpan(start, end, ts, jobspb.ResolvedSpan_NONE)
173173
require.True(t, f.InBackfill(backfillSpan))
@@ -178,7 +178,6 @@ func testBackfillSpan(
178178

179179
func testBoundarySpan(
180180
t *testing.T,
181-
ctx context.Context,
182181
f frontier,
183182
start, end string,
184183
boundaryTS hlc.Timestamp,
@@ -213,7 +212,6 @@ func testBoundarySpan(
213212

214213
func testIllegalBoundarySpan(
215214
t *testing.T,
216-
ctx context.Context,
217215
f frontier,
218216
start, end string,
219217
boundaryTS hlc.Timestamp,

0 commit comments

Comments
 (0)