Skip to content

Commit 2c5c35a

Browse files
craig[bot]asg0451
andcommitted
Merge #146133
146133: changefeedccl: add metamorphic enriched envelope to all changefeed tests r=andyyang890 a=asg0451 Adds a metamorphic enriched envelope to all changefeed tests, with the ability to opt out. Fixes #139660 Release note: None Co-authored-by: Miles Frankel <[email protected]>
2 parents 1527d0a + a2654ef commit 2c5c35a

File tree

7 files changed

+274
-137
lines changed

7 files changed

+274
-137
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ go_test(
328328
"//pkg/util/leaktest",
329329
"//pkg/util/log",
330330
"//pkg/util/log/eventpb",
331+
"//pkg/util/metamorphic",
331332
"//pkg/util/mon",
332333
"//pkg/util/parquet",
333334
"//pkg/util/protoutil",

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,8 @@ func TestAlterChangefeedDropTargetFamily(t *testing.T) {
513513
sqlDB := sqlutils.MakeSQLRunner(s.DB)
514514
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, FAMILY onlya (a), FAMILY onlyb (b))`)
515515

516-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY onlya, foo FAMILY onlyb`)
516+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY onlya, foo FAMILY onlyb`,
517+
optOutOfMetamorphicEnrichedEnvelope{"requires families"})
517518
defer closeFeed(t, testFeed)
518519

519520
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)

pkg/ccl/changefeedccl/cdctest/testfeed.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,11 @@ type EnterpriseTestFeed interface {
109109
HighWaterMark() (hlc.Timestamp, error)
110110
// WaitForHighWaterMark waits until job highwatermark progresses beyond specified threshold.
111111
WaitForHighWaterMark(minHWM hlc.Timestamp) error
112+
113+
// ForcedEnriched returns true if the feed was metamorphically forced to use
114+
// the enriched envelopes.
115+
ForcedEnriched() bool
116+
117+
// SetForcedEnriched sets the forced enriched flag.
118+
SetForcedEnriched(forced bool)
112119
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 45 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,7 @@ func TestChangefeedFullTableName(t *testing.T) {
972972
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)
973973

974974
t.Run(`envelope=row`, func(t *testing.T) {
975-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH full_table_name`)
975+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH full_table_name`, optOutOfMetamorphicEnrichedEnvelope{reason: "broken for webhook; see #145927"})
976976
defer closeFeed(t, foo)
977977
assertPayloads(t, foo, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`})
978978
})
@@ -1100,7 +1100,7 @@ func TestChangefeedTimestamps(t *testing.T) {
11001100
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
11011101
sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`)
11021102

1103-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH updated, resolved`)
1103+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH updated, resolved`, optOutOfMetamorphicEnrichedEnvelope{reason: "this test calls testfeed.Next() directly"})
11041104
defer closeFeed(t, foo)
11051105

11061106
// Grab the first non resolved-timestamp row.
@@ -1805,7 +1805,8 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
18051805
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
18061806

18071807
// Open up the changefeed.
1808-
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'`)
1808+
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'`,
1809+
optOutOfMetamorphicEnrichedEnvelope{"requires families"})
18091810
defer closeFeed(t, cf)
18101811
assertPayloads(t, cf, []string{
18111812
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
@@ -2046,8 +2047,12 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
20462047
sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
20472048
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
20482049

2050+
var args []any
2051+
if _, ok := f.(*webhookFeedFactory); ok {
2052+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
2053+
}
20492054
// Open up the changefeed.
2050-
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`)
2055+
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`, args...)
20512056
defer closeFeed(t, cf)
20522057
assertPayloads(t, cf, []string{
20532058
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
@@ -3286,7 +3291,11 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
32863291
// Must specify WITH split_column_families
32873292
sqlDB.ExpectErrWithTimeout(t, `multiple column families`, `CREATE CHANGEFEED FOR foo`)
32883293

3289-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH split_column_families`)
3294+
var args []any
3295+
if _, ok := f.(*webhookFeedFactory); ok {
3296+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
3297+
}
3298+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH split_column_families`, args...)
32903299
defer closeFeed(t, foo)
32913300

32923301
assertPayloads(t, foo, []string{
@@ -3356,21 +3365,27 @@ func TestChangefeedSingleColumnFamily(t *testing.T) {
33563365

33573366
sqlDB.ExpectErrWithTimeout(t, `nosuchfamily`, `CREATE CHANGEFEED FOR foo FAMILY nosuchfamily`)
33583367

3359-
fooMost := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY most`)
3368+
// TODO(#145927): unskip this when we have family or topic info in enriched feeds.
3369+
var args []any
3370+
if _, ok := f.(*webhookFeedFactory); ok {
3371+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
3372+
}
3373+
3374+
fooMost := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY most`, args...)
33603375
defer closeFeed(t, fooMost)
33613376
assertPayloads(t, fooMost, []string{
33623377
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}`,
33633378
`foo.most: [1]->{"after": {"a": 1, "b": "dollar"}}`,
33643379
})
33653380

3366-
fooRest := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest`)
3381+
fooRest := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest`, args...)
33673382
defer closeFeed(t, fooRest)
33683383
assertPayloads(t, fooRest, []string{
33693384
`foo.rest: [0]->{"after": {"c": "cat", "d": null}}`,
33703385
`foo.rest: [1]->{"after": {"c": "cent", "d": null}}`,
33713386
})
33723387

3373-
fooBoth := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest, foo FAMILY most`)
3388+
fooBoth := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest, foo FAMILY most`, args...)
33743389
defer closeFeed(t, fooBoth)
33753390
assertPayloads(t, fooBoth, []string{
33763391
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}`,
@@ -3406,13 +3421,14 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
34063421
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY most (a,b), FAMILY rest (c))`)
34073422
sqlDB.Exec(t, `INSERT INTO foo values (0, 'dog', 'cat')`)
34083423

3409-
fooMost := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY most`)
3424+
arg := optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families; see #145927"}
3425+
fooMost := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY most`, arg)
34103426
defer closeFeed(t, fooMost)
34113427
assertPayloads(t, fooMost, []string{
34123428
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}`,
34133429
})
34143430

