Skip to content

Commit 162e51b

Browse files
craig[bot]asg0451yuzefovich
committed
147705: changefeedccl: add helper to detect changefeed errors r=aerfrei a=asg0451 Add a test helper to detect changefeed errors rather than just observing timeouts. Epic: None Release note: None 150431: opt: address minor test nits from a recent change r=yuzefovich a=yuzefovich Touches: #150376. Epic: None Release note: None Co-authored-by: Miles Frankel <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
3 parents d5aceba + c9dd81c + b339b23 commit 162e51b

File tree

9 files changed

+176
-81
lines changed

9 files changed

+176
-81
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
514514
})
515515
}
516516

517-
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
517+
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection, withAllowChangefeedErr("error is expected when dropping"))
518518
}
519519

520520
func TestAlterChangefeedDropTargetFamily(t *testing.T) {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func TestRLSBlocking(t *testing.T) {
416416
require.Contains(t, err.Error(), expErrSubstr)
417417
}
418418

419-
cdcTest(t, testFn)
419+
cdcTest(t, testFn, withAllowChangefeedErr("expects terminal error"))
420420
}
421421

422422
func TestToJSONAsChangefeed(t *testing.T) {
@@ -744,7 +744,7 @@ func TestChangefeedSendError(t *testing.T) {
744744
`foo: [3]->{"after": {"a": 3}}`,
745745
`foo: [4]->{"after": {"a": 4}}`,
746746
})
747-
}, feedTestEnterpriseSinks)
747+
}, feedTestEnterpriseSinks, withAllowChangefeedErr("injects error"))
748748
}
749749

750750
func TestChangefeedBasicConfluentKafka(t *testing.T) {
@@ -909,9 +909,9 @@ func TestChangefeedTenants(t *testing.T) {
909909
defer leaktest.AfterTest(t)()
910910
defer log.Scope(t).Close(t)
911911

912-
kvServer, kvSQLdb, cleanup := startTestFullServer(t, feedTestOptions{argsFn: func(args *base.TestServerArgs) {
912+
kvServer, kvSQLdb, cleanup := startTestFullServer(t, makeOptions(t, withArgsFn(func(args *base.TestServerArgs) {
913913
args.ExternalIODirConfig.DisableOutbound = true
914-
}})
914+
})))
915915
defer cleanup()
916916

917917
tenantArgs := base.TestTenantArgs{
@@ -2529,7 +2529,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
25292529
defer leaktest.AfterTest(t)()
25302530
defer log.Scope(t).Close(t)
25312531

2532-
s, db, stopServer := startTestFullServer(t, feedTestOptions{})
2532+
s, db, stopServer := startTestFullServer(t, makeOptions(t, feedTestNoTenants))
25332533
defer stopServer()
25342534
sqlDB := sqlutils.MakeSQLRunner(db)
25352535

@@ -3506,7 +3506,7 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
35063506
}
35073507
}
35083508

3509-
cdcTest(t, testFn)
3509+
cdcTest(t, testFn, withAllowChangefeedErr("expects terminal error"))
35103510
}
35113511

35123512
func TestChangefeedSingleColumnFamily(t *testing.T) {
@@ -3600,12 +3600,12 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
36003600

36013601
// Removing all columns in a watched family fails the feed
36023602
waitForSchemaChange(t, sqlDB, `ALTER TABLE foo DROP column c`)
3603-
requireErrorSoon(context.Background(), t, fooRest,
3603+
requireTerminalErrorSoon(context.Background(), t, fooRest,
36043604
regexp.MustCompile(`CHANGEFEED targeting nonexistent or removed column family rest of table foo`))
36053605
}
36063606

36073607
runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
3608-
cdcTest(t, testFn)
3608+
cdcTest(t, testFn, withAllowChangefeedErr("expects terminal error"))
36093609
}, withMaybeUseLegacySchemaChanger())
36103610
}
36113611

@@ -3936,9 +3936,9 @@ func TestChangefeedCustomKey(t *testing.T) {
39363936
`foo: ["dog"]->{"after": {"a": 1, "b": "dog", "c": "zebra"}}`,
39373937
})
39383938
sqlDB.Exec(t, `ALTER TABLE foo RENAME COLUMN b to b2`)
3939-
requireErrorSoon(context.Background(), t, foo, regexp.MustCompile(`required column b not present`))
3939+
requireTerminalErrorSoon(context.Background(), t, foo, regexp.MustCompile(`required column b not present`))
39403940
}
3941-
cdcTest(t, testFn, feedTestForceSink("kafka"))
3941+
cdcTest(t, testFn, feedTestForceSink("kafka"), withAllowChangefeedErr("expects error"))
39423942
}
39433943

