Skip to content

Commit c9dd81c

Browse files
committed
changefeedccl: add helper to detect changefeed errors
Add a test helper to detect changefeed errors rather than just observing timeouts. Epic: None Release note: None
1 parent 2f72cf4 commit c9dd81c

File tree

6 files changed

+171
-62
lines changed

6 files changed

+171
-62
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
514514
})
515515
}
516516

517-
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
517+
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection, withAllowChangefeedErr("error is expected when dropping"))
518518
}
519519

520520
func TestAlterChangefeedDropTargetFamily(t *testing.T) {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func TestRLSBlocking(t *testing.T) {
416416
require.Contains(t, err.Error(), expErrSubstr)
417417
}
418418

419-
cdcTest(t, testFn)
419+
cdcTest(t, testFn, withAllowChangefeedErr("expects terminal error"))
420420
}
421421

422422
func TestToJSONAsChangefeed(t *testing.T) {
@@ -744,7 +744,7 @@ func TestChangefeedSendError(t *testing.T) {
744744
`foo: [3]->{"after": {"a": 3}}`,
745745
`foo: [4]->{"after": {"a": 4}}`,
746746
})
747-
}, feedTestEnterpriseSinks)
747+
}, feedTestEnterpriseSinks, withAllowChangefeedErr("injects error"))
748748
}
749749

750750
func TestChangefeedBasicConfluentKafka(t *testing.T) {
@@ -837,9 +837,9 @@ func TestChangefeedTenants(t *testing.T) {
837837
defer leaktest.AfterTest(t)()
838838
defer log.Scope(t).Close(t)
839839

840-
kvServer, kvSQLdb, cleanup := startTestFullServer(t, feedTestOptions{argsFn: func(args *base.TestServerArgs) {
840+
kvServer, kvSQLdb, cleanup := startTestFullServer(t, makeOptions(t, withArgsFn(func(args *base.TestServerArgs) {
841841
args.ExternalIODirConfig.DisableOutbound = true
842-
}})
842+
})))
843843
defer cleanup()
844844

845845
tenantArgs := base.TestTenantArgs{
@@ -2404,7 +2404,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
24042404
defer leaktest.AfterTest(t)()
24052405
defer log.Scope(t).Close(t)
24062406

2407-
s, db, stopServer := startTestFullServer(t, feedTestOptions{})
2407+
s, db, stopServer := startTestFullServer(t, makeOptions(t, feedTestNoTenants))
24082408
defer stopServer()
24092409
sqlDB := sqlutils.MakeSQLRunner(db)
24102410

@@ -3381,7 +3381,7 @@ func TestChangefeedEachColumnFamily(t *testing.T) {
33813381
}
33823382
}
33833383

3384-
cdcTest(t, testFn)
3384+
cdcTest(t, testFn, withAllowChangefeedErr("expects terminal error"))
33853385
}
33863386

33873387
func TestChangefeedSingleColumnFamily(t *testing.T) {
@@ -3475,12 +3475,12 @@ func TestChangefeedSingleColumnFamilySchemaChanges(t *testing.T) {
34753475

34763476
// Removing all columns in a watched family fails the feed
34773477
waitForSchemaChange(t, sqlDB, `ALTER TABLE foo DROP column c`)
3478-
requireErrorSoon(context.Background(), t, fooRest,
3478+
requireTerminalErrorSoon(context.Background(), t, fooRest,
34793479
regexp.MustCompile(`CHANGEFEED targeting nonexistent or removed column family rest of table foo`))
34803480
}
34813481

34823482
runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
3483-
cdcTest(t, testFn)
3483+
cdcTest(t, testFn, withAllowChangefeedErr("expects terminal error"))
34843484
}, withMaybeUseLegacySchemaChanger())
34853485
}
34863486

