Skip to content

Commit 44a9ab5

Browse files
craig[bot]andyyang890jeffswensonyuzefovichrafiss
committed
149094: changefeedccl: add test-only assertion for checkpoint fields invariant r=DarrylWong,asg0451 a=andyyang890 Fixes #149100 --- **roachtest: surface changefeed failures in mixed-version CDC tests** Previously, if the changefeed failed in a mixed-version CDC test, the test wouldn't fail until it hit the timeout. We now periodically check for changefeed failures and fail the test if one is found. Release note: None --- **mixedversion: add RunE method to Test** This patch adds a `RunE` method to `Test`. Unlike the existing `Run` function, it will not immediately fatal the test if an error is encountered and will instead return the error along with the test plan so that callers may decide how to handle the error. Release note: None --- **changefeedccl: add test-only assertion for checkpoint fields invariant** This patch adds a test-only assertion that we won't ever see both checkpoint fields set on a changefeed job progress struct. It is test-only because a bug that existed on earlier versions of 25.2 could cause both checkpoint fields to be set and so the production code continues to discard the legacy checkpoint when that happens. Release note: None 149260: sql: fix check external connection evaluation r=jeffswenson a=jeffswenson Previously, `CHECK EXTERNAL CONNECTION '' WITH CONCURRENCY = (SELECT 1)` would panic because its not able to evaluate an expression containing a sub query. Now, passing a sub query will fail with a user error. ``` CHECK EXTERNAL CONNECTION NULLIF WITH CONCURRENTLY = EXISTS ( ( TABLE error ) ); ERROR: subqueries are not allowed in check_external_connection SQLSTATE: 0A000 ``` Release note: none Informs: #147876 Informs: #147877 149300: logictest: fix up recent change r=yuzefovich a=yuzefovich Just merged 55108a1 made one query flaky: namely, we removed ORDER BY from `array_agg` so now the order of two items can be arbitrary. This commit changes the test to assert that the query succeeds (which this test is really about) without hitting an error. Epic: None Release note: None 149341: upgrades: fix order of AddHotRangeLoggerJob migration r=rafiss a=rafiss The V25_3_AddHotRangeLoggerJob cluster version comes _after_ the V25_3_AddEstimatedLastLoginTime version. Yet, the corresponding upgrade for adding the hot ranges logger was incorrectly being done _before_ the last login time column. informs: #148981 informs: #148998 Release note: None Co-authored-by: Andy Yang <[email protected]> Co-authored-by: Jeff Swenson <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Rafi Shamim <[email protected]>
5 parents c63a85b + fe64979 + ac6605b + 88332e2 + 6ec98ab commit 44a9ab5

File tree

8 files changed

+96
-25
lines changed

8 files changed

+96
-25
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,11 +277,15 @@ func startDistChangefeed(
277277
return errors.AssertionFailedf("both legacy and current checkpoint set on " +
278278
"changefeed job progress and legacy checkpoint has later timestamp")
279279
}
280-
// This should be an assertion failure but unfortunately due to a bug
280+
// This should always be an assertion failure but unfortunately due to a bug
281281
// that was included in earlier versions of 25.2 (#148620), we may fail
282282
// to clear the legacy checkpoint when we start writing the new one.
283283
// We instead discard the legacy checkpoint here and it will eventually be
284284
// cleared once the cluster is running a newer patch release with the fix.
285+
if buildutil.CrdbTestBuild {
286+
return errors.AssertionFailedf("both legacy and current checkpoint set on " +
287+
"changefeed job progress")
288+
}
285289
log.Warningf(ctx, "both legacy and current checkpoint set on changefeed job progress; "+
286290
"discarding legacy checkpoint")
287291
legacyCheckpoint = nil

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11874,13 +11874,9 @@ func TestChangefeedResumeWithBothLegacyAndCurrentCheckpoint(t *testing.T) {
1187411874
require.NoError(t, err)
1187511875

1187611876
sqlDB.Exec(t, `RESUME JOB $1`, jobFeed.JobID())
11877-
waitForJobState(sqlDB, t, jobFeed.JobID(), jobs.StateRunning)
11878-
11879-
// Wait for highwater to advance past the current time.
11880-
var tsStr string
11881-
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsStr)
11882-
ts := parseTimeToHLC(t, tsStr)
11883-
require.NoError(t, jobFeed.WaitForHighWaterMark(ts))
11877+
waitForJobState(sqlDB, t, jobFeed.JobID(), jobs.StateFailed)
11878+
require.ErrorContains(t, jobFeed.FetchTerminalJobErr(),
11879+
"both legacy and current checkpoint set on changefeed job progress")
1188411880
}
1188511881

