Skip to content

Commit 33813e6

Browse files
committed
cdc: include crdb_internal_table_id in changefeed source metadata
Previously, the metadata env of changefeed source objects did not include the CockroachDB internal table ID. Downstream consumers need this field to uniquely identify a table even if its name changes. This change adds the crdb_internal_table_id field to the source object within the metadata env, so that tables are identified consistently across renames and downstream integrations can rely on a stable identifier.

Fixes #143371
Epic: CRDB-48789

Release note (general change): Changefeed source metadata now includes the new crdb_internal_table_id field, enabling downstream consumers to uniquely identify tables even if table names change.
1 parent 6c161fa commit 33813e6

File tree

2 files changed

+149
-32
lines changed

2 files changed

+149
-32
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 134 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4437,6 +4437,10 @@ func TestChangefeedEnrichedSourceWithDataAvro(t *testing.T) {
44374437

44384438
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
44394439
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4440+
4441+
var tableID int
4442+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
4443+
44404444
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=avro`
44414445
if withMVCCTS {
44424446
stmt += ", mvcc_timestamp"
@@ -4474,12 +4478,13 @@ func TestChangefeedEnrichedSourceWithDataAvro(t *testing.T) {
44744478
var assertion string
44754479
assertionMap := map[string]any{
44764480
"source": map[string]any{
4477-
"changefeed_sink": map[string]any{"string": sink},
4478-
"cluster_id": map[string]any{"string": clusterID},
4479-
"cluster_name": map[string]any{"string": clusterName},
4480-
"database_name": map[string]any{"string": "d"},
4481-
"db_version": map[string]any{"string": dbVersion},
4482-
"job_id": map[string]any{"string": jobIDStr},
4481+
"changefeed_sink": map[string]any{"string": sink},
4482+
"cluster_id": map[string]any{"string": clusterID},
4483+
"cluster_name": map[string]any{"string": clusterName},
4484+
"crdb_internal_table_id": map[string]any{"int": tableID},
4485+
"database_name": map[string]any{"string": "d"},
4486+
"db_version": map[string]any{"string": dbVersion},
4487+
"job_id": map[string]any{"string": jobIDStr},
44834488
// Note that the field is still present in the avro schema, so it appears here as nil.
44844489
"mvcc_timestamp": nil,
44854490
"node_id": map[string]any{"string": nodeID},
@@ -4555,6 +4560,7 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
45554560

45564561
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
45574562
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4563+
45584564
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=json`
45594565
if withMVCCTS {
45604566
stmt += ", mvcc_timestamp"
@@ -4567,11 +4573,14 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
45674573

45684574
var jobID int64
45694575
var nodeName string
4576+
var tableID int
4577+
45704578
var sourceAssertion func(actualSource map[string]any)
45714579
if ef, ok := testFeed.(cdctest.EnterpriseTestFeed); ok {
45724580
jobID = int64(ef.JobID())
45734581
}
45744582
sqlDB.QueryRow(t, `SELECT value FROM crdb_internal.node_runtime_info where component = 'DB' and field = 'Host'`).Scan(&nodeName)
4583+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
45754584

45764585
sourceAssertion = func(actualSource map[string]any) {
45774586
nodeID := actualSource["node_id"]
@@ -4593,19 +4602,20 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
45934602

45944603
var assertion string
45954604
assertionMap := map[string]any{
4596-
"cluster_id": clusterID,
4597-
"cluster_name": clusterName,
4598-
"db_version": dbVersion,
4599-
"job_id": jobIDStr,
4600-
"node_id": nodeID,
4601-
"node_name": nodeName,
4602-
"origin": "cockroachdb",
4603-
"changefeed_sink": sink,
4604-
"source_node_locality": sourceNodeLocality,
4605-
"database_name": "d",
4606-
"schema_name": "public",
4607-
"table_name": "foo",
4608-
"primary_keys": []any{"i"},
4605+
"cluster_id": clusterID,
4606+
"cluster_name": clusterName,
4607+
"crdb_internal_table_id": tableID,
4608+
"db_version": dbVersion,
4609+
"job_id": jobIDStr,
4610+
"node_id": nodeID,
4611+
"node_name": nodeName,
4612+
"origin": "cockroachdb",
4613+
"changefeed_sink": sink,
4614+
"source_node_locality": sourceNodeLocality,
4615+
"database_name": "d",
4616+
"schema_name": "public",
4617+
"table_name": "foo",
4618+
"primary_keys": []any{"i"},
46094619
}
46104620
if withMVCCTS {
46114621
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))
@@ -4666,6 +4676,10 @@ func TestChangefeedEnrichedSourceWithDataJSONWebhook(t *testing.T) {
46664676

46674677
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
46684678
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4679+
4680+
var tableID int
4681+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
4682+
46694683
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=json`
46704684
if withMVCCTS {
46714685
stmt += ", mvcc_timestamp"
@@ -4699,19 +4713,20 @@ func TestChangefeedEnrichedSourceWithDataJSONWebhook(t *testing.T) {
46994713

47004714
var assertion string
47014715
assertionMap := map[string]any{
4702-
"cluster_id": clusterID,
4703-
"cluster_name": clusterName,
4704-
"db_version": dbVersion,
4705-
"job_id": jobIDStr,
4706-
"node_id": nodeID,
4707-
"node_name": nodeName,
4708-
"origin": "cockroachdb",
4709-
"changefeed_sink": sink,
4710-
"source_node_locality": sourceNodeLocality,
4711-
"database_name": "d",
4712-
"schema_name": "public",
4713-
"table_name": "foo",
4714-
"primary_keys": []any{"i"},
4716+
"cluster_id": clusterID,
4717+
"cluster_name": clusterName,
4718+
"crdb_internal_table_id": tableID,
4719+
"db_version": dbVersion,
4720+
"job_id": jobIDStr,
4721+
"node_id": nodeID,
4722+
"node_name": nodeName,
4723+
"origin": "cockroachdb",
4724+
"changefeed_sink": sink,
4725+
"source_node_locality": sourceNodeLocality,
4726+
"database_name": "d",
4727+
"schema_name": "public",
4728+
"table_name": "foo",
4729+
"primary_keys": []any{"i"},
47154730
}
47164731
if withMVCCTS {
47174732
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))
@@ -4783,6 +4798,9 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
47834798
for _, testCase := range cases {
47844799
t.Run(testCase.name, func(t *testing.T) {
47854800
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4801+
4802+
var originalTableID int
4803+
47864804
sqlDB := sqlutils.MakeSQLRunner(s.DB)
47874805

47884806
sqlDB.Exec(t, `CREATE TABLE foo (a INT, b STRING, c INT, PRIMARY KEY (a, b))`)
@@ -4799,11 +4817,21 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
47994817
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
48004818
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
48014819
require.Equal(t, map[string]any{"array": []any{"a", "b"}}, actualSourceValue["primary_keys"])
4820+
4821+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4822+
idInt, err := num.Int64()
4823+
require.NoError(t, err)
4824+
originalTableID = int(idInt)
48024825
} else {
48034826
require.Equal(t, "foo", actualSource["table_name"])
48044827
require.Equal(t, "public", actualSource["schema_name"])
48054828
require.Equal(t, "d", actualSource["database_name"])
48064829
require.Equal(t, []any{"a", "b"}, actualSource["primary_keys"])
4830+
4831+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4832+
idInt, err := num.Int64()
4833+
require.NoError(t, err)
4834+
originalTableID = int(idInt)
48074835
}
48084836
}
48094837
assertPayloadsEnriched(t, foo, []string{testCase.expectedRow}, sourceAssertion)
@@ -4818,11 +4846,21 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
48184846
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
48194847
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
48204848
require.Equal(t, map[string]any{"array": []any{"a", "b"}}, actualSourceValue["primary_keys"])
4849+
4850+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4851+
idInt, err := num.Int64()
4852+
require.NoError(t, err)
4853+
require.Equal(t, originalTableID, int(idInt))
48214854
} else {
48224855
require.Equal(t, "foo", actualSource["table_name"])
48234856
require.Equal(t, "public", actualSource["schema_name"])
48244857
require.Equal(t, "d", actualSource["database_name"])
48254858
require.Equal(t, []any{"a", "b"}, actualSource["primary_keys"])
4859+
4860+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4861+
idInt, err := num.Int64()
4862+
require.NoError(t, err)
4863+
require.Equal(t, originalTableID, int(idInt))
48264864
}
48274865
}
48284866
assertPayloadsEnriched(t, foo, testCase.expectedRowsAfterSchemaChange, sourceAssertionAfterSchemaChange)
@@ -4860,6 +4898,9 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
48604898
for _, testCase := range cases {
48614899
t.Run(testCase.name, func(t *testing.T) {
48624900
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4901+
4902+
var originalTableID int
4903+
48634904
sqlDB := sqlutils.MakeSQLRunner(s.DB)
48644905

48654906
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -4876,11 +4917,21 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
48764917
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
48774918
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
48784919
require.Equal(t, map[string]any{"array": []any{"a"}}, actualSourceValue["primary_keys"])
4920+
4921+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4922+
idInt, err := num.Int64()
4923+
require.NoError(t, err)
4924+
originalTableID = int(idInt)
48794925
} else {
48804926
require.Equal(t, "foo", actualSource["table_name"])
48814927
require.Equal(t, "public", actualSource["schema_name"])
48824928
require.Equal(t, "d", actualSource["database_name"])
48834929
require.Equal(t, []any{"a"}, actualSource["primary_keys"])
4930+
4931+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4932+
idInt, err := num.Int64()
4933+
require.NoError(t, err)
4934+
originalTableID = int(idInt)
48844935
}
48854936
}
48864937
if testCase.format == "json" {
@@ -4900,11 +4951,21 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
49004951
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
49014952
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
49024953
require.Equal(t, map[string]any{"array": []any{"b"}}, actualSourceValue["primary_keys"])
4954+
4955+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4956+
idInt, err := num.Int64()
4957+
require.NoError(t, err)
4958+
require.Equal(t, originalTableID, int(idInt))
49034959
} else {
49044960
require.Equal(t, "foo", actualSource["table_name"])
49054961
require.Equal(t, "public", actualSource["schema_name"])
49064962
require.Equal(t, "d", actualSource["database_name"])
49074963
require.Equal(t, []any{"b"}, actualSource["primary_keys"])
4964+
4965+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4966+
idInt, err := num.Int64()
4967+
require.NoError(t, err)
4968+
require.Equal(t, originalTableID, int(idInt))
49084969
}
49094970
}
49104971
assertPayloadsEnriched(t, foo, []string{testCase.expectedRowAfter}, sourceAssertionAfterPKChange)
@@ -4915,6 +4976,47 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
49154976
}
49164977
}
49174978

4979+
func TestChangefeedEnrichedTableIDStableOnRename(t *testing.T) {
4980+
defer leaktest.AfterTest(t)()
4981+
defer log.Scope(t).Close(t)
4982+
4983+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4984+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
4985+
4986+
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
4987+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
4988+
4989+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source'`)
4990+
defer closeFeed(t, foo)
4991+
4992+
var originalTableID int
4993+
4994+
// Capture first row (before rename)
4995+
sourceAssertionBefore := func(actualSource map[string]any) {
4996+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4997+
id, err := num.Int64()
4998+
require.NoError(t, err)
4999+
originalTableID = int(id)
5000+
}
5001+
assertPayloadsEnriched(t, foo, []string{`foo: {"i": 1}->{"after": {"i": 1}, "op": "c"}`}, sourceAssertionBefore)
5002+
5003+
// Rename table
5004+
sqlDB.Exec(t, `ALTER TABLE foo RENAME TO bar`)
5005+
sqlDB.Exec(t, `INSERT INTO bar VALUES (2)`)
5006+
5007+
// Capture second row (after rename)
5008+
sourceAssertionAfter := func(actualSource map[string]any) {
5009+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
5010+
id, err := num.Int64()
5011+
require.NoError(t, err)
5012+
require.Equal(t, originalTableID, int(id))
5013+
}
5014+
assertPayloadsEnriched(t, foo, []string{`foo: {"i": 2}->{"after": {"i": 2}, "op": "c"}`}, sourceAssertionAfter)
5015+
}
5016+
5017+
cdcTest(t, testFn, feedTestForceSink("kafka"))
5018+
}
5019+
49185020
func TestChangefeedExpressionUsesSerializedSessionData(t *testing.T) {
49195021
defer leaktest.AfterTest(t)()
49205022
defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/enriched_source_provider.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func newEnrichedSourceProvider(
162162
addNonFixedJSONfield(fieldNameSchemaName)
163163
addNonFixedJSONfield(fieldNameTableName)
164164
addNonFixedJSONfield(fieldNamePrimaryKeys)
165+
addNonFixedJSONfield(fieldNameTableID)
165166

166167
if opts.MVCCTimestamps {
167168
addNonFixedJSONfield(fieldNameMVCCTimestamp)
@@ -208,6 +209,7 @@ func (p *enrichedSourceProvider) GetJSON(
208209
p.jsonNonFixedData[fieldNameSchemaName] = json.FromString(tableInfo.schemaName)
209210
p.jsonNonFixedData[fieldNameTableName] = json.FromString(tableInfo.tableName)
210211
p.jsonNonFixedData[fieldNamePrimaryKeys] = tableInfo.primaryKeysJSON
212+
p.jsonNonFixedData[fieldNameTableID] = json.FromInt(int(metadata.TableID))
211213

212214
if p.opts.mvccTimestamp {
213215
p.jsonNonFixedData[fieldNameMVCCTimestamp] = json.FromString(evCtx.mvcc.AsOfSystemTime())
@@ -247,6 +249,7 @@ func (p *enrichedSourceProvider) GetAvro(
247249
dest[fieldNameSchemaName] = goavro.Union(avro.SchemaTypeString, tableInfo.schemaName)
248250
dest[fieldNameTableName] = goavro.Union(avro.SchemaTypeString, tableInfo.tableName)
249251
dest[fieldNamePrimaryKeys] = goavro.Union(avro.SchemaTypeArray, tableInfo.primaryKeys)
252+
dest[fieldNameTableID] = goavro.Union(avro.SchemaTypeInt, int32(tableID))
250253

251254
if p.opts.mvccTimestamp {
252255
dest[fieldNameMVCCTimestamp] = goavro.Union(avro.SchemaTypeString, evCtx.mvcc.AsOfSystemTime())
@@ -280,6 +283,7 @@ const (
280283
fieldNameSchemaName = "schema_name"
281284
fieldNameTableName = "table_name"
282285
fieldNamePrimaryKeys = "primary_keys"
286+
fieldNameTableID = "crdb_internal_table_id"
283287
)
284288

285289
type fieldInfo struct {
@@ -472,6 +476,17 @@ var allFieldInfo = map[string]fieldInfo{
472476
Items: &kcjsonschema.Schema{TypeName: kcjsonschema.SchemaTypeString},
473477
},
474478
},
479+
fieldNameTableID: {
480+
avroSchemaField: avro.SchemaField{
481+
Name: fieldNameTableID,
482+
SchemaType: []avro.SchemaType{avro.SchemaTypeNull, avro.SchemaTypeInt},
483+
},
484+
kafkaConnectSchema: kcjsonschema.Schema{
485+
Field: fieldNameTableID,
486+
TypeName: kcjsonschema.SchemaTypeInt32,
487+
Optional: false,
488+
},
489+
},
475490
}
476491

477492
// filled in by init() using allFieldInfo

0 commit comments

Comments
 (0)