39443944
// Reproduce issue for #114196. This test verifies that changefeed with custom
@@ -5369,12 +5369,14 @@ func TestChangefeedOutputTopics(t *testing.T) {
53695369
})
53705370
}
53715371

5372-
// requireErrorSoon polls for the test feed for an error and asserts that
5373-
// the error matches the provided regex.
5374-
func requireErrorSoon(
5372+
// requireTerminalErrorSoon polls for the test feed for an error and asserts
5373+
// that the error matches the provided regex. This can either be a terminal
5374+
// error or an error encountered while parsing messages and doing testfeed
5375+
// things.
5376+
func requireTerminalErrorSoon(
53755377
ctx context.Context, t *testing.T, f cdctest.TestFeed, errRegex *regexp.Regexp,
53765378
) {
5377-
err := timeutil.RunWithTimeout(ctx, "requireErrorSoon", 30*time.Second, func(ctx context.Context) error {
5379+
err := timeutil.RunWithTimeout(ctx, "requireTerminalErrorSoon", 30*time.Second, func(ctx context.Context) error {
53785380
for {
53795381
select {
53805382
case <-ctx.Done():
@@ -5420,9 +5422,9 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
54205422
`for_import: [0]->{"after": {"a": 0, "b": null}}`,
54215423
})
54225424
sqlDB.Exec(t, `IMPORT INTO for_import CSV DATA ($1)`, dataSrv.URL)
5423-
requireErrorSoon(context.Background(), t, forImport,
5425+
requireTerminalErrorSoon(context.Background(), t, forImport,
54245426
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
5425-
})
5427+
}, withAllowChangefeedErr("expects terminal error"))
54265428

54275429
cdcTestNamedWithSystem(t, "reverted import fails changefeed with earlier cursor", func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
54285430
sysSQLDB := sqlutils.MakeSQLRunner(s.SystemDB)
@@ -5460,9 +5462,9 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
54605462
// Changefeed should fail regardless
54615463
forImport := feed(t, f, `CREATE CHANGEFEED FOR for_import WITH cursor=$1`, start)
54625464
defer closeFeed(t, forImport)
5463-
requireErrorSoon(context.Background(), t, forImport,
5465+
requireTerminalErrorSoon(context.Background(), t, forImport,
54645466
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
5465-
})
5467+
}, withAllowChangefeedErr("expects terminal error"))
54665468
}
54675469

54685470
func TestChangefeedRestartMultiNode(t *testing.T) {
@@ -6172,7 +6174,7 @@ func TestChangefeedTruncateOrDrop(t *testing.T) {
61726174
assertFailuresCounter(t, metrics, 3)
61736175
}
61746176

6175-
cdcTest(t, testFn, feedTestEnterpriseSinks)
6177+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("expects errors"))
61766178
// will sometimes fail, non deterministic
61776179
}
61786180

@@ -6368,7 +6370,7 @@ func TestChangefeedRetryableError(t *testing.T) {
63686370
}
63696371
}
63706372

6371-
cdcTest(t, testFn, feedTestEnterpriseSinks)
6373+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error"))
63726374
}
63736375

63746376
func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) {
@@ -6563,7 +6565,7 @@ func TestChangefeedDataTTL(t *testing.T) {
65636565
// timestamp before beginning their backfill.
65646566
// TODO(samiskin): Tenant test disabled because this test requires
65656567
// forceTableGC which doesn't work on tenants
6566-
cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants)
6568+
cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants, withAllowChangefeedErr("expects batch ts gc error"))
65676569
}
65686570

65696571
// TestChangefeedOutdatedCursor ensures that create changefeeds fail with an
@@ -6603,7 +6605,7 @@ func TestChangefeedCursorAgeWarning(t *testing.T) {
66036605
}
66046606

