Skip to content

Commit 98e52c1

Browse files
committed
changefeedccl: warn user when creating changefeed
with an older cursor Older but valid cursors can cause changefeeds to lag behind a lot. To ensure that customers are not accidentally using a cursor that's too old, we now provide a warning if the provided cursor is older than 5 hours. This warning would not show if the changefeed option is initial_scan='only' as the changefeed would never lag behind with this option. The warning looks like: NOTICE: the provided cursor is 7 hours old; older cursors can result in increased changefeed latency Resolves #124290 Epic: CRDB-32401 Release note (general): Provide a warning when creating an enterprise changefeed with a cursor older than 5 hours
1 parent 7984c75 commit 98e52c1

File tree

3 files changed

+127
-14
lines changed

3 files changed

+127
-14
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,49 @@ func changefeedTypeCheck(
133133
return true, withSinkHeader, nil
134134
}
135135

136+
func maybeShowCursorAgeWarning(
137+
ctx context.Context, p sql.PlanHookState, opts changefeedbase.StatementOptions,
138+
) error {
139+
st, err := opts.GetInitialScanType()
140+
if err != nil {
141+
return err
142+
}
143+
144+
if !opts.HasStartCursor() || st == changefeedbase.OnlyInitialScan {
145+
return nil
146+
}
147+
statementTS := p.ExtendedEvalContext().GetStmtTimestamp().UnixNano()
148+
cursorTS, err := evalCursor(ctx, p, hlc.Timestamp{WallTime: statementTS}, opts.GetCursor())
149+
if err != nil {
150+
return err
151+
}
152+
153+
warningAge := int64(5 * time.Hour)
154+
cursorAge := func() int64 {
155+
knobs, _ := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs)
156+
if knobs != nil && knobs.OverrideCursorAge != nil {
157+
return knobs.OverrideCursorAge()
158+
}
159+
return statementTS - cursorTS.WallTime
160+
}()
161+
162+
if cursorAge > warningAge {
163+
err = p.SendClientNotice(ctx,
164+
pgnotice.Newf(
165+
`the provided cursor is %d hours old; `+
166+
`older cursors can result in increased changefeed latency`,
167+
cursorAge/int64(time.Hour),
168+
),
169+
true,
170+
)
171+
if err != nil {
172+
return err
173+
}
174+
}
175+
176+
return nil
177+
}
178+
136179
// changefeedPlanHook implements sql.planHookFn.
137180
func changefeedPlanHook(
138181
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
@@ -229,6 +272,9 @@ func changefeedPlanHook(
229272

230273
telemetry.Count(`changefeed.create.core`)
231274
logChangefeedCreateTelemetry(ctx, jr, changefeedStmt.Select != nil)
275+
if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil {
276+
return err
277+
}
232278

233279
err := coreChangefeed(ctx, p, details, description, progress, resultsCh)
234280
// TODO(yevgeniy): This seems wrong -- core changefeeds always terminate
@@ -314,6 +360,10 @@ func changefeedPlanHook(
314360
return err
315361
}
316362

363+
if err := maybeShowCursorAgeWarning(ctx, p, opts); err != nil {
364+
return err
365+
}
366+
317367
logChangefeedCreateTelemetry(ctx, jr, changefeedStmt.Select != nil)
318368

319369
select {
@@ -388,6 +438,22 @@ func coreChangefeed(
388438
}
389439
}
390440

441+
func evalCursor(
442+
ctx context.Context, p sql.PlanHookState, statementTime hlc.Timestamp, timeString string,
443+
) (hlc.Timestamp, error) {
444+
if knobs, ok := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok {
445+
if knobs != nil && knobs.OverrideCursor != nil {
446+
timeString = knobs.OverrideCursor(&statementTime)
447+
}
448+
}
449+
asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(timeString)}
450+
asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause)
451+
if err != nil {
452+
return hlc.Timestamp{}, err
453+
}
454+
return asOf.Timestamp, nil
455+
}
456+
391457
func createChangefeedJobRecord(
392458
ctx context.Context,
393459
p sql.PlanHookState,
@@ -408,22 +474,10 @@ func createChangefeedJobRecord(
408474
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
409475
}
410476
var initialHighWater hlc.Timestamp
411-
evalTimestamp := func(s string) (hlc.Timestamp, error) {
412-
if knobs, ok := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok {
413-
if knobs != nil && knobs.OverrideCursor != nil {
414-
s = knobs.OverrideCursor(&statementTime)
415-
}
416-
}
417-
asOfClause := tree.AsOfClause{Expr: tree.NewStrVal(s)}
418-
asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause)
419-
if err != nil {
420-
return hlc.Timestamp{}, err
421-
}
422-
return asOf.Timestamp, nil
423-
}
477+
424478
if opts.HasStartCursor() {
425479
var err error
426-
initialHighWater, err = evalTimestamp(opts.GetCursor())
480+
initialHighWater, err = evalCursor(ctx, p, statementTime, opts.GetCursor())
427481
if err != nil {
428482
return nil, err
429483
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5864,6 +5864,62 @@ func TestChangefeedOutdatedCursor(t *testing.T) {
58645864
cdcTestWithSystem(t, testFn, feedTestNoTenants)
58655865
}
58665866

5867+
// TestChangefeedCursorWarning ensures that we show a warning if
5868+
// any of the tables we're creating a changefeed is past
5869+
// the warning threshold.
5870+
func TestChangefeedCursorAgeWarning(t *testing.T) {
5871+
defer leaktest.AfterTest(t)()
5872+
defer log.Scope(t).Close(t)
5873+
5874+
var cursorAges = []time.Duration{
5875+
time.Hour,
5876+
6 * time.Hour,
5877+
}
5878+
5879+
testutils.RunValues(t, "cursor age", cursorAges, func(t *testing.T, cursorAge time.Duration) {
5880+
s, stopServer := makeServer(t)
5881+
defer stopServer()
5882+
knobs := s.TestingKnobs.
5883+
DistSQL.(*execinfra.TestingKnobs).
5884+
Changefeed.(*TestingKnobs)
5885+
knobs.OverrideCursorAge = func() int64 {
5886+
return int64(cursorAge)
5887+
}
5888+
5889+
warning := fmt.Sprintf(
5890+
"the provided cursor is %d hours old; older cursors can result in increased changefeed latency",
5891+
int64(cursorAge/time.Hour))
5892+
noWarning := "(no notice)"
5893+
5894+
expectedWarning := func(initial_scan string) string {
5895+
if cursorAge == time.Hour || initial_scan == "only" {
5896+
return noWarning
5897+
}
5898+
return warning
5899+
}
5900+
5901+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
5902+
sqlDB.Exec(t, `CREATE TABLE f (a INT PRIMARY KEY)`)
5903+
sqlDB.Exec(t, `INSERT INTO f VALUES (1)`)
5904+
timeNow := strings.Split(s.Server.Clock().Now().AsOfSystemTime(), ".")[0]
5905+
5906+
expectNotice(t, s.Server,
5907+
fmt.Sprintf(
5908+
`CREATE CHANGEFEED FOR TABLE d.f INTO 'null://' with cursor = '%s', initial_scan='only'`,
5909+
timeNow), expectedWarning("only"))
5910+
5911+
expectNotice(t, s.Server,
5912+
fmt.Sprintf(
5913+
`CREATE CHANGEFEED FOR TABLE d.f INTO 'null://' with cursor = '%s', initial_scan='yes'`,
5914+
timeNow), expectedWarning("yes"))
5915+
5916+
expectNotice(t, s.Server,
5917+
fmt.Sprintf(
5918+
`CREATE CHANGEFEED FOR TABLE d.f INTO 'null://' with cursor = '%s', initial_scan='no'`,
5919+
timeNow), expectedWarning("no"))
5920+
})
5921+
}
5922+
58675923
// TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
58685924
// where the feed has fallen behind the GC TTL of the table's schema.
58695925
func TestChangefeedSchemaTTL(t *testing.T) {

pkg/ccl/changefeedccl/testing_knobs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ type TestingKnobs struct {
108108

109109
// WrapTelemetryLogger is used to wrap the periodic telemetry logger in tests.
110110
WrapTelemetryLogger func(logger telemetryLogger) telemetryLogger
111+
112+
// OverrideCursorAge is used to change how old a cursor is. Returns time in nanoseconds.
113+
OverrideCursorAge func() int64
111114
}
112115

113116
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

0 commit comments

Comments
 (0)