@@ -12218,13 +12218,13 @@ func TestChangefeedProtobuf(t *testing.T) {
12218
12218
type testCase struct {
12219
12219
envelope string
12220
12220
withDiff bool
12221
+ withSource bool
12221
12222
expectedRows []string
12222
12223
}
12223
12224
12224
12225
tests := []testCase {
12225
12226
{
12226
12227
envelope : "bare" ,
12227
- withDiff : false ,
12228
12228
expectedRows : []string {
12229
12229
`pricing: {"id":1}->{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"},"__crdb__":{"key":{"id":1},"topic":"pricing"}}` ,
12230
12230
`pricing: {"id":2}->{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"},"__crdb__":{"key":{"id":2},"topic":"pricing"}}` ,
@@ -12238,12 +12238,48 @@ func TestChangefeedProtobuf(t *testing.T) {
12238
12238
envelope : "wrapped" ,
12239
12239
withDiff : true ,
12240
12240
expectedRows : []string {
12241
- `pricing: {"id":1}->{"after":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"before":{}," key":{"id":1},"topic":"pricing"}` ,
12242
- `pricing: {"id":2}->{"after":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"before":{}," key":{"id":2},"topic":"pricing"}` ,
12241
+ `pricing: {"id":1}->{"after":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"key":{"id":1},"topic":"pricing"}` ,
12242
+ `pricing: {"id":2}->{"after":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}` ,
12243
12243
`pricing: {"id":2}->{"after":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"before":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}` ,
12244
12244
`pricing: {"id":1}->{"after":{"values":{"discount":10,"id":1,"name":"Armchair","options":["Red"],"tax":"1.000"}},"before":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"key":{"id":1},"topic":"pricing"}` ,
12245
- `pricing: {"id":3}->{"after":{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"}},"before":{},"key":{"id":3},"topic":"pricing"}` ,
12246
- `pricing: {"id":2}->{"after":{},"before":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}` ,
12245
+ `pricing: {"id":3}->{"after":{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"}},"key":{"id":3},"topic":"pricing"}` ,
12246
+ `pricing: {"id":2}->{"before":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}` ,
12247
+ },
12248
+ },
12249
+ {
12250
+ envelope : "wrapped" ,
12251
+ expectedRows : []string {
12252
+ `pricing: {"id":1}->{"after":{"values":{"discount":15.75,"id":1,"name":"Chair","options":["Brown","Black"],"tax":"2.500"}},"key":{"id":1},"topic":"pricing"}` ,
12253
+ `pricing: {"id":2}->{"after":{"values":{"discount":20,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}` ,
12254
+ `pricing: {"id":2}->{"after":{"values":{"discount":25.5,"id":2,"name":"Table","options":["Brown","Black"],"tax":"1.23456789"}},"key":{"id":2},"topic":"pricing"}` ,
12255
+ `pricing: {"id":1}->{"after":{"values":{"discount":10,"id":1,"name":"Armchair","options":["Red"],"tax":"1.000"}},"key":{"id":1},"topic":"pricing"}` ,
12256
+ `pricing: {"id":3}->{"after":{"values":{"discount":50,"id":3,"name":"Sofa","options":["Gray"],"tax":"4.250"}},"key":{"id":3},"topic":"pricing"}` ,
12257
+ `pricing: {"id":2}->{"key":{"id":2},"topic":"pricing"}` ,
12258
+ },
12259
+ },
12260
+ {
12261
+ envelope : "enriched" ,
12262
+ withDiff : true ,
12263
+ withSource : true ,
12264
+ expectedRows : []string {
12265
+ `pricing: {"id":1}->{"after": {"values": {"discount": 10, "id": 1, "name": "Armchair", "options": ["Red"], "tax": "1.000"}}, "before": {"values": {"discount": 15.75, "id": 1, "name": "Chair", "options": ["Brown", "Black"], "tax": "2.500"}}, "key": {"id": 1}, "op": 2}` ,
12266
+ `pricing: {"id":1}->{"after": {"values": {"discount": 15.75, "id": 1, "name": "Chair", "options": ["Brown", "Black"], "tax": "2.500"}}, "key": {"id": 1}, "op": 1}` ,
12267
+ `pricing: {"id":2}->{"after": {"values": {"discount": 20, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "key": {"id": 2}, "op": 1}` ,
12268
+ `pricing: {"id":2}->{"after": {"values": {"discount": 25.5, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "before": {"values": {"discount": 20, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "key": {"id": 2}, "op": 2}` ,
12269
+ `pricing: {"id":2}->{"before": {"values": {"discount": 25.5, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "key": {"id": 2}, "op": 3}` ,
12270
+ `pricing: {"id":3}->{"after": {"values": {"discount": 50, "id": 3, "name": "Sofa", "options": ["Gray"], "tax": "4.250"}}, "key": {"id": 3}, "op": 1}` ,
12271
+ },
12272
+ },
12273
+ {
12274
+ envelope : "enriched" ,
12275
+ withDiff : true ,
12276
+ expectedRows : []string {
12277
+ `pricing: {"id":1}->{"after": {"values": {"discount": 10, "id": 1, "name": "Armchair", "options": ["Red"], "tax": "1.000"}}, "before": {"values": {"discount": 15.75, "id": 1, "name": "Chair", "options": ["Brown", "Black"], "tax": "2.500"}}, "key": {"id": 1}, "op": 2}` ,
12278
+ `pricing: {"id":1}->{"after": {"values": {"discount": 15.75, "id": 1, "name": "Chair", "options": ["Brown", "Black"], "tax": "2.500"}}, "key": {"id": 1}, "op": 1}` ,
12279
+ `pricing: {"id":2}->{"after": {"values": {"discount": 20, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "key": {"id": 2}, "op": 1}` ,
12280
+ `pricing: {"id":2}->{"after": {"values": {"discount": 25.5, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "before": {"values": {"discount": 20, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "key": {"id": 2}, "op": 2}` ,
12281
+ `pricing: {"id":2}->{"before": {"values": {"discount": 25.5, "id": 2, "name": "Table", "options": ["Brown", "Black"], "tax": "1.23456789"}}, "key": {"id": 2}, "op": 3}` ,
12282
+ `pricing: {"id":3}->{"after": {"values": {"discount": 50, "id": 3, "name": "Sofa", "options": ["Gray"], "tax": "4.250"}}, "key": {"id": 3}, "op": 1}` ,
12247
12283
},
12248
12284
},
12249
12285
}
@@ -12272,7 +12308,9 @@ func TestChangefeedProtobuf(t *testing.T) {
12272
12308
if tc .withDiff {
12273
12309
opts = append (opts , "diff" )
12274
12310
}
12275
-
12311
+ if tc .withSource {
12312
+ opts = append (opts , "enriched_properties='source'" )
12313
+ }
12276
12314
feed := feed (t , f , fmt .Sprintf ("CREATE CHANGEFEED FOR pricing WITH %s" , strings .Join (opts , ", " )))
12277
12315
defer closeFeed (t , feed )
12278
12316
@@ -12281,7 +12319,26 @@ func TestChangefeedProtobuf(t *testing.T) {
12281
12319
sqlDB .Exec (t , `INSERT INTO pricing VALUES (3, 'Sofa', 50.00, 4.250, ARRAY['Gray'])` )
12282
12320
sqlDB .Exec (t , `DELETE FROM pricing WHERE id = 2` )
12283
12321
12284
- assertPayloads (t , feed , tc .expectedRows )
12322
+ if tc .envelope == "enriched" {
12323
+ sourceAssertion := func (source map [string ]any ) {
12324
+ if tc .withSource {
12325
+ require .NotNil (t , source )
12326
+ require .Equal (t , "kafka" , source ["changefeed_sink" ])
12327
+ require .Equal (t , "d" , source ["database_name" ])
12328
+ require .Equal (t , "public" , source ["schema_name" ])
12329
+ require .Equal (t , "pricing" , source ["table_name" ])
12330
+ require .Equal (t , "cockroachdb" , source ["origin" ])
12331
+ require .ElementsMatch (t , []any {"id" }, source ["primary_keys" ].([]any ))
12332
+ } else {
12333
+ require .Nil (t , source )
12334
+ }
12335
+ }
12336
+ assertPayloadsEnriched (t , feed , tc .expectedRows , sourceAssertion )
12337
+ } else {
12338
+
12339
+ assertPayloads (t , feed , tc .expectedRows )
12340
+ }
12341
+
12285
12342
}
12286
12343
cdcTest (t , testFn , feedTestForceSink ("kafka" ))
12287
12344
})
0 commit comments