@@ -3811,9 +3811,9 @@ func TestChangefeedCustomKey(t *testing.T) {
38113811
`foo: ["dog"]->{"after": {"a": 1, "b": "dog", "c": "zebra"}}`,
38123812
})
38133813
sqlDB.Exec(t, `ALTER TABLE foo RENAME COLUMN b to b2`)
3814-
requireErrorSoon(context.Background(), t, foo, regexp.MustCompile(`required column b not present`))
3814+
requireTerminalErrorSoon(context.Background(), t, foo, regexp.MustCompile(`required column b not present`))
38153815
}
3816-
cdcTest(t, testFn, feedTestForceSink("kafka"))
3816+
cdcTest(t, testFn, feedTestForceSink("kafka"), withAllowChangefeedErr("expects error"))
38173817
}
38183818

38193819
// Reproduce issue for #114196. This test verifies that changefeed with custom
@@ -5184,12 +5184,14 @@ func TestChangefeedOutputTopics(t *testing.T) {
51845184
})
51855185
}
51865186

5187-
// requireErrorSoon polls for the test feed for an error and asserts that
5188-
// the error matches the provided regex.
5189-
func requireErrorSoon(
5187+
// requireTerminalErrorSoon polls for the test feed for an error and asserts
5188+
// that the error matches the provided regex. This can either be a terminal
5189+
// error or an error encountered while parsing messages and doing testfeed
5190+
// things.
5191+
func requireTerminalErrorSoon(
51905192
ctx context.Context, t *testing.T, f cdctest.TestFeed, errRegex *regexp.Regexp,
51915193
) {
5192-
err := timeutil.RunWithTimeout(ctx, "requireErrorSoon", 30*time.Second, func(ctx context.Context) error {
5194+
err := timeutil.RunWithTimeout(ctx, "requireTerminalErrorSoon", 30*time.Second, func(ctx context.Context) error {
51935195
for {
51945196
select {
51955197
case <-ctx.Done():
@@ -5235,9 +5237,9 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
52355237
`for_import: [0]->{"after": {"a": 0, "b": null}}`,
52365238
})
52375239
sqlDB.Exec(t, `IMPORT INTO for_import CSV DATA ($1)`, dataSrv.URL)
5238-
requireErrorSoon(context.Background(), t, forImport,
5240+
requireTerminalErrorSoon(context.Background(), t, forImport,
52395241
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
5240-
})
5242+
}, withAllowChangefeedErr("expects terminal error"))
52415243

52425244
cdcTestNamedWithSystem(t, "reverted import fails changefeed with earlier cursor", func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
52435245
sysSQLDB := sqlutils.MakeSQLRunner(s.SystemDB)
@@ -5275,9 +5277,9 @@ func TestChangefeedFailOnTableOffline(t *testing.T) {
52755277
// Changefeed should fail regardless
52765278
forImport := feed(t, f, `CREATE CHANGEFEED FOR for_import WITH cursor=$1`, start)
52775279
defer closeFeed(t, forImport)
5278-
requireErrorSoon(context.Background(), t, forImport,
5280+
requireTerminalErrorSoon(context.Background(), t, forImport,
52795281
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
5280-
})
5282+
}, withAllowChangefeedErr("expects terminal error"))
52815283
}
52825284

52835285
func TestChangefeedRestartMultiNode(t *testing.T) {
@@ -5987,7 +5989,7 @@ func TestChangefeedTruncateOrDrop(t *testing.T) {
59875989
assertFailuresCounter(t, metrics, 3)
59885990
}
59895991

5990-
cdcTest(t, testFn, feedTestEnterpriseSinks)
5992+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("expects errors"))
59915993
// will sometimes fail, non deterministic
59925994
}
59935995

@@ -6183,7 +6185,7 @@ func TestChangefeedRetryableError(t *testing.T) {
61836185
}
61846186
}
61856187

6186-
cdcTest(t, testFn, feedTestEnterpriseSinks)
6188+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error"))
61876189
}
61886190

