Skip to content

Commit 2d8fc98

Browse files
committed
changefeedccl: emit warning when resolved or min_checkpoint_frequency is set too low
This change adds client-side notices to `CREATE CHANGEFEED` and `ALTER CHANGEFEED` statements when the `resolved` or `min_checkpoint_frequency` options are set below a recommended threshold (e.g., 500ms). These warnings aim to guide users toward more balanced configurations. Setting these options too low can significantly increase CPU usage due to more frequent checkpointing and resolved timestamp emissions, introducing performance trade-offs. Epic: CRDB-52074 Fixes #149238 Release note (general change): A warning is now emitted when creating or altering a changefeed with `resolved` or `min_checkpoint_frequency` set below 500ms. This helps users understand the tradeoff between message latency and cluster CPU usage.
1 parent 9b7bfc2 commit 2d8fc98

File tree

2 files changed

+77
-3
lines changed

2 files changed

+77
-3
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,20 @@ func createChangefeedJobRecord(
833833
"less frequently", resolved, resolvedStr, freqStr, freq))
834834
}
835835

836+
const minRecommendedFrequency = 500 * time.Millisecond
837+
838+
if emit && resolvedOpt != nil && *resolvedOpt < minRecommendedFrequency {
839+
p.BufferClientNotice(ctx, pgnotice.Newf(
840+
"the 'resolved' timestamp interval (%s) is very low; consider increasing it to at least %s",
841+
resolvedOpt, minRecommendedFrequency))
842+
}
843+
844+
if freqOpt != nil && *freqOpt < minRecommendedFrequency {
845+
p.BufferClientNotice(ctx, pgnotice.Newf(
846+
"the 'min_checkpoint_frequency' timestamp interval (%s) is very low; consider increasing it to at least %s",
847+
freqOpt, minRecommendedFrequency))
848+
}
849+
836850
ptsExpiration, err := opts.GetPTSExpiration()
837851
if err != nil {
838852
return nil, err

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5074,6 +5074,10 @@ func TestChangefeedResolvedNotice(t *testing.T) {
50745074
defer cleanup()
50755075
s := cluster.Server(1)
50765076

5077+
// Set the default min_checkpoint_frequency to 30 seconds for this test
5078+
restoreDefault := changefeedbase.TestingSetDefaultMinCheckpointFrequency(30 * time.Second)
5079+
defer restoreDefault()
5080+
50775081
pgURL, cleanup := pgurlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
50785082
defer cleanup()
50795083
pgBase, err := pq.NewConnector(pgURL.String())
@@ -5103,10 +5107,9 @@ func TestChangefeedResolvedNotice(t *testing.T) {
51035107
t.Run("resolved<min_checkpoint_frequency default", func(t *testing.T) {
51045108
actual = "(no notice)"
51055109
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5106-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='20ms'`)
5110+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='5s'`)
51075111
defer closeFeed(t, testFeed)
5108-
// Note: default min_checkpoint_frequency is set to 100ms in startTestCluster.
5109-
require.Equal(t, `resolved (20ms) messages will not be emitted more frequently than the default min_checkpoint_frequency (100ms), but may be emitted less frequently`, actual)
5112+
require.Equal(t, `resolved (5s) messages will not be emitted more frequently than the default min_checkpoint_frequency (30s), but may be emitted less frequently`, actual)
51105113
})
51115114
t.Run("resolved=min_checkpoint_frequency", func(t *testing.T) {
51125115
actual = "(no notice)"
@@ -5131,6 +5134,63 @@ func TestChangefeedResolvedNotice(t *testing.T) {
51315134
})
51325135
}
51335136

5137+
func TestChangefeedLowFrequencyNotices(t *testing.T) {
5138+
defer leaktest.AfterTest(t)()
5139+
defer log.Scope(t).Close(t)
5140+
5141+
cluster, _, cleanup := startTestCluster(t)
5142+
defer cleanup()
5143+
s := cluster.Server(1)
5144+
5145+
pgURL, cleanup := pgurlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
5146+
defer cleanup()
5147+
pgBase, err := pq.NewConnector(pgURL.String())
5148+
if err != nil {
5149+
t.Fatal(err)
5150+
}
5151+
var actual string
5152+
connector := pq.ConnectorWithNoticeHandler(pgBase, func(n *pq.Error) {
5153+
actual = n.Message
5154+
})
5155+
5156+
dbWithHandler := gosql.OpenDB(connector)
5157+
defer dbWithHandler.Close()
5158+
5159+
sqlDB := sqlutils.MakeSQLRunner(dbWithHandler)
5160+
5161+
sqlDB.Exec(t, `CREATE TABLE ☃ (i INT PRIMARY KEY)`)
5162+
sqlDB.Exec(t, `INSERT INTO ☃ VALUES (0)`)
5163+
5164+
t.Run("no options specified", func(t *testing.T) {
5165+
actual = "(no notice)"
5166+
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5167+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
5168+
defer closeFeed(t, testFeed)
5169+
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
5170+
})
5171+
t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) {
5172+
actual = "(no notice)"
5173+
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5174+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`)
5175+
defer closeFeed(t, testFeed)
5176+
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
5177+
})
5178+
t.Run("low resolved timestamp", func(t *testing.T) {
5179+
actual = "(no notice)"
5180+
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5181+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`)
5182+
defer closeFeed(t, testFeed)
5183+
require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
5184+
})
5185+
t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) {
5186+
actual = "(no notice)"
5187+
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5188+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`)
5189+
defer closeFeed(t, testFeed)
5190+
require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
5191+
})
5192+
}
5193+
51345194
func TestChangefeedOutputTopics(t *testing.T) {
51355195
defer leaktest.AfterTest(t)()
51365196
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)