3415-
fooRest := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest`)
3431+
fooRest := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest`, arg)
34163432
defer closeFeed(t, fooRest)
34173433
assertPayloads(t, fooRest, []string{
34183434
`foo.rest: [0]->{"after": {"c": "cat"}}`,
@@ -3447,7 +3463,13 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) {
34473463
// Table with 2 column families.
34483464
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY f1 (a,b), FAMILY f2 (c))`)
34493465
sqlDB.Exec(t, `INSERT INTO foo values (0, 'dog', 'cat')`)
3450-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH split_column_families`)
3466+
3467+
var args []any
3468+
if _, ok := f.(*webhookFeedFactory); ok {
3469+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
3470+
}
3471+
3472+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH split_column_families`, args...)
34513473
defer closeFeed(t, foo)
34523474
assertPayloads(t, foo, []string{
34533475
`foo.f1: [0]->{"after": {"a": 0, "b": "dog"}}`,
@@ -3747,7 +3769,8 @@ func TestChangefeedCustomKey(t *testing.T) {
37473769

37483770
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`)
37493771
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'dog', 'cat')`)
3750-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH key_column='b', unordered`)
3772+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH key_column='b', unordered`,
3773+
optOutOfMetamorphicEnrichedEnvelope{reason: "custom key not supported in test framework"})
37513774
defer closeFeed(t, foo)
37523775
assertPayloads(t, foo, []string{
37533776
`foo: ["dog"]->{"after": {"a": 0, "b": "dog", "c": "cat"}}`,
@@ -4160,12 +4183,9 @@ func TestChangefeedEnriched(t *testing.T) {
41604183
for _, tc := range cases {
41614184
t.Run(tc.name, func(t *testing.T) {
41624185
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4163-
_, isWebhook := f.(*webhookFeedFactory)
41644186

4165-
// The webhook sink forces key_in_value, and its implementation
4166-
// in the cdctest framework strips out the key in the value.
4167-
// This makes it impossible to test these features on that sink.
4168-
// TODO(#138749): fix this situation
4187+
// The webhook testfeed removes the key from the value, so skip keyInValue and schema'd tests for it.
4188+
_, isWebhook := f.(*webhookFeedFactory)
41694189
if isWebhook && (tc.keyInValue || slices.Contains(tc.enrichedProperties, "schema")) {
41704190
return
41714191
}
@@ -4182,11 +4202,10 @@ func TestChangefeedEnriched(t *testing.T) {
41824202
}
41834203
foo := feed(t, f, create)
41844204
defer closeFeed(t, foo)
4185-
// TODO(#139660): the webhook sink forces topic_in_value, but
4186-
// this is not supported by the enriched envelope type. We should adapt
4187-
// the test framework to account for this.
4205+
4206+
// The webhook testfeed relies on source.table_name for assertion matching. See: #145927
41884207
topic := "foo"
4189-
if isWebhook {
4208+
if isWebhook && !slices.Contains(tc.enrichedProperties, "source") {
41904209
topic = ""
41914210
}
41924211

@@ -4657,7 +4676,7 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
46574676
assertPayloadsEnriched(t, testFeed, []string{`foo: {"i": 0}->{"after": {"i": 0}, "op": "c"}`}, sourceAssertion)
46584677
}
46594678
}
4660-
for _, sink := range []string{"kafka", "pubsub", "sinkless", "cloudstorage"} {
4679+
for _, sink := range []string{"kafka", "pubsub", "sinkless", "cloudstorage", "webhook"} {
46614680
testLocality := roachpb.Locality{
46624681
Tiers: []roachpb.Tier{{
46634682
Key: "region",
@@ -4670,115 +4689,6 @@ func TestChangefeedEnrichedSourceWithDataJSON(t *testing.T) {
46704689
})
46714690
}
46724691

4673-
// TODO(#139660): the webhook sink forces topic_in_value, but
4674-
// this is not supported by the enriched envelope type. We should adapt
4675-
// the test framework to account for this.
4676-
func TestChangefeedEnrichedSourceWithDataJSONWebhook(t *testing.T) {
4677-
defer leaktest.AfterTest(t)()
4678-
defer log.Scope(t).Close(t)
4679-
4680-
testutils.RunTrueAndFalse(t, "ts_{ns,hlc}", func(t *testing.T, withUpdated bool) {
4681-
testutils.RunTrueAndFalse(t, "mvcc_ts", func(t *testing.T, withMVCCTS bool) {
4682-
clusterName := "clusterName123"
4683-
dbVersion := "v999.0.0"
4684-
defer build.TestingOverrideVersion(dbVersion)()
4685-
mkTestFn := func(sink string) func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4686-
return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4687-
clusterID := s.Server.ExecutorConfig().(sql.ExecutorConfig).NodeInfo.LogicalClusterID().String()
4688-
4689-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
4690-
4691-
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
4692-
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4693-
4694-
var tableID int
4695-
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
4696-
4697-
stmt := `CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=json`
4698-
if withMVCCTS {
4699-
stmt += ", mvcc_timestamp"
4700-
}
4701-
if withUpdated {
4702-
stmt += ", updated"
4703-
}
4704-
testFeed := feed(t, f, stmt)
4705-
defer closeFeed(t, testFeed)
4706-
4707-
var jobID int64
4708-
var nodeName string
4709-
var sourceAssertion func(actualSource map[string]any)
4710-
if ef, ok := testFeed.(cdctest.EnterpriseTestFeed); ok {
4711-
jobID = int64(ef.JobID())
4712-
}
4713-
sqlDB.QueryRow(t, `SELECT value FROM crdb_internal.node_runtime_info where component = 'DB' and field = 'Host'`).Scan(&nodeName)
4714-
4715-
sourceAssertion = func(actualSource map[string]any) {
4716-
nodeID := actualSource["node_id"]
4717-
require.NotNil(t, nodeID)
4718-
4719-
sourceNodeLocality := fmt.Sprintf(`region=%s`, testServerRegion)
4720-
4721-
const dummyMvccTimestamp = "1234567890.0001"
4722-
jobIDStr := strconv.FormatInt(jobID, 10)
4723-
4724-
dummyUpdatedTSNS := 12345678900001000
4725-
dummyUpdatedTSHLC :=
4726-
hlc.Timestamp{WallTime: int64(dummyUpdatedTSNS), Logical: 0}.AsOfSystemTime()
4727-
4728-
var assertion string
4729-
assertionMap := map[string]any{
4730-
"cluster_id": clusterID,
4731-
"cluster_name": clusterName,
4732-
"crdb_internal_table_id": tableID,
4733-
"db_version": dbVersion,
4734-
"job_id": jobIDStr,
4735-
"node_id": nodeID,
4736-
"node_name": nodeName,
4737-
"origin": "cockroachdb",
4738-
"changefeed_sink": sink,
4739-
"source_node_locality": sourceNodeLocality,
4740-
"database_name": "d",
4741-
"schema_name": "public",
4742-
"table_name": "foo",
4743-
"primary_keys": []any{"i"},
4744-
}
4745-
if withMVCCTS {
4746-
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))
4747-
actualSource["mvcc_timestamp"] = dummyMvccTimestamp
4748-
assertionMap["mvcc_timestamp"] = dummyMvccTimestamp
4749-
}
4750-
if withUpdated {
4751-
tsns := actualSource["ts_ns"].(gojson.Number)
4752-
tsnsInt, err := tsns.Int64()
4753-
require.NoError(t, err)
4754-
assertReasonableMVCCTimestamp(t, tsns.String())
4755-
actualSource["ts_ns"] = dummyUpdatedTSNS
4756-
assertionMap["ts_ns"] = dummyUpdatedTSNS
4757-
assertEqualTSNSHLCWalltime(t, tsnsInt, actualSource["ts_hlc"].(string))
4758-
actualSource["ts_hlc"] = dummyUpdatedTSHLC
4759-
assertionMap["ts_hlc"] = dummyUpdatedTSHLC
4760-
}
4761-
assertion = toJSON(t, assertionMap)
4762-
4763-
value, err := reformatJSON(actualSource)
4764-
require.NoError(t, err)
4765-
require.JSONEq(t, assertion, string(value))
4766-
}
4767-
4768-
assertPayloadsEnriched(t, testFeed, []string{`: {"i": 0}->{"after": {"i": 0}, "op": "c"}`}, sourceAssertion)
4769-
}
4770-
}
4771-
testLocality := roachpb.Locality{
4772-
Tiers: []roachpb.Tier{{
4773-
Key: "region",
4774-
Value: testServerRegion,
4775-
}}}
4776-
cdcTest(t, mkTestFn("webhook"), feedTestForceSink("webhook"), feedTestUseClusterName(clusterName),
4777-
feedTestUseLocality(testLocality))
4778-
})
4779-
})
4780-
}
4781-
47824692
func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
47834693
defer leaktest.AfterTest(t)()
47844694
defer log.Scope(t).Close(t)
@@ -4880,7 +4790,7 @@ func TestChangefeedEnrichedSourceSchemaInfo(t *testing.T) {
48804790
assertPayloadsEnriched(t, foo, testCase.expectedRowsAfterSchemaChange, sourceAssertionAfterSchemaChange)
48814791
}
48824792

4883-
cdcTest(t, testFn, feedTestForceSink("kafka"))
4793+
cdcTest(t, testFn, feedTestRestrictSinks("kafka"))
48844794
})
48854795
}
48864796
}
@@ -8513,7 +8423,8 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
85138423
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
85148424
sqlDB.Exec(t, `UPSERT INTO bar VALUES (0, 'initial')`)
85158425

8516-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH resolved = '100ms', updated`)
8426+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH resolved = '100ms', updated`,
8427+
optOutOfMetamorphicEnrichedEnvelope{reason: "this test uses readNextMessages directly, which the metamorphic enriched envelope does not support"})
85178428

85188429
// Sketch of the test is as follows:
85198430
//
@@ -8639,7 +8550,7 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
86398550
})
86408551
}
86418552

8642-
cdcTest(t, testFn, feedTestEnterpriseSinks)
8553+
cdcTest(t, testFn)
86438554
}
86448555

86458556
func TestChangefeedBackfillCheckpoint(t *testing.T) {

0 commit comments

Comments
 (0)