1188611882
cdcTest(t, testFn, feedTestEnterpriseSinks)

pkg/ccl/logictestccl/testdata/logic_test/refcursor

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -693,10 +693,8 @@ SELECT lag(y) OVER w, lead(y) OVER w, first_value(y) OVER w, last_value(y) OVER
693693

694694
# Array functions are allowed (but ordering on REFCURSOR and REFCURSOR[] columns
695695
# is not).
696-
query TT
696+
statement ok
697697
SELECT array_agg(x), array_cat_agg(y) FROM t;
698-
----
699-
{foo,baz} {bar}
700698

701699
query TT rowsort
702700
SELECT array_append(y, x), array_prepend(x, y) FROM t;

pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -834,21 +834,29 @@ func (t *Test) Workload(
834834
return t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd)
835835
}
836836

837-
// Run runs the mixed-version test. It should be called once all
837+
// Run is like RunE, except it fatals the test if any error occurs.
838+
func (t *Test) Run() {
839+
_, err := t.RunE()
840+
if err != nil {
841+
t.rt.Fatal(err)
842+
}
843+
}
844+
845+
// RunE runs the mixed-version test. It should be called once all
838846
// startup, mixed-version, and after-test hooks have been declared. A
839847
// test plan will be generated (and logged), and the test will be
840-
// carried out.
841-
func (t *Test) Run() {
848+
// carried out. A non-nil plan will be returned unless planning fails.
849+
func (t *Test) RunE() (*TestPlan, error) {
842850
plan, err := t.plan()
843851
if err != nil {
844-
t.rt.Fatal(err)
852+
return nil, err
845853
}
846854

847855
t.logger.Printf("mixed-version test:\n%s", plan.PrettyPrint())
848856

849857
if override := os.Getenv(dryRunEnv); override != "" {
850858
t.logger.Printf("skipping test run in dry-run mode")
851-
return
859+
return plan, nil
852860
}
853861

854862
// Mark the deployment mode and versions, so they show up in the github issue. This makes
@@ -857,8 +865,10 @@ func (t *Test) Run() {
857865
t.rt.AddParam("mvtVersions", formatVersions(plan.Versions()))
858866

859867
if err := t.run(plan); err != nil {
860-
t.rt.Fatal(err)
868+
return plan, err
861869
}
870+
871+
return plan, nil
862872
}
863873

864874
func (t *Test) run(plan *TestPlan) error {

pkg/cmd/roachtest/tests/mixed_version_cdc.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ type cdcMixedVersionTester struct {
140140

141141
validator *cdctest.CountValidator
142142
fprintV *cdctest.FingerprintValidator
143+
144+
jobID int
143145
}
144146

145147
func newCDCMixedVersionTester(ctx context.Context, c cluster.Cluster) cdcMixedVersionTester {
@@ -186,6 +188,25 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c
186188
func (cmvt *cdcMixedVersionTester) waitAndValidate(
187189
ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
188190
) error {
191+
cancel := h.GoWithCancel(func(ctx context.Context, l *logger.Logger) error {
192+
for {
193+
select {
194+
case <-time.After(5 * time.Second):
195+
_, db := h.RandomDB(r)
196+
info, err := getChangefeedInfo(db, cmvt.jobID)
197+
if err != nil {
198+
return err
199+
}
200+
if info.GetStatus() == "failed" {
201+
return errors.Newf("changefeed failed: %s", info.GetError())
202+
}
203+
case <-ctx.Done():
204+
return ctx.Err()
205+
}
206+
}
207+
})
208+
defer cancel()
209+
189210
l.Printf("waiting for %d resolved timestamps", resolvedTimestampsPerState)
190211
// create a new channel for the resolved timestamps, allowing any
191212
// new resolved timestamps to be captured and account for in the
@@ -403,6 +424,7 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
403424
return err
404425
}
405426
l.Printf("created changefeed job %d", jobID)
427+
cmvt.jobID = jobID
406428
return nil
407429
}
408430

@@ -650,5 +672,40 @@ func runCDCMixedVersionCheckpointing(ctx context.Context, t test.Test, c cluster
650672
mvt.InMixedVersion("wait and validate", tester.waitAndValidate)
651673
mvt.AfterUpgradeFinalized("wait and validate", tester.waitAndValidate)
652674

653-
mvt.Run()
675+
// Run the test.
676+
plan, err := mvt.RunE()
677+
if err != nil {
678+
isAffectedBy148620 := func(plan *mixedversion.TestPlan) bool {
679+
if plan == nil {
680+
return false
681+
}
682+
for _, v := range []string{"v25.2.0", "v25.2.1", "v25.2.2"} {
683+
version := clusterupgrade.MustParseVersion(v)
684+
for _, planVersion := range plan.Versions() {
685+
if planVersion.Equal(version) {
686+
return true
687+
}
688+
}
689+
}
690+
return false
691+
}
692+
693+
isExpectedErrorDueTo148620 := func(err error) bool {
694+
for _, s := range []string{
695+
"both legacy and current checkpoint set on change aggregator spec",
696+
"both legacy and current checkpoint set on changefeed job progress",
697+
} {
698+
if strings.Contains(err.Error(), s) {
699+
return true
700+
}
701+
}
702+
return false
703+
}
704+
705+
if isAffectedBy148620(plan) && isExpectedErrorDueTo148620(err) {
706+
t.Skipf("expected error due to #148620: %s", err)
707+
}
708+
709+
t.Fatal(err)
710+
}
654711
}

pkg/sql/check_external_connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func (n *checkExternalConnectionNode) Close(_ context.Context) {
150150
}
151151

152152
func (n *checkExternalConnectionNode) parseParams(params runParams) error {
153+
params.p.SemaCtx().Properties.Require("check_external_connection", tree.RejectSubqueries)
153154
exprEval := params.p.ExprEvaluator("CHECK EXTERNAL CONNECTION")
154155
loc, err := exprEval.String(params.ctx, n.node.URI)
155156
if err != nil {

pkg/sql/logictest/testdata/logic_test/show_external_connections

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,7 @@ SHOW EXTERNAL CONNECTIONS
6868
connection_name connection_uri connection_type
6969
foo_conn nodelocal://1/foo STORAGE
7070
baz_conn nodelocal://1/baz STORAGE
71+
72+
# Regression test for #147877
73+
statement error subqueries are not allowed in check_external_connection
74+
CHECK EXTERNAL CONNECTION NULLIF WITH CONCURRENTLY = EXISTS ( ( TABLE error ) );

pkg/upgrade/upgrades/upgrades.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,6 @@ var upgrades = []upgradebase.Upgrade{
9696
eventLogTableMigration,
9797
upgrade.RestoreActionNotRequired("cluster restore does not restore the new column or index"),
9898
),
99-
upgrade.NewTenantUpgrade(
100-
"add new hot range logger job",
101-
clusterversion.V25_3_AddHotRangeLoggerJob.Version(),
102-
upgrade.NoPrecondition,
103-
addHotRangeLoggerJob,
104-
upgrade.RestoreActionNotRequired("cluster restore does not restore this job"),
105-
),
10699

107100
upgrade.NewTenantUpgrade(
108101
"add 'estimated_last_login_time' column to system.users table",
@@ -112,6 +105,14 @@ var upgrades = []upgradebase.Upgrade{
112105
upgrade.RestoreActionNotRequired("cluster restore does not restore the new column"),
113106
),
114107

108+
upgrade.NewTenantUpgrade(
109+
"add new hot range logger job",
110+
clusterversion.V25_3_AddHotRangeLoggerJob.Version(),
111+
upgrade.NoPrecondition,
112+
addHotRangeLoggerJob,
113+
upgrade.RestoreActionNotRequired("cluster restore does not restore this job"),
114+
),
115+
115116
// Note: when starting a new release version, the first upgrade (for
116117
// Vxy_zStart) must be a newFirstUpgrade. Keep this comment at the bottom.
117118
}

0 commit comments

Comments
 (0)