Skip to content

Commit 6708f9d

Browse files
craig[bot]DarrylWongmsbutler
committed
154180: roachtest: add helper to retry cluster settings r=herkolategan,srosenberg a=DarrylWong We occasionally see cluster setting statements time out due to overloaded clusters (e.g. mutators in mixed version tests, post test assertions). This change adds a small helper to retry those cluster setting statements. Informs: #143791 Informs: #145092 156795: roachtest: fix allow_unsafe_internals edge cases r=herkolategan,srosenberg a=DarrylWong In #156581, we started setting `allow_unsafe_internals` unconditionally for roachtests. However, there are a few edge cases where we can't set this. Namely when using `psql` to connect to a postgres database where the CRDB only parameter does not exist. Fixes: #156702 Fixes: #156703 Fixes: #156706 Release note: none 156975: crosscluster/logical: test seq support r=jeffswenson a=msbutler Fixes #155521 Release note (ops change): ensure the user can run CREATE LOGICAL REPLICATION STREAM where the source table has a column with a sequence expression. Co-authored-by: DarrylWong <[email protected]> Co-authored-by: Michael Butler <[email protected]>
4 parents 94f4f40 + 0a22b4b + 8913ec0 + dd92f14 commit 6708f9d

File tree

21 files changed

+225
-60
lines changed

21 files changed

+225
-60
lines changed

pkg/cmd/roachtest/cluster.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2721,9 +2721,6 @@ func (c *clusterImpl) InternalPGUrl(
27212721
return c.pgURLErr(ctx, l, nodes, opts)
27222722
}
27232723

