Skip to content

Commit fc353fd

Browse files
committed
changefeedccl: add origin source field to enriched envelope
Add origin="cockroachdb" to the enriched envelope source object. Fixes #139693 Release note: None
1 parent 78e2a2e commit fc353fd

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4431,6 +4431,7 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
44314431
"node_name": map[string]any{"string": nodeName},
44324432
"changefeed_sink": map[string]any{"string": sink},
44334433
"source_node_locality": map[string]any{"string": sourceNodeLocality},
4434+
"origin": map[string]any{"string": "cockroachdb"},
44344435
},
44354436
}
44364437
if withMVCCTS {
@@ -4467,6 +4468,7 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
44674468
"node_name": nodeName,
44684469
"changefeed_sink": sink,
44694470
"source_node_locality": sourceNodeLocality,
4471+
"origin": "cockroachdb",
44704472
}
44714473
if withMVCCTS {
44724474
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))

pkg/ccl/changefeedccl/enriched_source_provider.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func newEnrichedSourceProvider(
100100
fieldNameSourceNodeLocality: json.FromString(sourceData.sourceNodeLocality),
101101
fieldNameNodeName: json.FromString(sourceData.nodeName),
102102
fieldNameNodeID: json.FromString(sourceData.nodeID),
103+
fieldNameOrigin: json.FromString(originCockroachDB),
103104
}
104105

105106
var nonFixedJSONFields []string
@@ -167,6 +168,7 @@ func (p *enrichedSourceProvider) GetAvro(
167168
dest[fieldNameSourceNodeLocality] = goavro.Union(avro.SchemaTypeString, p.sourceData.sourceNodeLocality)
168169
dest[fieldNameNodeName] = goavro.Union(avro.SchemaTypeString, p.sourceData.nodeName)
169170
dest[fieldNameNodeID] = goavro.Union(avro.SchemaTypeString, p.sourceData.nodeID)
171+
dest[fieldNameOrigin] = goavro.Union(avro.SchemaTypeString, originCockroachDB)
170172
}
171173

172174
if p.opts.mvccTimestamp {
@@ -196,6 +198,7 @@ const (
196198
fieldNameMVCCTimestamp = "mvcc_timestamp"
197199
fieldNameUpdatedTSNS = "ts_ns"
198200
fieldNameUpdatedTSHLC = "ts_hlc"
201+
fieldNameOrigin = "origin"
199202
)
200203

201204
type fieldInfo struct {
@@ -208,6 +211,17 @@ type fieldInfo struct {
208211
// everything is nullable in avro for better backwards compatibility, whereas we
209212
// use the optional flag in kafka connect more meaningfully.
210213
var allFieldInfo = map[string]fieldInfo{
214+
fieldNameOrigin: {
215+
avroSchemaField: avro.SchemaField{
216+
Name: fieldNameOrigin,
217+
SchemaType: []avro.SchemaType{avro.SchemaTypeNull, avro.SchemaTypeString},
218+
},
219+
kafkaConnectSchema: kcjsonschema.Schema{
220+
Field: fieldNameOrigin,
221+
TypeName: kcjsonschema.SchemaTypeString,
222+
Optional: false,
223+
},
224+
},
211225
fieldNameChangefeedSink: {
212226
avroSchemaField: avro.SchemaField{
213227
Name: fieldNameChangefeedSink,
@@ -355,3 +369,5 @@ func init() {
355369
Optional: true,
356370
}
357371
}
372+
373+
const originCockroachDB = "cockroachdb"

0 commit comments

Comments
 (0)