Skip to content

Commit 9346663

Browse files
craig[bot]aerfrei
andcommitted
Merge #158146
158146: changefeedccl: respect full_table_name for db-level feeds r=log-head,asg0451 a=aerfrei Previously, DB-level feeds would emit messages with plain table neames, even when full_table_name was specified. Now, we respect this option. Epic: none Fixes: #158145 Release note: None Co-authored-by: Aerin Freilich <[email protected]>
2 parents eb2465b + 393d45b commit 9346663

File tree

2 files changed

+45
-5
lines changed

2 files changed

+45
-5
lines changed

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ func AllTargets(
7979
if len(cd.TargetSpecifications) > 1 {
8080
return changefeedbase.Targets{}, errors.AssertionFailedf("database-level changefeed is not supported with multiple targets")
8181
}
82-
targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp)
82+
_, useFullTableName := cd.Opts[changefeedbase.OptFullTableName]
83+
targets, err = getTargetsFromDatabaseSpec(
84+
ctx, ts, execCfg, timestamp, useFullTableName,
85+
)
8386
if err != nil {
8487
return changefeedbase.Targets{}, err
8588
}
@@ -117,6 +120,7 @@ func getTargetsFromDatabaseSpec(
117120
ts jobspb.ChangefeedTargetSpecification,
118121
execCfg *sql.ExecutorConfig,
119122
timestamp hlc.Timestamp,
123+
useFullTableName bool,
120124
) (targets changefeedbase.Targets, err error) {
121125
err = sql.DescsTxn(ctx, execCfg, func(
122126
ctx context.Context, txn isql.Txn, descs *descs.Collection,
@@ -172,15 +176,21 @@ func getTargetsFromDatabaseSpec(
172176
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
173177
}
174178

179+
tableName := func() string {
180+
if useFullTableName {
181+
return fullyQualifiedTableName
182+
}
183+
return desc.GetName()
184+
}()
175185
targets.Add(changefeedbase.Target{
176186
Type: tableType,
177187
DescID: desc.GetID(),
178-
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
188+
StatementTimeName: changefeedbase.StatementTimeName(tableName),
179189
})
180190
}
181191
case tree.IncludeFilter:
182-
for name := range ts.FilterList.Tables {
183-
tn, err := parser.ParseTableName(name)
192+
for fullyQualifiedTableName := range ts.FilterList.Tables {
193+
tn, err := parser.ParseTableName(fullyQualifiedTableName)
184194
if err != nil {
185195
return err
186196
}
@@ -215,10 +225,16 @@ func getTargetsFromDatabaseSpec(
215225
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
216226
}
217227

228+
tableName := func() string {
229+
if useFullTableName {
230+
return fullyQualifiedTableName
231+
}
232+
return desc.GetName()
233+
}()
218234
targets.Add(changefeedbase.Target{
219235
Type: tableType,
220236
DescID: tableID,
221-
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
237+
StatementTimeName: changefeedbase.StatementTimeName(tableName),
222238
})
223239
}
224240
default:

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,30 @@ func TestChangefeedFullTableName(t *testing.T) {
13741374
})
13751375
}
13761376

1377+
func TestDatabaseLevelChangefeedWithFullTableName(t *testing.T) {
1378+
defer leaktest.AfterTest(t)()
1379+
defer log.Scope(t).Close(t)
1380+
1381+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1382+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1383+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
1384+
1385+
normal := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH full_table_name`)
1386+
defer closeFeed(t, normal)
1387+
include := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo WITH full_table_name`)
1388+
defer closeFeed(t, include)
1389+
exclude := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bar WITH full_table_name`)
1390+
defer closeFeed(t, exclude)
1391+
1392+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)
1393+
assertPayloads(t, normal, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`})
1394+
assertPayloads(t, include, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`})
1395+
assertPayloads(t, exclude, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`})
1396+
}
1397+
1398+
cdcTest(t, testFn, feedTestEnterpriseSinks)
1399+
}
1400+
13771401
func TestChangefeedMultiTable(t *testing.T) {
13781402
defer leaktest.AfterTest(t)()
13791403
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)