Skip to content

Commit 90175f5

Browse files
committed
changefeedccl: remove extra characters from topic names
Previously, when creating topic names, we would include quotes around identifiers that included capital letters resulting in some unexpected topic names. We removed this behavior from when we render table names as strings. This change is behind a flag: feature.changefeed.bare_table_names. That flag is on by default. Epic: none Fixes: #148181 Release note (sql change): Fix a bug where extra quotes or escaped quote characters would be added to topic names in changefeeds. Can be turned off by setting feature.changefeed.bare_table_names to false.
1 parent 44853b2 commit 90175f5

File tree

4 files changed

+92
-1
lines changed

4 files changed

+92
-1
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,6 +1817,9 @@ func getQualifiedTableName(
18171817
if err != nil {
18181818
return "", err
18191819
}
1820+
if changefeedbase.UseBareTableNames.Get(&execCfg.Settings.SV) {
1821+
return tree.AsStringWithFlags(&tbName, tree.FmtBareIdentifiers), nil
1822+
}
18201823
return tbName.String(), nil
18211824
}
18221825

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,78 @@ func TestChangefeedBasicConfluentKafka(t *testing.T) {
788788
cdcTest(t, testFn, feedTestForceSink("kafka"))
789789
}
790790

791+
func TestChangefeedQuotedTableNameTopicName(t *testing.T) {
792+
defer leaktest.AfterTest(t)()
793+
defer log.Scope(t).Close(t)
794+
795+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
796+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
797+
sqlDB.Exec(t, `CREATE TABLE "MyTable" (a INT PRIMARY KEY, b STRING)`)
798+
sqlDB.Exec(t, `INSERT INTO "MyTable" VALUES (0, 'initial')`)
799+
sqlDB.Exec(t, `UPSERT INTO "MyTable" VALUES (0, 'updated')`)
800+
801+
foo := feed(t, f,
802+
`CREATE CHANGEFEED FOR d.public."MyTable" WITH diff, full_table_name`)
803+
defer closeFeed(t, foo)
804+
805+
// The topic name should be d.public.MyTable and not d.public._u0022_MyTable_u0022_
806+
// or d.public."MyTable".
807+
assertPayloads(t, foo, []string{
808+
`d.public.MyTable: [0]->{"after": {"a": 0, "b": "updated"}, "before": null}`,
809+
})
810+
}
811+
812+
cdcTest(t, testFn)
813+
}
814+
815+
// TestChangefeedQuotedIdentifiersTopicName is similar to
816+
// TestChangefeedQuotedTableNameTopicName, but for quoted identifiers
817+
// in the SELECT clause instead of the table name.
818+
func TestChangefeedQuotedIdentifiersTopicName(t *testing.T) {
819+
defer leaktest.AfterTest(t)()
820+
defer log.Scope(t).Close(t)
821+
822+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
823+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
824+
825+
sqlDB.Exec(t, `CREATE TABLE mytable (
826+
id INT PRIMARY KEY,
827+
"SomeField" JSONB,
828+
"AnotherField" JSONB
829+
)`)
830+
831+
sqlDB.Exec(t, `INSERT INTO mytable VALUES (
832+
1,
833+
'{"PropA": "value1", "prop_b": "value2"}'::jsonb,
834+
'{"PropC": "value3", "prop_d": "value4"}'::jsonb
835+
)`)
836+
837+
sqlDB.Exec(t, `INSERT INTO mytable VALUES (
838+
2,
839+
'{"PropA": "value5", "prop_b": "value6"}'::jsonb,
840+
'{"PropC": "value7", "prop_d": "value8"}'::jsonb
841+
)`)
842+
843+
foo := feed(t, f, `CREATE CHANGEFEED WITH diff, full_table_name, on_error=pause, envelope=wrapped AS SELECT
844+
id,
845+
"SomeField"->>'PropA' AS "PropA",
846+
"SomeField"->>'prop_b' AS "PropB",
847+
"AnotherField"->>'PropC' AS "PropC",
848+
"AnotherField"->>'prop_d' AS "PropD"
849+
FROM public.mytable`)
850+
defer closeFeed(t, foo)
851+
852+
// The topic should show up as d.public.mytable and not as
853+
// d.public.u0022_mytable_u0022 or d.public."MyTable".
854+
assertPayloads(t, foo, []string{
855+
`d.public.mytable: [1]->{"after": {"PropA": "value1", "PropB": "value2", "PropC": "value3", "PropD": "value4", "id": 1}, "before": null}`,
856+
`d.public.mytable: [2]->{"after": {"PropA": "value5", "PropB": "value6", "PropC": "value7", "PropD": "value8", "id": 2}, "before": null}`,
857+
})
858+
}
859+
860+
cdcTest(t, testFn)
861+
}
862+
791863
func TestChangefeedDiff(t *testing.T) {
792864
defer leaktest.AfterTest(t)()
793865
defer log.Scope(t).Close(t)
@@ -972,7 +1044,7 @@ func TestChangefeedFullTableName(t *testing.T) {
9721044
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)
9731045

9741046
t.Run(`envelope=row`, func(t *testing.T) {
975-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH full_table_name`, optOutOfMetamorphicEnrichedEnvelope{reason: "broken for webhook; see #145927"})
1047+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH full_table_name`)
9761048
defer closeFeed(t, foo)
9771049
assertPayloads(t, foo, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`})
9781050
})

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,11 @@ var KafkaV2ErrorDetailsEnabled = settings.RegisterBoolSetting(
380380
true,
381381
settings.WithPublic,
382382
)
383+
384+
// UseBareTableNames is used to enable and disable the use of bare table names
385+
// in changefeed topics.
386+
var UseBareTableNames = settings.RegisterBoolSetting(
387+
settings.ApplicationLevel,
388+
"changefeed.bare_table_names.enabled",
389+
"set to true to use bare table names in changefeed topics, false to use quoted table names; default is true",
390+
true)

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,14 @@ func maybeForceEnrichedEnvelope(
10821082
return create, args, false, nil
10831083
}
10841084
}
1085+
if strings.EqualFold(opt.Key.String(), "full_table_name") {
1086+
// TODO(#145927): full_table_name is not supported in enriched envelopes.
1087+
switch f.(type) {
1088+
case *webhookFeedFactory:
1089+
t.Logf("did not force enriched envelope for %s because full_table_name was specified for webhook sink", create)
1090+
return create, args, false, nil
1091+
}
1092+
}
10851093
if strings.EqualFold(opt.Key.String(), "envelope") {
10861094
envelopeKV = &opt
10871095
}

0 commit comments

Comments
 (0)