66056607
testutils.RunValues(t, "cursor age", cursorAges, func(t *testing.T, cursorAge time.Duration) {
6606-
s, stopServer := makeServer(t)
6608+
s, stopServer := makeServer(t, withAllowChangefeedErr("expects batch ts gc error"))
66076609
defer stopServer()
66086610
knobs := s.TestingKnobs.
66096611
DistSQL.(*execinfra.TestingKnobs).
@@ -6725,7 +6727,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
67256727

67266728
// TODO(samiskin): tenant tests skipped because of forceTableGC not working
67276729
// with a ApplicationLayerInterface
6728-
cdcTestWithSystem(t, testFn, feedTestNoTenants)
6730+
cdcTestWithSystem(t, testFn, feedTestNoTenants, withAllowChangefeedErr("expects batch ts gc error"))
67296731
}
67306732

67316733
func TestChangefeedErrors(t *testing.T) {
@@ -7400,7 +7402,7 @@ func TestChangefeedDescription(t *testing.T) {
74007402

74017403
// Intentionally don't use the TestFeedFactory because we want to
74027404
// control the placeholders.
7403-
s, stopServer := makeServer(t)
7405+
s, stopServer := makeServerWithOptions(t, makeOptions(t, withAllowChangefeedErr("create strange changefeeds that don't actually run")))
74047406
defer stopServer()
74057407

74067408
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -7520,7 +7522,7 @@ func TestChangefeedPanicRecovery(t *testing.T) {
75207522
defer closeFeed(t, foo)
75217523
err := waitForFeedErr(t, foo, 2*time.Minute)
75227524
require.ErrorContains(t, err, "error evaluating CDC expression", "expected panic recovery while evaluating WHERE clause")
7523-
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"))
7525+
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"), withAllowChangefeedErr("expects error"))
75247526

75257527
// Check that all panics while evaluating the SELECT clause in an expression are recovered from.
75267528
// NB: REPAIRCLUSTER is required to use crdb_internal.force_panic.
@@ -7532,7 +7534,7 @@ func TestChangefeedPanicRecovery(t *testing.T) {
75327534
defer closeFeed(t, foo)
75337535
err := waitForFeedErr(t, foo, 2*time.Minute)
75347536
require.ErrorContains(t, err, "error evaluating CDC expression", "expected panic recovery while evaluating SELECT clause")
7535-
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"))
7537+
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"), withAllowChangefeedErr("expects error"))
75367538
}
75377539

75387540
func TestChangefeedPauseUnpause(t *testing.T) {
@@ -8007,7 +8009,7 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
80078009
// We use feedTestUseRootUserConnection to prevent the
80088010
// feed factory from trying to create a test user. Because the registry is draining, creating the test user
80098011
// will fail and the test will fail prematurely.
8010-
f, closeSink := makeFeedFactory(t, randomSinkType(feedTestEnterpriseSinks), tc.Server(1), tc.ServerConn(0),
8012+
f, closeSink := makeFeedFactory(t, randomSinkType(t, feedTestEnterpriseSinks), tc.Server(1), tc.ServerConn(0),
80118013
feedTestUseRootUserConnection)
80128014
defer closeSink()
80138015

@@ -8044,7 +8046,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
80448046

80458047
const numNodes = 4
80468048

8047-
opts := makeOptions()
8049+
opts := makeOptions(t)
80488050
opts.forceRootUserConnection = true
80498051
defer addCloudStorageOptions(t, &opts)()
80508052

@@ -8264,7 +8266,7 @@ func TestChangefeedTimelyResolvedTimestampUpdatePostRollingRestart(t *testing.T)
82648266
skip.UnderDeadlock(t)
82658267
skip.UnderRace(t)
82668268

8267-
opts := makeOptions()
8269+
opts := makeOptions(t)
82688270
defer addCloudStorageOptions(t, &opts)()
82698271
opts.forceRootUserConnection = true
82708272
defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)()
@@ -8365,7 +8367,7 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
83658367
defer leaktest.AfterTest(t)()
83668368
defer log.Scope(t).Close(t)
83678369

8368-
opts := makeOptions()
8370+
opts := makeOptions(t)
83698371
defer addCloudStorageOptions(t, &opts)()
83708372
defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)()
83718373
defer testingUseFastRetry()()
@@ -9288,7 +9290,7 @@ func TestChangefeedOrderingWithErrors(t *testing.T) {
92889290

92899291
// only used for webhook sink for now since it's the only testfeed where
92909292
// we can control the ordering of errors
9291-
cdcTest(t, testFn, feedTestForceSink("webhook"), feedTestNoExternalConnection)
9293+
cdcTest(t, testFn, feedTestForceSink("webhook"), feedTestNoExternalConnection, withAllowChangefeedErr("expects error"))
92929294
}
92939295

92949296
func TestChangefeedOnErrorOption(t *testing.T) {
@@ -9383,7 +9385,7 @@ func TestChangefeedOnErrorOption(t *testing.T) {
93839385
})
93849386
}
93859387

9386-
cdcTest(t, testFn, feedTestEnterpriseSinks)
9388+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error"))
93879389
}
93889390

93899391
func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
@@ -9835,7 +9837,7 @@ func TestChangefeedInvalidPredicate(t *testing.T) {
98359837
defer leaktest.AfterTest(t)()
98369838
defer log.Scope(t).Close(t)
98379839

9838-
_, db, stopServer := startTestFullServer(t, feedTestOptions{})
9840+
_, db, stopServer := startTestFullServer(t, makeOptions(t, feedTestNoTenants))
98399841
defer stopServer()
98409842
sqlDB := sqlutils.MakeSQLRunner(db)
98419843

@@ -10112,7 +10114,11 @@ func TestChangefeedPredicateWithSchemaChange(t *testing.T) {
1011210114
},
1011310115
} {
1011410116
t.Run(tc.name, func(t *testing.T) {
10115-
cdcTest(t, testFn(tc), feedTestEnterpriseSinks)
10117+
testOpts := []feedTestOption{feedTestEnterpriseSinks}
10118+
if tc.expectErr != "" {
10119+
testOpts = append(testOpts, withAllowChangefeedErr(tc.expectErr))
10120+
}
10121+
cdcTest(t, testFn(tc), testOpts...)
1011610122
})
1011710123
}
1011810124
}
@@ -10370,7 +10376,7 @@ func TestChangefeedMultiPodTenantPlanning(t *testing.T) {
1037010376
// Ensure both pods can be assigned work
1037110377
waitForTenantPodsActive(t, tenant1Server, 2)
1037210378

10373-
feedFactory, cleanupSink := makeFeedFactory(t, randomSinkType(feedTestEnterpriseSinks), tenant1Server, tenant1DB)
10379+
feedFactory, cleanupSink := makeFeedFactory(t, randomSinkType(t, feedTestEnterpriseSinks), tenant1Server, tenant1DB)
1037410380
defer cleanupSink()
1037510381

1037610382
// Run a changefeed across two tables to guarantee multiple spans that can be spread across the aggregators
@@ -10501,7 +10507,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
1050110507
}
1050210508

1050310509
t.Run(`connection_closed`, func(t *testing.T) {
10504-
s, stopServer := makeServer(t)
10510+
s, stopServer := makeServer(t, withAllowChangefeedErr("expects error"))
1050510511
defer stopServer()
1050610512

1050710513
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -10535,7 +10541,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
1053510541
failLogs := waitForLogs(t, beforeCreate)
1053610542
require.Equal(t, 1, len(failLogs))
1053710543
require.Equal(t, failLogs[0].FailureType, changefeedbase.UserInput)
10538-
}, feedTestEnterpriseSinks)
10544+
}, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error"))
1053910545