61896191
func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) {
@@ -6378,7 +6380,7 @@ func TestChangefeedDataTTL(t *testing.T) {
63786380
// timestamp before beginning their backfill.
63796381
// TODO(samiskin): Tenant test disabled because this test requires
63806382
// forceTableGC which doesn't work on tenants
6381-
cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants)
6383+
cdcTestWithSystem(t, testFn, feedTestForceSink("sinkless"), feedTestNoTenants, withAllowChangefeedErr("expects batch ts gc error"))
63826384
}
63836385

63846386
// TestChangefeedOutdatedCursor ensures that create changefeeds fail with an
@@ -6418,7 +6420,7 @@ func TestChangefeedCursorAgeWarning(t *testing.T) {
64186420
}
64196421

64206422
testutils.RunValues(t, "cursor age", cursorAges, func(t *testing.T, cursorAge time.Duration) {
6421-
s, stopServer := makeServer(t)
6423+
s, stopServer := makeServer(t, withAllowChangefeedErr("expects batch ts gc error"))
64226424
defer stopServer()
64236425
knobs := s.TestingKnobs.
64246426
DistSQL.(*execinfra.TestingKnobs).
@@ -6540,7 +6542,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
65406542

65416543
// TODO(samiskin): tenant tests skipped because of forceTableGC not working
65426544
// with a ApplicationLayerInterface
6543-
cdcTestWithSystem(t, testFn, feedTestNoTenants)
6545+
cdcTestWithSystem(t, testFn, feedTestNoTenants, withAllowChangefeedErr("expects batch ts gc error"))
65446546
}
65456547

65466548
func TestChangefeedErrors(t *testing.T) {
@@ -7215,7 +7217,7 @@ func TestChangefeedDescription(t *testing.T) {
72157217

72167218
// Intentionally don't use the TestFeedFactory because we want to
72177219
// control the placeholders.
7218-
s, stopServer := makeServer(t)
7220+
s, stopServer := makeServerWithOptions(t, makeOptions(t, withAllowChangefeedErr("create strange changefeeds that don't actually run")))
72197221
defer stopServer()
72207222

72217223
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -7335,7 +7337,7 @@ func TestChangefeedPanicRecovery(t *testing.T) {
73357337
defer closeFeed(t, foo)
73367338
err := waitForFeedErr(t, foo, 2*time.Minute)
73377339
require.ErrorContains(t, err, "error evaluating CDC expression", "expected panic recovery while evaluating WHERE clause")
7338-
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"))
7340+
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"), withAllowChangefeedErr("expects error"))
73397341

73407342
// Check that all panics while evaluating the SELECT clause in an expression are recovered from.
73417343
// NB: REPAIRCLUSTER is required to use crdb_internal.force_panic.
@@ -7347,7 +7349,7 @@ func TestChangefeedPanicRecovery(t *testing.T) {
73477349
defer closeFeed(t, foo)
73487350
err := waitForFeedErr(t, foo, 2*time.Minute)
73497351
require.ErrorContains(t, err, "error evaluating CDC expression", "expected panic recovery while evaluating SELECT clause")
7350-
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"))
7352+
}, feedTestAdditionalSystemPrivs("REPAIRCLUSTER"), withAllowChangefeedErr("expects error"))
73517353
}
73527354

73537355
func TestChangefeedPauseUnpause(t *testing.T) {
@@ -7822,7 +7824,7 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
78227824
// We use feedTestUseRootUserConnection to prevent the
78237825
// feed factory from trying to create a test user. Because the registry is draining, creating the test user
78247826
// will fail and the test will fail prematurely.
7825-
f, closeSink := makeFeedFactory(t, randomSinkType(feedTestEnterpriseSinks), tc.Server(1), tc.ServerConn(0),
7827+
f, closeSink := makeFeedFactory(t, randomSinkType(t, feedTestEnterpriseSinks), tc.Server(1), tc.ServerConn(0),
78267828
feedTestUseRootUserConnection)
78277829
defer closeSink()
78287830

@@ -7859,7 +7861,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
78597861

78607862
const numNodes = 4
78617863

7862-
opts := makeOptions()
7864+
opts := makeOptions(t)
78637865
opts.forceRootUserConnection = true
78647866
defer addCloudStorageOptions(t, &opts)()
78657867

@@ -8079,7 +8081,7 @@ func TestChangefeedTimelyResolvedTimestampUpdatePostRollingRestart(t *testing.T)
80798081
skip.UnderDeadlock(t)
80808082
skip.UnderRace(t)
80818083

8082-
opts := makeOptions()
8084+
opts := makeOptions(t)
80838085
defer addCloudStorageOptions(t, &opts)()
80848086
opts.forceRootUserConnection = true
80858087
defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)()
@@ -8180,7 +8182,7 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
81808182
defer leaktest.AfterTest(t)()
81818183
defer log.Scope(t).Close(t)
81828184

8183-
opts := makeOptions()
8185+
opts := makeOptions(t)
81848186
defer addCloudStorageOptions(t, &opts)()
81858187
defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)()
81868188
defer testingUseFastRetry()()
@@ -9103,7 +9105,7 @@ func TestChangefeedOrderingWithErrors(t *testing.T) {
91039105

91049106
// only used for webhook sink for now since it's the only testfeed where
91059107
// we can control the ordering of errors
9106-
cdcTest(t, testFn, feedTestForceSink("webhook"), feedTestNoExternalConnection)
9108+
cdcTest(t, testFn, feedTestForceSink("webhook"), feedTestNoExternalConnection, withAllowChangefeedErr("expects error"))
91079109
}
91089110

91099111
func TestChangefeedOnErrorOption(t *testing.T) {
@@ -9198,7 +9200,7 @@ func TestChangefeedOnErrorOption(t *testing.T) {
91989200
})
91999201
}
92009202

9201-
cdcTest(t, testFn, feedTestEnterpriseSinks)
9203+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error"))
92029204
}
92039205

