@@ -972,7 +972,7 @@ func TestChangefeedFullTableName(t *testing.T) {
972
972
sqlDB .Exec (t , `INSERT INTO foo VALUES (1, 'a')` )
973
973
974
974
t .Run (`envelope=row` , func (t * testing.T ) {
975
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH full_table_name` )
975
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH full_table_name` , optOutOfMetamorphicEnrichedEnvelope { reason : "broken for webhook; see #145927" } )
976
976
defer closeFeed (t , foo )
977
977
assertPayloads (t , foo , []string {`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}` })
978
978
})
@@ -1100,7 +1100,7 @@ func TestChangefeedTimestamps(t *testing.T) {
1100
1100
sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY)` )
1101
1101
sqlDB .Exec (t , `INSERT INTO foo VALUES (0)` )
1102
1102
1103
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH updated, resolved` )
1103
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH updated, resolved` , optOutOfMetamorphicEnrichedEnvelope { reason : "this test calls testfeed.Next() directly" } )
1104
1104
defer closeFeed (t , foo )
1105
1105
1106
1106
// Grab the first non resolved-timestamp row.
@@ -1805,7 +1805,8 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
1805
1805
sqlDB .Exec (t , `INSERT INTO hasfams values (0, 'a', 'b', 'c')` )
1806
1806
1807
1807
// Open up the changefeed.
1808
- cf := feed (t , f , `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'` )
1808
+ cf := feed (t , f , `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'` ,
1809
+ optOutOfMetamorphicEnrichedEnvelope {"requires families" })
1809
1810
defer closeFeed (t , cf )
1810
1811
assertPayloads (t , cf , []string {
1811
1812
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}` ,
@@ -2046,8 +2047,12 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
2046
2047
sqlDB .Exec (t , `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))` )
2047
2048
sqlDB .Exec (t , `INSERT INTO hasfams values (0, 'a', 'b', 'c')` )
2048
2049
2050
+ var args []any
2051
+ if _ , ok := f .(* webhookFeedFactory ); ok {
2052
+ args = append (args , optOutOfMetamorphicEnrichedEnvelope {reason : "metamorphic enriched envelope does not support column families for webhook sinks" })
2053
+ }
2049
2054
// Open up the changefeed.
2050
- cf := feed (t , f , `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c` )
2055
+ cf := feed (t , f , `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c` , args ... )
2051
2056
defer closeFeed (t , cf )
2052
2057
assertPayloads (t , cf , []string {
2053
2058
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}` ,
@@ -3286,7 +3291,11 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
3286
3291
// Must specify WITH split_column_families
3287
3292
sqlDB .ExpectErrWithTimeout (t , `multiple column families` , `CREATE CHANGEFEED FOR foo` )
3288
3293
3289
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH split_column_families` )
3294
+ var args []any
3295
+ if _ , ok := f .(* webhookFeedFactory ); ok {
3296
+ args = append (args , optOutOfMetamorphicEnrichedEnvelope {reason : "metamorphic enriched envelope does not support column families for webhook sinks" })
3297
+ }
3298
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH split_column_families` , args ... )
3290
3299
defer closeFeed (t , foo )
3291
3300
3292
3301
assertPayloads (t , foo , []string {
@@ -3356,21 +3365,27 @@ func TestChangefeedSingleColumnFamily(t *testing.T) {
3356
3365
3357
3366
sqlDB .ExpectErrWithTimeout (t , `nosuchfamily` , `CREATE CHANGEFEED FOR foo FAMILY nosuchfamily` )
3358
3367
3359
- fooMost := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY most` )
3368
+ // TODO(#145927): unskip this when we have family or topic info in enriched feeds.
3369
+ var args []any
3370
+ if _ , ok := f .(* webhookFeedFactory ); ok {
3371
+ args = append (args , optOutOfMetamorphicEnrichedEnvelope {reason : "metamorphic enriched envelope does not support column families for webhook sinks" })
3372
+ }
3373
+
3374
+ fooMost := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY most` , args ... )
3360
3375
defer closeFeed (t , fooMost )
3361
3376
assertPayloads (t , fooMost , []string {
3362
3377
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}` ,
3363
3378
`foo.most: [1]->{"after": {"a": 1, "b": "dollar"}}` ,
3364
3379
})
3365
3380
3366
- fooRest := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY rest` )
3381
+ fooRest := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY rest` , args ... )
3367
3382
defer closeFeed (t , fooRest )
3368
3383
assertPayloads (t , fooRest , []string {
3369
3384
`foo.rest: [0]->{"after": {"c": "cat", "d": null}}` ,
3370
3385
`foo.rest: [1]->{"after": {"c": "cent", "d": null}}` ,
3371
3386
})
3372
3387
3373
- fooBoth := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY rest, foo FAMILY most` )
3388
+ fooBoth := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY rest, foo FAMILY most` , args ... )
3374
3389
defer closeFeed (t , fooBoth )
3375
3390
assertPayloads (t , fooBoth , []string {
3376
3391
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}` ,
@@ -3406,13 +3421,14 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
3406
3421
sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY most (a,b), FAMILY rest (c))` )
3407
3422
sqlDB .Exec (t , `INSERT INTO foo values (0, 'dog', 'cat')` )
3408
3423
3409
- fooMost := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY most` )
3424
+ arg := optOutOfMetamorphicEnrichedEnvelope {reason : "metamorphic enriched envelope does not support column families; see #145927" }
3425
+ fooMost := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY most` , arg )
3410
3426
defer closeFeed (t , fooMost )
3411
3427
assertPayloads (t , fooMost , []string {
3412
3428
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}` ,
3413
3429
})
3414
3430
3415
- fooRest := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY rest` )
3431
+ fooRest := feed (t , f , `CREATE CHANGEFEED FOR foo FAMILY rest` , arg )
3416
3432
defer closeFeed (t , fooRest )
3417
3433
assertPayloads (t , fooRest , []string {
3418
3434
`foo.rest: [0]->{"after": {"c": "cat"}}` ,
@@ -3447,7 +3463,13 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
3447
3463
// Table with 2 column families.
3448
3464
sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY f1 (a,b), FAMILY f2 (c))` )
3449
3465
sqlDB .Exec (t , `INSERT INTO foo values (0, 'dog', 'cat')` )
3450
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH split_column_families` )
3466
+
3467
+ var args []any
3468
+ if _ , ok := f .(* webhookFeedFactory ); ok {
3469
+ args = append (args , optOutOfMetamorphicEnrichedEnvelope {reason : "metamorphic enriched envelope does not support column families for webhook sinks" })
3470
+ }
3471
+
3472
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH split_column_families` , args ... )
3451
3473
defer closeFeed (t , foo )
3452
3474
assertPayloads (t , foo , []string {
3453
3475
`foo.f1: [0]->{"after": {"a": 0, "b": "dog"}}` ,
@@ -3747,7 +3769,8 @@ func TestChangefeedCustomKey(t *testing.T) {
3747
3769
3748
3770
sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)` )
3749
3771
sqlDB .Exec (t , `INSERT INTO foo VALUES (0, 'dog', 'cat')` )
3750
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH key_column='b', unordered` )
3772
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo WITH key_column='b', unordered` ,
3773
+ optOutOfMetamorphicEnrichedEnvelope {reason : "custom key not supported in test framework" })
3751
3774
defer closeFeed (t , foo )
3752
3775
assertPayloads (t , foo , []string {
3753
3776
`foo: ["dog"]->{"after": {"a": 0, "b": "dog", "c": "cat"}}` ,
@@ -4160,12 +4183,9 @@ func TestChangefeedEnriched(t *testing.T) {
4160
4183
for _ , tc := range cases {
4161
4184
t .Run (tc .name , func (t * testing.T ) {
4162
4185
testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
4163
- _ , isWebhook := f .(* webhookFeedFactory )
4164
4186
4165
- // The webhook sink forces key_in_value, and its implementation
4166
- // in the cdctest framework strips out the key in the value.
4167
- // This makes it impossible to test these features on that sink.
4168
- // TODO(#138749): fix this situation
4187
+ // The webhook testfeed removes the key from the value, so skip keyInValue and schema'd tests for it.
4188
+ _ , isWebhook := f .(* webhookFeedFactory )
4169
4189
if isWebhook && (tc .keyInValue || slices .Contains (tc .enrichedProperties , "schema" )) {
4170
4190
return
4171
4191
}
@@ -4182,11 +4202,10 @@ func TestChangefeedEnriched(t *testing.T) {
4182
4202
}
4183
4203
foo := feed (t , f , create )
4184
4204
defer closeFeed (t , foo )
4185
- // TODO(#139660): the webhook sink forces topic_in_value, but
4186
- // this is not supported by the enriched envelope type. We should adapt
4187
- // the test framework to account for this.
4205
+
4206
+ // The webhook testfeed relies on source.table_name for assertion matching. See: #145927
4188
4207
topic := "foo"
4189
- if isWebhook {
4208
+ if isWebhook && ! slices . Contains ( tc . enrichedProperties , "source" ) {
4190
4209
topic = ""
4191
4210
}
4192
4211
@@ -4657,7 +4676,7 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
4657
4676
assertPayloadsEnriched (t , testFeed , []string {`foo: {"i": 0}->{"after": {"i": 0}, "op": "c"}` }, sourceAssertion )
4658
4677
}
4659
4678
}
4660
- for _ , sink := range []string {"kafka" , "pubsub" , "sinkless" , "cloudstorage" } {
4679
+ for _ , sink := range []string {"kafka" , "pubsub" , "sinkless" , "cloudstorage" , "webhook" } {
4661
4680
testLocality := roachpb.Locality {
4662
4681
Tiers : []roachpb.Tier {{
4663
4682
Key : "region" ,
@@ -4670,115 +4689,6 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
4670
4689
})
4671
4690
}
4672
4691
4673
- // TODO(#139660): the webhook sink forces topic_in_value, but
4674
- // this is not supported by the enriched envelope type. We should adapt
4675
- // the test framework to account for this.
4676
- func TestChangefeedEnrichedSourceWithDataJSONWebhook (t * testing.T ) {
4677
- defer leaktest .AfterTest (t )()
4678
- defer log .Scope (t ).Close (t )
4679
-
4680
- testutils .RunTrueAndFalse (t , "ts_{ns,hlc}" , func (t * testing.T , withUpdated bool ) {
4681
- testutils .RunTrueAndFalse (t , "mvcc_ts" , func (t * testing.T , withMVCCTS bool ) {
4682
- clusterName := "clusterName123"
4683
- dbVersion := "v999.0.0"
4684
- defer build .TestingOverrideVersion (dbVersion )()
4685
- mkTestFn := func (sink string ) func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
4686
- return func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
4687
- clusterID := s .Server .ExecutorConfig ().(sql.ExecutorConfig ).NodeInfo .LogicalClusterID ().String ()
4688
-
4689
- sqlDB := sqlutils .MakeSQLRunner (s .DB )
4690
-
4691
- sqlDB .Exec (t , `CREATE TABLE foo (i INT PRIMARY KEY)` )
4692
- sqlDB .Exec (t , `INSERT INTO foo values (0)` )
4693
-
4694
- var tableID int
4695
- sqlDB .QueryRow (t , `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'` ).Scan (& tableID )
4696
-
4697
- stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=json`
4698
- if withMVCCTS {
4699
- stmt += ", mvcc_timestamp"
4700
- }
4701
- if withUpdated {
4702
- stmt += ", updated"
4703
- }
4704
- testFeed := feed (t , f , stmt )
4705
- defer closeFeed (t , testFeed )
4706
-
4707
- var jobID int64
4708
- var nodeName string
4709
- var sourceAssertion func (actualSource map [string ]any )
4710
- if ef , ok := testFeed .(cdctest.EnterpriseTestFeed ); ok {
4711
- jobID = int64 (ef .JobID ())
4712
- }
4713
- sqlDB .QueryRow (t , `SELECT value FROM crdb_internal.node_runtime_info where component = 'DB' and field = 'Host'` ).Scan (& nodeName )
4714
-
4715
- sourceAssertion = func (actualSource map [string ]any ) {
4716
- nodeID := actualSource ["node_id" ]
4717
- require .NotNil (t , nodeID )
4718
-
4719
- sourceNodeLocality := fmt .Sprintf (`region=%s` , testServerRegion )
4720
-
4721
- const dummyMvccTimestamp = "1234567890.0001"
4722
- jobIDStr := strconv .FormatInt (jobID , 10 )
4723
-
4724
- dummyUpdatedTSNS := 12345678900001000
4725
- dummyUpdatedTSHLC :=
4726
- hlc.Timestamp {WallTime : int64 (dummyUpdatedTSNS ), Logical : 0 }.AsOfSystemTime ()
4727
-
4728
- var assertion string
4729
- assertionMap := map [string ]any {
4730
- "cluster_id" : clusterID ,
4731
- "cluster_name" : clusterName ,
4732
- "crdb_internal_table_id" : tableID ,
4733
- "db_version" : dbVersion ,
4734
- "job_id" : jobIDStr ,
4735
- "node_id" : nodeID ,
4736
- "node_name" : nodeName ,
4737
- "origin" : "cockroachdb" ,
4738
- "changefeed_sink" : sink ,
4739
- "source_node_locality" : sourceNodeLocality ,
4740
- "database_name" : "d" ,
4741
- "schema_name" : "public" ,
4742
- "table_name" : "foo" ,
4743
- "primary_keys" : []any {"i" },
4744
- }
4745
- if withMVCCTS {
4746
- assertReasonableMVCCTimestamp (t , actualSource ["mvcc_timestamp" ].(string ))
4747
- actualSource ["mvcc_timestamp" ] = dummyMvccTimestamp
4748
- assertionMap ["mvcc_timestamp" ] = dummyMvccTimestamp
4749
- }
4750
- if withUpdated {
4751
- tsns := actualSource ["ts_ns" ].(gojson.Number )
4752
- tsnsInt , err := tsns .Int64 ()
4753
- require .NoError (t , err )
4754
- assertReasonableMVCCTimestamp (t , tsns .String ())
4755
- actualSource ["ts_ns" ] = dummyUpdatedTSNS
4756
- assertionMap ["ts_ns" ] = dummyUpdatedTSNS
4757
- assertEqualTSNSHLCWalltime (t , tsnsInt , actualSource ["ts_hlc" ].(string ))
4758
- actualSource ["ts_hlc" ] = dummyUpdatedTSHLC
4759
- assertionMap ["ts_hlc" ] = dummyUpdatedTSHLC
4760
- }
4761
- assertion = toJSON (t , assertionMap )
4762
-
4763
- value , err := reformatJSON (actualSource )
4764
- require .NoError (t , err )
4765
- require .JSONEq (t , assertion , string (value ))
4766
- }
4767
-
4768
- assertPayloadsEnriched (t , testFeed , []string {`: {"i": 0}->{"after": {"i": 0}, "op": "c"}` }, sourceAssertion )
4769
- }
4770
- }
4771
- testLocality := roachpb.Locality {
4772
- Tiers : []roachpb.Tier {{
4773
- Key : "region" ,
4774
- Value : testServerRegion ,
4775
- }}}
4776
- cdcTest (t , mkTestFn ("webhook" ), feedTestForceSink ("webhook" ), feedTestUseClusterName (clusterName ),
4777
- feedTestUseLocality (testLocality ))
4778
- })
4779
- })
4780
- }
4781
-
4782
4692
func TestChangefeedEnrichedSourceSchemaInfo (t * testing.T ) {
4783
4693
defer leaktest .AfterTest (t )()
4784
4694
defer log .Scope (t ).Close (t )
@@ -4880,7 +4790,7 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
4880
4790
assertPayloadsEnriched (t , foo , testCase .expectedRowsAfterSchemaChange , sourceAssertionAfterSchemaChange )
4881
4791
}
4882
4792
4883
- cdcTest (t , testFn , feedTestForceSink ("kafka" ))
4793
+ cdcTest (t , testFn , feedTestRestrictSinks ("kafka" ))
4884
4794
})
4885
4795
}
4886
4796
}
@@ -8513,7 +8423,8 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
8513
8423
sqlDB .Exec (t , `INSERT INTO foo VALUES (0, 'initial')` )
8514
8424
sqlDB .Exec (t , `UPSERT INTO bar VALUES (0, 'initial')` )
8515
8425
8516
- foo := feed (t , f , `CREATE CHANGEFEED FOR foo, bar WITH resolved = '100ms', updated` )
8426
+ foo := feed (t , f , `CREATE CHANGEFEED FOR foo, bar WITH resolved = '100ms', updated` ,
8427
+ optOutOfMetamorphicEnrichedEnvelope {reason : "this test uses readNextMessages directly, which the metamorphic enriched envelope does not support" })
8517
8428
8518
8429
// Sketch of the test is as follows:
8519
8430
//
@@ -8639,7 +8550,7 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
8639
8550
})
8640
8551
}
8641
8552
8642
- cdcTest (t , testFn , feedTestEnterpriseSinks )
8553
+ cdcTest (t , testFn )
8643
8554
}
8644
8555
8645
8556
func TestChangefeedBackfillCheckpoint (t * testing.T ) {
0 commit comments