1054010546
cdcTestNamed(t, "unknown_error", func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1054110547
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -10560,7 +10566,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
1056010566
require.Equal(t, failLogs[0].FailureType, changefeedbase.UnknownError)
1056110567
require.Contains(t, []string{`gcpubsub`, `external`}, failLogs[0].SinkType)
1056210568
require.Equal(t, failLogs[0].NumTables, int32(1))
10563-
}, feedTestForceSink("pubsub"))
10569+
}, feedTestForceSink("pubsub"), withAllowChangefeedErr("expects error"))
1056410570
}
1056510571

1056610572
func TestChangefeedCanceledTelemetryLogs(t *testing.T) {
@@ -10817,7 +10823,7 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
1081710823
}
1081810824
}
1081910825

10820-
cdcTest(t, testFn, feedTestForceSink(`kafka`))
10826+
cdcTest(t, testFn, feedTestForceSink(`kafka`), withAllowChangefeedErr("expects kafka error"))
1082110827
}
1082210828

1082310829
// Regression for #85902.
@@ -11188,7 +11194,7 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) {
1118811194
case <-doneCh:
1118911195
}
1119011196
}
11191-
cdcTest(t, testFn, feedTestEnterpriseSinks)
11197+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("injects error"))
1119211198
}
1119311199

1119411200
// TestChangefeedPubsubResolvedMessages tests that the pubsub sink emits
@@ -12024,7 +12030,9 @@ func TestCloudstorageParallelCompression(t *testing.T) {
1202412030
const numFeedsEach = 10
1202512031

1202612032
testutils.RunValues(t, "compression", []string{"zstd", "gzip"}, func(t *testing.T, compression string) {
12027-
s, cleanup := makeServer(t)
12033+
opts := makeOptions(t)
12034+
opts.externalIODir = t.TempDir()
12035+
s, cleanup := makeServerWithOptions(t, opts)
1202812036
defer cleanup()
1202912037

1203012038
sqlDB := sqlutils.MakeSQLRunner(s.DB)

pkg/ccl/changefeedccl/encoder_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,10 @@ func TestAvroSchemaNaming(t *testing.T) {
726726

727727
}
728728

729-
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection)
729+
// TODO(#150537): This test sometimes encounters errors like "CHANGEFEED
730+
// created on a table with a single column family (drivers) cannot now
731+
// target a table with 2 families". Why?
732+
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection, withAllowChangefeedErr("inexplicable errors"))
730733
}
731734

732735
func TestAvroSchemaNamespace(t *testing.T) {
@@ -879,7 +882,7 @@ func TestAvroMigrateToUnsupportedColumn(t *testing.T) {
879882
}
880883
}
881884

882-
cdcTest(t, testFn, feedTestForceSink("kafka"))
885+
cdcTest(t, testFn, feedTestForceSink("kafka"), withAllowChangefeedErr("checks error manually"))
883886
}
884887

885888
func TestAvroLedger(t *testing.T) {

0 commit comments

Comments
 (0)