@@ -100,6 +100,7 @@ func newEnrichedSourceProvider(
100100 fieldNameSourceNodeLocality : json .FromString (sourceData .sourceNodeLocality ),
101101 fieldNameNodeName : json .FromString (sourceData .nodeName ),
102102 fieldNameNodeID : json .FromString (sourceData .nodeID ),
103+ fieldNameOrigin : json .FromString (originCockroachDB ),
103104 }
104105
105106 var nonFixedJSONFields []string
@@ -167,6 +168,7 @@ func (p *enrichedSourceProvider) GetAvro(
167168 dest [fieldNameSourceNodeLocality ] = goavro .Union (avro .SchemaTypeString , p .sourceData .sourceNodeLocality )
168169 dest [fieldNameNodeName ] = goavro .Union (avro .SchemaTypeString , p .sourceData .nodeName )
169170 dest [fieldNameNodeID ] = goavro .Union (avro .SchemaTypeString , p .sourceData .nodeID )
171+ dest [fieldNameOrigin ] = goavro .Union (avro .SchemaTypeString , originCockroachDB )
170172 }
171173
172174 if p .opts .mvccTimestamp {
@@ -196,6 +198,7 @@ const (
196198 fieldNameMVCCTimestamp = "mvcc_timestamp"
197199 fieldNameUpdatedTSNS = "ts_ns"
198200 fieldNameUpdatedTSHLC = "ts_hlc"
201+ fieldNameOrigin = "origin"
199202)
200203
201204type fieldInfo struct {
@@ -208,6 +211,17 @@ type fieldInfo struct {
208211// everything is nullable in avro for better backwards compatibility, whereas we
209212// use the optional flag in kafka connect more meaningfully.
210213var allFieldInfo = map [string ]fieldInfo {
214+ fieldNameOrigin : {
215+ avroSchemaField : avro.SchemaField {
216+ Name : fieldNameOrigin ,
217+ SchemaType : []avro.SchemaType {avro .SchemaTypeNull , avro .SchemaTypeString },
218+ },
219+ kafkaConnectSchema : kcjsonschema.Schema {
220+ Field : fieldNameOrigin ,
221+ TypeName : kcjsonschema .SchemaTypeString ,
222+ Optional : false ,
223+ },
224+ },
211225 fieldNameChangefeedSink : {
212226 avroSchemaField : avro.SchemaField {
213227 Name : fieldNameChangefeedSink ,
@@ -355,3 +369,5 @@ func init() {
355369 Optional : true ,
356370 }
357371}
372+
373+ const originCockroachDB = "cockroachdb"
0 commit comments