2724-
// Silence unused warning.
2725-
var _ = (&clusterImpl{}).InternalPGUrl
2726-
27272724
// ExternalPGUrl returns the external Postgres endpoint for the specified nodes.
27282725
func (c *clusterImpl) ExternalPGUrl(
27292726
ctx context.Context, l *logger.Logger, nodes option.NodeListOption, opts roachprod.PGURLOptions,

pkg/cmd/roachtest/roachtestutil/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ go_library(
3939
"//pkg/util/httputil",
4040
"//pkg/util/humanizeutil",
4141
"//pkg/util/protoutil",
42+
"//pkg/util/retry",
4243
"//pkg/util/syncutil",
4344
"//pkg/util/timeutil",
4445
"//pkg/workload/histogram/exporter",

pkg/cmd/roachtest/roachtestutil/clusterupgrade/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"//pkg/build",
1010
"//pkg/cmd/roachtest/cluster",
1111
"//pkg/cmd/roachtest/option",
12+
"//pkg/cmd/roachtest/roachtestutil",
1213
"//pkg/cmd/roachtest/test",
1314
"//pkg/roachpb",
1415
"//pkg/roachprod/install",

pkg/cmd/roachtest/roachtestutil/clusterupgrade/clusterupgrade.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/build"
2020
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
2121
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
22+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
2223
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2324
"github.com/cockroachdb/cockroach/pkg/roachpb"
2425
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
@@ -156,10 +157,22 @@ func LatestPatchRelease(series string) (*Version, error) {
156157
// associated with the given database connection.
157158
// NB: version means major.minor[-internal]; the patch level isn't
158159
// returned. For example, a binary of version 19.2.4 will return 19.2.
159-
func BinaryVersion(ctx context.Context, db *gosql.DB) (roachpb.Version, error) {
160+
func BinaryVersion(ctx context.Context, l *logger.Logger, db *gosql.DB) (roachpb.Version, error) {
160161
zero := roachpb.Version{}
161162
var sv string
162-
if err := db.QueryRowContext(ctx, `SELECT crdb_internal.node_executable_version();`).Scan(&sv); err != nil {
163+
rows, err := roachtestutil.QueryWithRetry(
164+
ctx, l, db, roachtestutil.ClusterSettingRetryOpts, `SELECT crdb_internal.node_executable_version();`,
165+
)
166+
if err != nil {
167+
return zero, err
168+
}
169+
defer rows.Close()
170+
171+
if !rows.Next() {
172+
return zero, fmt.Errorf("no rows returned")
173+
}
174+
175+
if err := rows.Scan(&sv); err != nil {
163176
return zero, err
164177
}
165178

@@ -176,10 +189,22 @@ func BinaryVersion(ctx context.Context, db *gosql.DB) (roachpb.Version, error) {
176189
// in the background plus gossip asynchronicity.
177190
// NB: cluster versions are always major.minor[-internal]; there isn't
178191
// a patch level.
179-
func ClusterVersion(ctx context.Context, db *gosql.DB) (roachpb.Version, error) {
192+
func ClusterVersion(ctx context.Context, l *logger.Logger, db *gosql.DB) (roachpb.Version, error) {
180193
zero := roachpb.Version{}
181194
var sv string
182-
if err := db.QueryRowContext(ctx, `SHOW CLUSTER SETTING version`).Scan(&sv); err != nil {
195+
rows, err := roachtestutil.QueryWithRetry(
196+
ctx, l, db, roachtestutil.ClusterSettingRetryOpts, `SHOW CLUSTER SETTING version`,
197+
)
198+
if err != nil {
199+
return zero, err
200+
}
201+
defer rows.Close()
202+
203+
if !rows.Next() {
204+
return zero, fmt.Errorf("no rows returned")
205+
}
206+
207+
if err := rows.Scan(&sv); err != nil {
183208
return zero, err
184209
}
185210

@@ -483,7 +508,7 @@ func WaitForClusterUpgrade(
483508
timeout time.Duration,
484509
) error {
485510
firstNode := nodes[0]
486-
newVersion, err := BinaryVersion(ctx, dbFunc(firstNode))
511+
newVersion, err := BinaryVersion(ctx, l, dbFunc(firstNode))
487512
if err != nil {
488513
return err
489514
}
@@ -496,7 +521,7 @@ func WaitForClusterUpgrade(
496521
retryCtx, cancel := context.WithTimeout(ctx, timeout)
497522
defer cancel()
498523
err := opts.Do(retryCtx, func(ctx context.Context) error {
499-
currentVersion, err := ClusterVersion(ctx, dbFunc(node))
524+
currentVersion, err := ClusterVersion(ctx, l, dbFunc(node))
500525
if err != nil {
501526
return err
502527
}

pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ import (
1515
"sync/atomic"
1616

1717
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
18+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
1819
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
1920
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
2021
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
2223
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
2324
"github.com/cockroachdb/cockroach/pkg/testutils/release"
25+
"github.com/cockroachdb/cockroach/pkg/util/retry"
2426
"github.com/cockroachdb/errors"
2527
)
2628

@@ -152,11 +154,27 @@ func (s *Service) ExecWithGateway(
152154
return err
153155
}
154156

157+
func (s *Service) ExecWithRetry(
158+
rng *rand.Rand,
159+
nodes option.NodeListOption,
160+
retryOpts retry.Options,
161+
query string,
162+
args ...interface{},
163+
) error {
164+
db, err := s.prepareQuery(rng, nodes, query, args...)
165+
if err != nil {
166+
return err
167+
}
168+
169+
_, err = roachtestutil.ExecWithRetry(s.ctx, s.stepLogger, db, retryOpts, query, args...)
170+
return err
171+
}
172+
155173
func (s *Service) ClusterVersion(rng *rand.Rand) (roachpb.Version, error) {
156174
if s.Finalizing {
157175
n, db := s.RandomDB(rng)
158176
s.stepLogger.Printf("querying cluster version through node %d", n)
159-
cv, err := clusterupgrade.ClusterVersion(s.ctx, db)
177+
cv, err := clusterupgrade.ClusterVersion(s.ctx, s.stepLogger, db)
160178
if err != nil {
161179
return roachpb.Version{}, fmt.Errorf("failed to query cluster version: %w", err)
162180
}
@@ -249,6 +267,18 @@ func (h *Helper) ExecWithGateway(
249267
return h.DefaultService().ExecWithGateway(rng, nodes, query, args...)
250268
}
251269

270+
// ExecWithRetry is like ExecWithGateway, but retries the execution of
271+
// the statement on errors, using the retry options provided.
272+
func (h *Helper) ExecWithRetry(
273+
rng *rand.Rand,
274+
nodes option.NodeListOption,
275+
retryOpts retry.Options,
276+
query string,
277+
args ...interface{},
278+
) error {
279+
return h.DefaultService().ExecWithRetry(rng, nodes, retryOpts, query, args...)
280+
}
281+
252282
// defaultTaskOptions returns the default options that are passed to all tasks
253283
// started by the helper.
254284
func (h *Helper) defaultTaskOptions() []task.Option {

pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ var (
7979
// for an internal query (i.e., performed by the framework) to
8080
// complete. These queries are typically associated with gathering
8181
// upgrade state data to be displayed during execution.
82-
internalQueryTimeout = 30 * time.Second
82+
internalQueryTimeout = 90 * time.Second
8383
)
8484

8585
func newServiceRuntime(desc *ServiceDescriptor) *serviceRuntime {
@@ -534,7 +534,7 @@ func (tr *testRunner) refreshBinaryVersions(ctx context.Context, service *servic
534534
availableNodes := tr.getAvailableNodes(service.descriptor)
535535
for j, node := range service.descriptor.Nodes {
536536
group.GoCtx(func(ctx context.Context) error {
537-
bv, err := clusterupgrade.BinaryVersion(ctx, tr.conn(node, service.descriptor.Name))
537+
bv, err := clusterupgrade.BinaryVersion(ctx, tr.logger, tr.conn(node, service.descriptor.Name))
538538
if err != nil {
539539
if !availableNodes.Contains(node) {
540540
return nil
@@ -575,7 +575,7 @@ func (tr *testRunner) refreshClusterVersions(ctx context.Context, service *servi
575575
availableNodes := tr.getAvailableNodes(service.descriptor)
576576
for j, node := range service.descriptor.Nodes {
577577
group.GoCtx(func(ctx context.Context) error {
578-
cv, err := clusterupgrade.ClusterVersion(ctx, tr.conn(node, service.descriptor.Name))
578+
cv, err := clusterupgrade.ClusterVersion(ctx, tr.logger, tr.conn(node, service.descriptor.Name))
579579
if err != nil {
580580
if !availableNodes.Contains(node) {
581581
return nil

pkg/cmd/roachtest/roachtestutil/mixedversion/steps.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -306,12 +306,14 @@ func (s preserveDowngradeOptionStep) Run(
306306
service := serviceByName(h, s.virtualClusterName)
307307
node, db := service.RandomDB(rng)
308308
l.Printf("checking binary version (via node %d)", node)
309-
bv, err := clusterupgrade.BinaryVersion(ctx, db)
309+
bv, err := clusterupgrade.BinaryVersion(ctx, l, db)
310310
if err != nil {
311311
return err
312312
}
313313

314-
return service.Exec(rng, "SET CLUSTER SETTING cluster.preserve_downgrade_option = $1", bv.String())
314+
return service.ExecWithRetry(rng, service.Descriptor.Nodes, roachtestutil.ClusterSettingRetryOpts,
315+
"SET CLUSTER SETTING cluster.preserve_downgrade_option = $1", bv.String(),
316+
)
315317
}
316318

317319
func (s preserveDowngradeOptionStep) ConcurrencyDisabled() bool {
@@ -408,8 +410,10 @@ func (s allowUpgradeStep) Description(debug bool) string {
408410
func (s allowUpgradeStep) Run(
409411
ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper,
410412
) error {
411-
return serviceByName(h, s.virtualClusterName).Exec(
412-
rng, "RESET CLUSTER SETTING cluster.preserve_downgrade_option",
413+
service := serviceByName(h, s.virtualClusterName)
414+
return service.ExecWithRetry(
415+
rng, service.Descriptor.Nodes, roachtestutil.ClusterSettingRetryOpts,
416+
"RESET CLUSTER SETTING cluster.preserve_downgrade_option",
413417
)
414418
}
415419

@@ -518,8 +522,8 @@ func (s setClusterSettingStep) Run(
518522
args = []interface{}{val}
519523
}
520524

521-
return serviceByName(h, serviceName).ExecWithGateway(
522-
rng, nodesRunningAtLeast(s.virtualClusterName, s.minVersion, h), stmt, args...,
525+
return serviceByName(h, serviceName).ExecWithRetry(
526+
rng, nodesRunningAtLeast(s.virtualClusterName, s.minVersion, h), roachtestutil.ClusterSettingRetryOpts, stmt, args...,
523527
)
524528
}
525529

@@ -557,7 +561,7 @@ func (s setClusterVersionStep) Run(
557561
node, db := service.RandomDB(rng)
558562
l.Printf("fetching binary version via n%d", node)
559563

560-
bv, err := clusterupgrade.BinaryVersion(ctx, db)
564+
bv, err := clusterupgrade.BinaryVersion(ctx, l, db)
561565
if err != nil {
562566
return errors.Wrapf(err, "getting binary version on n%d", node)
563567
}
@@ -566,7 +570,9 @@ func (s setClusterVersionStep) Run(
566570
}
567571

568572
l.Printf("setting cluster version to '%s'", binaryVersion)
569-
return service.Exec(rng, "SET CLUSTER SETTING version = $1", binaryVersion)
573+
return service.ExecWithRetry(rng, service.Descriptor.Nodes, roachtestutil.ClusterSettingRetryOpts,
574+
"SET CLUSTER SETTING version = $1", binaryVersion,
575+
)
570576
}
571577

572578
func (s setClusterVersionStep) ConcurrencyDisabled() bool {
@@ -590,8 +596,8 @@ func (s resetClusterSettingStep) Run(
590596
ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper,
591597
) error {
592598
stmt := fmt.Sprintf("RESET CLUSTER SETTING %s", s.name)
593-
return serviceByName(h, s.virtualClusterName).ExecWithGateway(
594-
rng, nodesRunningAtLeast(s.virtualClusterName, s.minVersion, h), stmt,
599+
return serviceByName(h, s.virtualClusterName).ExecWithRetry(
600+
rng, nodesRunningAtLeast(s.virtualClusterName, s.minVersion, h), roachtestutil.ClusterSettingRetryOpts, stmt,
595601
)
596602
}
597603

pkg/cmd/roachtest/roachtestutil/utils.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
2828
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
2929
"github.com/cockroachdb/cockroach/pkg/util"
30+
"github.com/cockroachdb/cockroach/pkg/util/retry"
3031
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3132
"github.com/cockroachdb/errors"
3233
"github.com/stretchr/testify/require"
@@ -336,3 +337,61 @@ func SimulateMultiRegionCluster(
336337

337338
return cleanupFunc, nil
338339
}
340+
341+
// ExecWithRetry executes the given SQL statement with the specified retry logic.
342+
func ExecWithRetry(
343+
ctx context.Context,
344+
l *logger.Logger,
345+
db *gosql.DB,
346+
retryOpts retry.Options,
347+
query string,
348+
args ...any,
349+
) (gosql.Result, error) {
350+
var result gosql.Result
351+
err := retryOpts.Do(ctx, func(ctx context.Context) error {
352+
var err error
353+
result, err = db.ExecContext(ctx, query, args...)
354+
if err != nil {
355+
l.Printf("%s failed (retrying): %v", query, err)
356+
return err
357+
}
358+
return nil
359+
})
360+
361+
return result, err
362+
}
363+
364+
// QueryWithRetry queries the given SQL statement with the specified retry logic.
365+
func QueryWithRetry(
366+
ctx context.Context,
367+
l *logger.Logger,
368+
db *gosql.DB,
369+
retryOpts retry.Options,
370+
query string,
371+
args ...any,
372+
) (*gosql.Rows, error) {
373+
var rows *gosql.Rows
374+
err := retryOpts.Do(ctx, func(ctx context.Context) error {
375+
var err error
376+
rows, err = db.QueryContext(ctx, query, args...)
377+
if err != nil {
378+
l.Printf("%s failed (retrying): %v", query, err)
379+
return err
380+
}
381+
return nil
382+
})
383+
384+
return rows, err
385+
}
386+
387+
// ClusterSettingRetryOpts are retry options intended for cluster setting operations.
388+
//
389+
// We use relatively high backoff parameters with the assumption that:
390+
// 1. If we fail, it's likely due to cluster overload, and we want to give
391+
// the cluster adequate time to recover.
392+
// 2. Setting a cluster setting in roachtest is not latency sensitive.
393+
var ClusterSettingRetryOpts = retry.Options{
394+
InitialBackoff: 3 * time.Second,
395+
MaxBackoff: 5 * time.Second,
396+
MaxRetries: 5,
397+
}

pkg/cmd/roachtest/roachtestutil/validation_check.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func CheckReplicaDivergenceOnDB(ctx context.Context, l *logger.Logger, db *gosql
4646
defer cancel()
4747

4848
// Speed up consistency checks. The test is done, so let's go full throttle.
49-
_, err := db.ExecContext(ctx, "SET CLUSTER SETTING server.consistency_check.max_rate = '1GB'")
49+
_, err := ExecWithRetry(ctx, l, db, ClusterSettingRetryOpts, "SET CLUSTER SETTING server.consistency_check.max_rate = '1GB'")
5050
if err != nil {
5151
return errors.Wrap(err, "unable to set 'server.consistency_check.max_rate'")
5252
}
@@ -107,14 +107,18 @@ FROM crdb_internal.check_consistency(false, '', '') as t;`)
107107

108108
// CheckInvalidDescriptors returns an error if there exists any descriptors in
109109
// the crdb_internal.invalid_objects virtual table.
110-
func CheckInvalidDescriptors(ctx context.Context, db *gosql.DB) error {
110+
func CheckInvalidDescriptors(ctx context.Context, l *logger.Logger, db *gosql.DB) error {
111111
var invalidIDs string
112+
l.Printf("checking for invalid descriptors")
112113
if err := timeutil.RunWithTimeout(ctx, "descriptor validation", time.Minute, func(ctx context.Context) error {
113114
// Because crdb_internal.invalid_objects is a virtual table, by default, the
114115
// query will take a lease on the database sqlDB is connected to and only run
115116
// the query on the given database. The "" prefix prevents this lease
116117
// acquisition and allows the query to fetch all descriptors in the cluster.
117-
rows, err := db.QueryContext(ctx, `SELECT id, obj_name, error FROM "".crdb_internal.invalid_objects`)
118+
//
119+
// This often times out when the cluster is overloaded. Since this is just
120+
// for validation, we retry with a relatively high backoff.
121+
rows, err := QueryWithRetry(ctx, l, db, ClusterSettingRetryOpts, `SELECT id, obj_name, error FROM "".crdb_internal.invalid_objects`)
118122
if err != nil {
119123
return err
120124
}

pkg/cmd/roachtest/test_runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1763,7 +1763,7 @@ func (r *testRunner) postTestAssertions(
17631763
// NB: the invalid description checks should run at the system tenant level.
17641764
db := c.Conn(ctx, t.L(), validationNode, option.VirtualClusterName(install.SystemInterfaceName))
17651765
defer db.Close()
1766-
if err := roachtestutil.CheckInvalidDescriptors(ctx, db); err != nil {
1766+
if err := roachtestutil.CheckInvalidDescriptors(ctx, t.L(), db); err != nil {
17671767
postAssertionErr(errors.WithDetail(err, "invalid descriptors check failed"))
17681768
}
17691769
}()

0 commit comments

Comments
 (0)