92049206
func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
@@ -9650,7 +9652,7 @@ func TestChangefeedInvalidPredicate(t *testing.T) {
96509652
defer leaktest.AfterTest(t)()
96519653
defer log.Scope(t).Close(t)
96529654

9653-
_, db, stopServer := startTestFullServer(t, feedTestOptions{})
9655+
_, db, stopServer := startTestFullServer(t, makeOptions(t, feedTestNoTenants))
96549656
defer stopServer()
96559657
sqlDB := sqlutils.MakeSQLRunner(db)
96569658

@@ -9927,7 +9929,11 @@ func TestChangefeedPredicateWithSchemaChange(t *testing.T) {
99279929
},
99289930
} {
99299931
t.Run(tc.name, func(t *testing.T) {
9930-
cdcTest(t, testFn(tc), feedTestEnterpriseSinks)
9932+
testOpts := []feedTestOption{feedTestEnterpriseSinks}
9933+
if tc.expectErr != "" {
9934+
testOpts = append(testOpts, withAllowChangefeedErr(tc.expectErr))
9935+
}
9936+
cdcTest(t, testFn(tc), testOpts...)
99319937
})
99329938
}
99339939
}
@@ -10185,7 +10191,7 @@ func TestChangefeedMultiPodTenantPlanning(t *testing.T) {
1018510191
// Ensure both pods can be assigned work
1018610192
waitForTenantPodsActive(t, tenant1Server, 2)
1018710193

10188-
feedFactory, cleanupSink := makeFeedFactory(t, randomSinkType(feedTestEnterpriseSinks), tenant1Server, tenant1DB)
10194+
feedFactory, cleanupSink := makeFeedFactory(t, randomSinkType(t, feedTestEnterpriseSinks), tenant1Server, tenant1DB)
1018910195
defer cleanupSink()
1019010196

1019110197
// Run a changefeed across two tables to guarantee multiple spans that can be spread across the aggregators
@@ -10316,7 +10322,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
1031610322
}
1031710323

