Skip to content

Commit 810615e

Browse files
craig[bot]elizaMkraule
andcommitted
Merge #147341
147341: cdc: include crdb_internal_table_id in changefeed source metadata r=asg0451,aerfrei a=elizaMkraule Previously, the metadata env of changefeed source objects did not include the CockroachDB internal table ID. This field is necessary downstream to uniquely identify tables even if their names change. This change adds the crdb_internal_table_id field to the source object within the metadata env, ensuring consistent identification of tables across renames and supporting downstream integrations. Fixes #143371 Epic: CRDB-48789 Release note (general change): Changefeed source metadata now includes the new crdb_internal_table_id field, enabling downstream consumers to uniquely identify tables even if table names change. Co-authored-by: Eliza Kraule <[email protected]>
2 parents 0a6c586 + 33813e6 commit 810615e

File tree

2 files changed

+149
-32
lines changed

2 files changed

+149
-32
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 134 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4451,6 +4451,10 @@ func TestChangefeedEnrichedSourceWithDataAvro(t *testing.T) {
44514451

44524452
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
44534453
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4454+
4455+
var tableID int
4456+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
4457+
44544458
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=avro`
44554459
if withMVCCTS {
44564460
stmt += ", mvcc_timestamp"
@@ -4488,12 +4492,13 @@ func TestChangefeedEnrichedSourceWithDataAvro(t *testing.T) {
44884492
var assertion string
44894493
assertionMap := map[string]any{
44904494
"source": map[string]any{
4491-
"changefeed_sink": map[string]any{"string": sink},
4492-
"cluster_id": map[string]any{"string": clusterID},
4493-
"cluster_name": map[string]any{"string": clusterName},
4494-
"database_name": map[string]any{"string": "d"},
4495-
"db_version": map[string]any{"string": dbVersion},
4496-
"job_id": map[string]any{"string": jobIDStr},
4495+
"changefeed_sink": map[string]any{"string": sink},
4496+
"cluster_id": map[string]any{"string": clusterID},
4497+
"cluster_name": map[string]any{"string": clusterName},
4498+
"crdb_internal_table_id": map[string]any{"int": tableID},
4499+
"database_name": map[string]any{"string": "d"},
4500+
"db_version": map[string]any{"string": dbVersion},
4501+
"job_id": map[string]any{"string": jobIDStr},
44974502
// Note that the field is still present in the avro schema, so it appears here as nil.
44984503
"mvcc_timestamp": nil,
44994504
"node_id": map[string]any{"string": nodeID},
@@ -4569,6 +4574,7 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
45694574

45704575
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
45714576
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4577+
45724578
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=json`
45734579
if withMVCCTS {
45744580
stmt += ", mvcc_timestamp"
@@ -4581,11 +4587,14 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
45814587

45824588
var jobID int64
45834589
var nodeName string
4590+
var tableID int
4591+
45844592
var sourceAssertion func(actualSource map[string]any)
45854593
if ef, ok := testFeed.(cdctest.EnterpriseTestFeed); ok {
45864594
jobID = int64(ef.JobID())
45874595
}
45884596
sqlDB.QueryRow(t, `SELECT value FROM crdb_internal.node_runtime_info where component = 'DB' and field = 'Host'`).Scan(&nodeName)
4597+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
45894598

45904599
sourceAssertion = func(actualSource map[string]any) {
45914600
nodeID := actualSource["node_id"]
@@ -4607,19 +4616,20 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
46074616

46084617
var assertion string
46094618
assertionMap := map[string]any{
4610-
"cluster_id": clusterID,
4611-
"cluster_name": clusterName,
4612-
"db_version": dbVersion,
4613-
"job_id": jobIDStr,
4614-
"node_id": nodeID,
4615-
"node_name": nodeName,
4616-
"origin": "cockroachdb",
4617-
"changefeed_sink": sink,
4618-
"source_node_locality": sourceNodeLocality,
4619-
"database_name": "d",
4620-
"schema_name": "public",
4621-
"table_name": "foo",
4622-
"primary_keys": []any{"i"},
4619+
"cluster_id": clusterID,
4620+
"cluster_name": clusterName,
4621+
"crdb_internal_table_id": tableID,
4622+
"db_version": dbVersion,
4623+
"job_id": jobIDStr,
4624+
"node_id": nodeID,
4625+
"node_name": nodeName,
4626+
"origin": "cockroachdb",
4627+
"changefeed_sink": sink,
4628+
"source_node_locality": sourceNodeLocality,
4629+
"database_name": "d",
4630+
"schema_name": "public",
4631+
"table_name": "foo",
4632+
"primary_keys": []any{"i"},
46234633
}
46244634
if withMVCCTS {
46254635
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))
@@ -4680,6 +4690,10 @@ func TestChangefeedEnrichedSourceWithDataJSONWebhook(t *testing.T) {
46804690

46814691
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
46824692
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4693+
4694+
var tableID int
4695+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
4696+
46834697
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=json`
46844698
if withMVCCTS {
46854699
stmt += ", mvcc_timestamp"
@@ -4713,19 +4727,20 @@ func TestChangefeedEnrichedSourceWithDataJSONWebhook(t *testing.T) {
47134727

47144728
var assertion string
47154729
assertionMap := map[string]any{
4716-
"cluster_id": clusterID,
4717-
"cluster_name": clusterName,
4718-
"db_version": dbVersion,
4719-
"job_id": jobIDStr,
4720-
"node_id": nodeID,
4721-
"node_name": nodeName,
4722-
"origin": "cockroachdb",
4723-
"changefeed_sink": sink,
4724-
"source_node_locality": sourceNodeLocality,
4725-
"database_name": "d",
4726-
"schema_name": "public",
4727-
"table_name": "foo",
4728-
"primary_keys": []any{"i"},
4730+
"cluster_id": clusterID,
4731+
"cluster_name": clusterName,
4732+
"crdb_internal_table_id": tableID,
4733+
"db_version": dbVersion,
4734+
"job_id": jobIDStr,
4735+
"node_id": nodeID,
4736+
"node_name": nodeName,
4737+
"origin": "cockroachdb",
4738+
"changefeed_sink": sink,
4739+
"source_node_locality": sourceNodeLocality,
4740+
"database_name": "d",
4741+
"schema_name": "public",
4742+
"table_name": "foo",
4743+
"primary_keys": []any{"i"},
47294744
}
47304745
if withMVCCTS {
47314746
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))
@@ -4797,6 +4812,9 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
47974812
for _, testCase := range cases {
47984813
t.Run(testCase.name, func(t *testing.T) {
47994814
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4815+
4816+
var originalTableID int
4817+
48004818
sqlDB := sqlutils.MakeSQLRunner(s.DB)
48014819

48024820
sqlDB.Exec(t, `CREATE TABLE foo (a INT, b STRING, c INT, PRIMARY KEY (a, b))`)
@@ -4813,11 +4831,21 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
48134831
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
48144832
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
48154833
require.Equal(t, map[string]any{"array": []any{"a", "b"}}, actualSourceValue["primary_keys"])
4834+
4835+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4836+
idInt, err := num.Int64()
4837+
require.NoError(t, err)
4838+
originalTableID = int(idInt)
48164839
} else {
48174840
require.Equal(t, "foo", actualSource["table_name"])
48184841
require.Equal(t, "public", actualSource["schema_name"])
48194842
require.Equal(t, "d", actualSource["database_name"])
48204843
require.Equal(t, []any{"a", "b"}, actualSource["primary_keys"])
4844+
4845+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4846+
idInt, err := num.Int64()
4847+
require.NoError(t, err)
4848+
originalTableID = int(idInt)
48214849
}
48224850
}
48234851
assertPayloadsEnriched(t, foo, []string{testCase.expectedRow}, sourceAssertion)
@@ -4832,11 +4860,21 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
48324860
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
48334861
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
48344862
require.Equal(t, map[string]any{"array": []any{"a", "b"}}, actualSourceValue["primary_keys"])
4863+
4864+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4865+
idInt, err := num.Int64()
4866+
require.NoError(t, err)
4867+
require.Equal(t, originalTableID, int(idInt))
48354868
} else {
48364869
require.Equal(t, "foo", actualSource["table_name"])
48374870
require.Equal(t, "public", actualSource["schema_name"])
48384871
require.Equal(t, "d", actualSource["database_name"])
48394872
require.Equal(t, []any{"a", "b"}, actualSource["primary_keys"])
4873+
4874+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4875+
idInt, err := num.Int64()
4876+
require.NoError(t, err)
4877+
require.Equal(t, originalTableID, int(idInt))
48404878
}
48414879
}
48424880
assertPayloadsEnriched(t, foo, testCase.expectedRowsAfterSchemaChange, sourceAssertionAfterSchemaChange)
@@ -4874,6 +4912,9 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
48744912
for _, testCase := range cases {
48754913
t.Run(testCase.name, func(t *testing.T) {
48764914
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4915+
4916+
var originalTableID int
4917+
48774918
sqlDB := sqlutils.MakeSQLRunner(s.DB)
48784919

48794920
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -4890,11 +4931,21 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
48904931
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
48914932
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
48924933
require.Equal(t, map[string]any{"array": []any{"a"}}, actualSourceValue["primary_keys"])
4934+
4935+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4936+
idInt, err := num.Int64()
4937+
require.NoError(t, err)
4938+
originalTableID = int(idInt)
48934939
} else {
48944940
require.Equal(t, "foo", actualSource["table_name"])
48954941
require.Equal(t, "public", actualSource["schema_name"])
48964942
require.Equal(t, "d", actualSource["database_name"])
48974943
require.Equal(t, []any{"a"}, actualSource["primary_keys"])
4944+
4945+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4946+
idInt, err := num.Int64()
4947+
require.NoError(t, err)
4948+
originalTableID = int(idInt)
48984949
}
48994950
}
49004951
if testCase.format == "json" {
@@ -4914,11 +4965,21 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
49144965
require.Equal(t, map[string]any{"string": "public"}, actualSourceValue["schema_name"])
49154966
require.Equal(t, map[string]any{"string": "d"}, actualSourceValue["database_name"])
49164967
require.Equal(t, map[string]any{"array": []any{"b"}}, actualSourceValue["primary_keys"])
4968+
4969+
num := actualSourceValue["crdb_internal_table_id"].(map[string]any)["int"].(gojson.Number)
4970+
idInt, err := num.Int64()
4971+
require.NoError(t, err)
4972+
require.Equal(t, originalTableID, int(idInt))
49174973
} else {
49184974
require.Equal(t, "foo", actualSource["table_name"])
49194975
require.Equal(t, "public", actualSource["schema_name"])
49204976
require.Equal(t, "d", actualSource["database_name"])
49214977
require.Equal(t, []any{"b"}, actualSource["primary_keys"])
4978+
4979+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
4980+
idInt, err := num.Int64()
4981+
require.NoError(t, err)
4982+
require.Equal(t, originalTableID, int(idInt))
49224983
}
49234984
}
49244985
assertPayloadsEnriched(t, foo, []string{testCase.expectedRowAfter}, sourceAssertionAfterPKChange)
@@ -4929,6 +4990,47 @@ func TestChangefeedEnrichedSourceSchemaInfoOnPrimaryKeyChange(t *testing.T) {
49294990
}
49304991
}
49314992

4993+
func TestChangefeedEnrichedTableIDStableOnRename(t *testing.T) {
4994+
defer leaktest.AfterTest(t)()
4995+
defer log.Scope(t).Close(t)
4996+
4997+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4998+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
4999+
5000+
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
5001+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
5002+
5003+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source'`)
5004+
defer closeFeed(t, foo)
5005+
5006+
var originalTableID int
5007+
5008+
// Capture first row (before rename)
5009+
sourceAssertionBefore := func(actualSource map[string]any) {
5010+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
5011+
id, err := num.Int64()
5012+
require.NoError(t, err)
5013+
originalTableID = int(id)
5014+
}
5015+
assertPayloadsEnriched(t, foo, []string{`foo: {"i": 1}->{"after": {"i": 1}, "op": "c"}`}, sourceAssertionBefore)
5016+
5017+
// Rename table
5018+
sqlDB.Exec(t, `ALTER TABLE foo RENAME TO bar`)
5019+
sqlDB.Exec(t, `INSERT INTO bar VALUES (2)`)
5020+
5021+
// Capture second row (after rename)
5022+
sourceAssertionAfter := func(actualSource map[string]any) {
5023+
num := actualSource["crdb_internal_table_id"].(gojson.Number)
5024+
id, err := num.Int64()
5025+
require.NoError(t, err)
5026+
require.Equal(t, originalTableID, int(id))
5027+
}
5028+
assertPayloadsEnriched(t, foo, []string{`foo: {"i": 2}->{"after": {"i": 2}, "op": "c"}`}, sourceAssertionAfter)
5029+
}
5030+
5031+
cdcTest(t, testFn, feedTestForceSink("kafka"))
5032+
}
5033+
49325034
func TestChangefeedExpressionUsesSerializedSessionData(t *testing.T) {
49335035
defer leaktest.AfterTest(t)()
49345036
defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/enriched_source_provider.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func newEnrichedSourceProvider(
162162
addNonFixedJSONfield(fieldNameSchemaName)
163163
addNonFixedJSONfield(fieldNameTableName)
164164
addNonFixedJSONfield(fieldNamePrimaryKeys)
165+
addNonFixedJSONfield(fieldNameTableID)
165166

166167
if opts.MVCCTimestamps {
167168
addNonFixedJSONfield(fieldNameMVCCTimestamp)
@@ -208,6 +209,7 @@ func (p *enrichedSourceProvider) GetJSON(
208209
p.jsonNonFixedData[fieldNameSchemaName] = json.FromString(tableInfo.schemaName)
209210
p.jsonNonFixedData[fieldNameTableName] = json.FromString(tableInfo.tableName)
210211
p.jsonNonFixedData[fieldNamePrimaryKeys] = tableInfo.primaryKeysJSON
212+
p.jsonNonFixedData[fieldNameTableID] = json.FromInt(int(metadata.TableID))
211213

212214
if p.opts.mvccTimestamp {
213215
p.jsonNonFixedData[fieldNameMVCCTimestamp] = json.FromString(evCtx.mvcc.AsOfSystemTime())
@@ -247,6 +249,7 @@ func (p *enrichedSourceProvider) GetAvro(
247249
dest[fieldNameSchemaName] = goavro.Union(avro.SchemaTypeString, tableInfo.schemaName)
248250
dest[fieldNameTableName] = goavro.Union(avro.SchemaTypeString, tableInfo.tableName)
249251
dest[fieldNamePrimaryKeys] = goavro.Union(avro.SchemaTypeArray, tableInfo.primaryKeys)
252+
dest[fieldNameTableID] = goavro.Union(avro.SchemaTypeInt, int32(tableID))
250253

251254
if p.opts.mvccTimestamp {
252255
dest[fieldNameMVCCTimestamp] = goavro.Union(avro.SchemaTypeString, evCtx.mvcc.AsOfSystemTime())
@@ -280,6 +283,7 @@ const (
280283
fieldNameSchemaName = "schema_name"
281284
fieldNameTableName = "table_name"
282285
fieldNamePrimaryKeys = "primary_keys"
286+
fieldNameTableID = "crdb_internal_table_id"
283287
)
284288

285289
type fieldInfo struct {
@@ -472,6 +476,17 @@ var allFieldInfo = map[string]fieldInfo{
472476
Items: &kcjsonschema.Schema{TypeName: kcjsonschema.SchemaTypeString},
473477
},
474478
},
479+
fieldNameTableID: {
480+
avroSchemaField: avro.SchemaField{
481+
Name: fieldNameTableID,
482+
SchemaType: []avro.SchemaType{avro.SchemaTypeNull, avro.SchemaTypeInt},
483+
},
484+
kafkaConnectSchema: kcjsonschema.Schema{
485+
Field: fieldNameTableID,
486+
TypeName: kcjsonschema.SchemaTypeInt32,
487+
Optional: false,
488+
},
489+
},
475490
}
476491

477492
// filled in by init() using allFieldInfo

0 commit comments

Comments
 (0)