1031810324
t.Run(`connection_closed`, func(t *testing.T) {
10319-
s, stopServer := makeServer(t)
10325+
s, stopServer := makeServer(t, withAllowChangefeedErr("expects error"))
1032010326
defer stopServer()
1032110327

1032210328
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -10350,7 +10356,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
1035010356
failLogs := waitForLogs(t, beforeCreate)
1035110357
require.Equal(t, 1, len(failLogs))
1035210358
require.Equal(t, failLogs[0].FailureType, changefeedbase.UserInput)
10353-
}, feedTestEnterpriseSinks)
10359+
}, feedTestEnterpriseSinks, withAllowChangefeedErr("expects error"))
1035410360

1035510361
cdcTestNamed(t, "unknown_error", func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1035610362
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -10375,7 +10381,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
1037510381
require.Equal(t, failLogs[0].FailureType, changefeedbase.UnknownError)
1037610382
require.Contains(t, []string{`gcpubsub`, `external`}, failLogs[0].SinkType)
1037710383
require.Equal(t, failLogs[0].NumTables, int32(1))
10378-
}, feedTestForceSink("pubsub"))
10384+
}, feedTestForceSink("pubsub"), withAllowChangefeedErr("expects error"))
1037910385
}
1038010386

1038110387
func TestChangefeedCanceledTelemetryLogs(t *testing.T) {
@@ -10632,7 +10638,7 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
1063210638
}
1063310639
}
1063410640

10635-
cdcTest(t, testFn, feedTestForceSink(`kafka`))
10641+
cdcTest(t, testFn, feedTestForceSink(`kafka`), withAllowChangefeedErr("expects kafka error"))
1063610642
}
1063710643

1063810644
// Regression for #85902.
@@ -11003,7 +11009,7 @@ func TestHighwaterDoesNotRegressOnRetry(t *testing.T) {
1100311009
case <-doneCh:
1100411010
}
1100511011
}
11006-
cdcTest(t, testFn, feedTestEnterpriseSinks)
11012+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr("injects error"))
1100711013
}
1100811014

1100911015
// TestChangefeedPubsubResolvedMessages tests that the pubsub sink emits
@@ -11786,7 +11792,9 @@ func TestCloudstorageParallelCompression(t *testing.T) {
1178611792
const numFeedsEach = 10
1178711793

1178811794
testutils.RunValues(t, "compression", []string{"zstd", "gzip"}, func(t *testing.T, compression string) {
11789-
s, cleanup := makeServer(t)
11795+
opts := makeOptions(t)
11796+
opts.externalIODir = t.TempDir()
11797+
s, cleanup := makeServerWithOptions(t, opts)
1179011798
defer cleanup()
1179111799

1179211800
sqlDB := sqlutils.MakeSQLRunner(s.DB)

pkg/ccl/changefeedccl/encoder_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,10 @@ func TestAvroSchemaNaming(t *testing.T) {
726726

727727
}
728728

729-
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection)
729+
// TODO(#150537): This test sometimes encounters errors like "CHANGEFEED
730+
// created on a table with a single column family (drivers) cannot now
731+
// target a table with 2 families". Why?
732+
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection, withAllowChangefeedErr("inexplicable errors"))
730733
}
731734

732735
func TestAvroSchemaNamespace(t *testing.T) {
@@ -879,7 +882,7 @@ func TestAvroMigrateToUnsupportedColumn(t *testing.T) {
879882
}
880883
}
881884

882-
cdcTest(t, testFn, feedTestForceSink("kafka"))
885+
cdcTest(t, testFn, feedTestForceSink("kafka"), withAllowChangefeedErr("checks error manually"))
883886
}
884887

885888
func TestAvroLedger(t *testing.T) {

0 commit comments

Comments
 (0)