Commit e837db7

craig[bot], yuzefovich, rickystewart, pav-kv, sumeerbhola
committed
147168: colexecdisk: propagate DiskFull errors as expected r=yuzefovich a=yuzefovich

We just saw a sentry report that was issued due to an InternalError raised after Dequeue'ing from a disk queue. It's not clear what the error was (since it was redacted), but it might have been a DiskFull error. We already have special handling for it on the Enqueue path, but the Dequeue path can also trigger this error (on the first call to Dequeue after some Enqueue calls, in order to flush the buffered batches), so this commit audits all disk queue methods to use the helper for error propagation. The only place where we do disk usage accounting is `diskQueue.writeFooterAndFlush`, so I traced which methods could end up calling it (both Enqueue and Dequeue, but also Close) and their call sites; this is how the affected places were chosen. Additionally, I didn't want to introduce error propagation via panics where it wasn't there already, so one spot wasn't modified.

Fixes: #147132.

Release note: None

157847: bench/rttanalysis: shard TestBenchmarkExpectation to avoid timeouts r=rafiss a=rickystewart

Re-apply `9fecc53b0b3cde307b5379ee5b88fae0fc8f34e2`, but add a `skip` if the test is running under `s390x`.

Release note: none
Epic: none

157868: testcluster: assign unique ClusterName r=RaduBerinde,stevendanna a=pav-kv

This commit makes `TestCluster` assign a unique cluster name in the `TestServerArgs` by default. The cluster name is shared by all participating or added nodes, unless overridden in the per-node args. This helps prevent accidental message exchange between `TestCluster`s in the same environment that use the same TCP port in close proximity to each other.

Addresses #157838

157930: mma: fix a couple of todos related to changes r=wenyihu6 a=sumeerbhola

- subsumesChanges no longer needs the prev state as a parameter, since subsumption is only a function of the observed state and the expected next state. The existing comparison with prev.IsLeaseholder was unnecessary and flawed.
- Removed the todo around the panic in applyReplicaChange, since the expectation is that the range and store must exist; the callers are responsible for ensuring that. There was an old, flawed idea that we would try to add the range at this point in the code, which we had long since abandoned but never removed from the code comment.

Epic: CRDB-55052
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
5 parents: f0afff0 + a077113 + 3889649 + 1757367 + 426ce7f
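The DiskFull handling described in 147168 boils down to routing every error that a disk queue method can surface through one classification helper, so that a disk-budget error is reported as an expected, user-facing error rather than as an internal assertion failure. Below is a minimal, self-contained sketch of that pattern; the identifiers are hypothetical stand-ins, not the actual colexecdisk or colexecerror names.

// Sketch only: errDiskFull, expectedError, internalError, and
// handleDiskQueueError are illustrative stand-ins, not real cockroach identifiers.
package main

import (
	"errors"
	"fmt"
)

// errDiskFull stands in for the disk-budget-exceeded error.
var errDiskFull = errors.New("disk budget exceeded")

type expectedError struct{ err error }
type internalError struct{ err error }

func (e expectedError) Error() string { return "expected: " + e.err.Error() }
func (e internalError) Error() string { return "internal: " + e.err.Error() }

// handleDiskQueueError classifies errors coming out of Enqueue, Dequeue, and
// Close, all of which can end up flushing buffered batches and hence hitting
// the disk budget.
func handleDiskQueueError(err error) error {
	if errors.Is(err, errDiskFull) {
		return expectedError{err}
	}
	return internalError{err}
}

func main() {
	// A Dequeue that performs the first flush of buffered batches can also
	// report a full disk, so it must go through the same helper as Enqueue.
	fmt.Println(handleDiskQueueError(errDiskFull))
	fmt.Println(handleDiskQueueError(errors.New("checksum mismatch")))
}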

File tree

17 files changed: +203 −88 lines changed

pkg/base/test_server_args.go

Lines changed: 8 additions & 1 deletion
@@ -122,7 +122,14 @@ type TestServerArgs struct {
 	UseDatabase string

 	// If set, this will be configured in the test server to check connections
-	// from other test servers and to report in the SQL introspection.
+	// from other test servers and to report in the SQL introspection. It is
+	// advised to make the name sufficiently unique, in order to prevent a
+	// TestCluster from accidentally getting messages from unrelated clusters in
+	// the same environment that used the same TCP ports recently (e.g. see
+	// https://github.com/cockroachdb/cockroach/issues/157838).
+	//
+	// If empty (most cases), a unique ClusterName is generated automatically, or
+	// a higher-level default is used (e.g. taken from TestClusterArgs).
 	ClusterName string

 	// Stopper can be used to stop the server. If not set, a stopper will be
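As a rough illustration of the 157868 behavior the new comment describes, a default can be filled in whenever ClusterName is left empty. The helper name and argument shape below are assumptions made for the sketch, not the actual testcluster code.

package main

import (
	"fmt"
	"math/rand"
)

type serverArgs struct {
	ClusterName string
}

// genUniqueClusterName returns a name that is unlikely to collide with other
// test clusters running in the same environment.
func genUniqueClusterName() string {
	return fmt.Sprintf("testcluster-%d", rand.Int63())
}

// startTestCluster fills in a default when the caller leaves ClusterName
// empty, mirroring the behavior described in the comment above.
func startTestCluster(args serverArgs) serverArgs {
	if args.ClusterName == "" {
		args.ClusterName = genUniqueClusterName()
	}
	// Every node in the cluster would then share args.ClusterName unless a
	// per-node override is supplied.
	return args
}

func main() {
	fmt.Println(startTestCluster(serverArgs{}).ClusterName)
}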

pkg/bench/rttanalysis/BUILD.bazel

Lines changed: 4 additions & 1 deletion
@@ -13,6 +13,8 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/base",
+        "//pkg/jobs",
+        "//pkg/jobs/jobspb",
         "//pkg/kv/kvclient/kvcoord",
         "//pkg/sql",
         "//pkg/sql/parser",
@@ -56,9 +58,9 @@ go_test(
     data = glob(["testdata/**"]),
     embed = [":rttanalysis"],
     exec_properties = {"test.Pool": "large"},
+    shard_count = 4,
     deps = [
         "//pkg/base",
-        "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
@@ -70,6 +72,7 @@ go_test(
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",
         "//pkg/testutils/testcluster",
+        "//pkg/util/envutil",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
     ],

pkg/bench/rttanalysis/registry.go

Lines changed: 63 additions & 6 deletions
@@ -6,9 +6,12 @@
 package rttanalysis

 import (
+	"runtime"
 	"strings"
 	"testing"

+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
@@ -51,15 +54,69 @@ func (r *Registry) Run(b *testing.B) {
 // benchmarks can be filtered by passing the usual test filters underneath
 // this test's name.
 //
-// It takes a long time and thus is skipped under stress, race
-// and short.
+// It takes a long time and thus is skipped under duress and short.
 func (r *Registry) RunExpectations(t *testing.T) {
-	skip.UnderStress(t)
-	skip.UnderRace(t)
+	r.RunExpectationsSharded(t, 1, 1)
+}
+
+// RunExpectationsSharded runs all the benchmarks for one iteration
+// and validates that the number of RPCs meets the expectation. If run
+// with the --rewrite flag, it will rewrite the run benchmarks. The
+// benchmarks can be filtered by passing the usual test filters underneath
+// this test's name.
+//
+// It takes a long time and thus is skipped under duress and short.
+//
+// When shard and totalShards are provided (> 1), only a subset of benchmarks
+// assigned to the specific shard will be run, enabling parallel execution.
+// Test groups are distributed across shards using round-robin assignment.
+func (r *Registry) RunExpectationsSharded(t *testing.T, shard, totalShards int) {
+	defer jobs.TestingSetIDsToIgnore(map[jobspb.JobID]struct{}{3001: {}, 3002: {}})()
+	skip.UnderDuress(t)
 	skip.UnderShort(t)
-	skip.UnderDeadlock(t)
+	if runtime.GOARCH == "s390x" {
+		skip.IgnoreLint(t, "test prone to crashing under s390x (see #154317)")
+	}
+
+	// If totalShards is 1, run all tests; otherwise shard them
+	var registryToUse *Registry
+	if totalShards <= 1 {
+		// Run all test groups
+		registryToUse = r
+	} else {
+		// Create a registry with only the test groups assigned to this shard
+		shardRegistry := &Registry{
+			numNodes: r.numNodes,
+			cc:       r.cc,
+			r:        make(map[string][]RoundTripBenchTestCase),
+		}
+
+		// Distribute test groups across shards using round-robin assignment
+		// First, get all group names and sort them for consistent ordering
+		groupNames := make([]string, 0, len(r.r))
+		for groupName := range r.r {
+			groupNames = append(groupNames, groupName)
+		}
+		// Sort for deterministic assignment across runs
+		for i := 0; i < len(groupNames); i++ {
+			for j := i + 1; j < len(groupNames); j++ {
+				if groupNames[i] > groupNames[j] {
+					groupNames[i], groupNames[j] = groupNames[j], groupNames[i]
+				}
+			}
+		}

+		// Assign groups to shards using round-robin
+		for i, groupName := range groupNames {
+			assignedShard := (i % totalShards) + 1
+			if assignedShard == shard {
+				shardRegistry.r[groupName] = r.r[groupName]
+			}
+		}
+		registryToUse = shardRegistry
+	}

-	runBenchmarkExpectationTests(t, r)
+	runBenchmarkExpectationTests(t, registryToUse)
 }

 // Register registers a set of test cases to a given benchmark name. It is
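For intuition on the round-robin assignment above: group names are ordered deterministically, and group i lands on shard (i % totalShards) + 1. The stand-alone sketch below uses sort.Strings in place of the hand-rolled sort, with made-up group names.

package main

import (
	"fmt"
	"sort"
)

// assignShards mirrors the round-robin logic in RunExpectationsSharded:
// deterministic ordering first, then modulo assignment to 1-based shards.
func assignShards(groups []string, totalShards int) map[int][]string {
	sort.Strings(groups)
	byShard := make(map[int][]string)
	for i, g := range groups {
		shard := (i % totalShards) + 1
		byShard[shard] = append(byShard[shard], g)
	}
	return byShard
}

func main() {
	// Hypothetical group names; with 4 shards, indices 0 and 4 go to shard 1,
	// indices 1 and 5 to shard 2, and so on.
	groups := []string{"AlterRole", "CreateRole", "DropRole", "Grant", "Revoke", "Truncate"}
	byShard := assignShards(groups, 4)
	for shard := 1; shard <= 4; shard++ {
		fmt.Println(shard, byShard[shard])
	}
}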

pkg/bench/rttanalysis/validate_benchmark_data_test.go

Lines changed: 36 additions & 5 deletions
@@ -6,13 +6,44 @@
 package rttanalysis

 import (
+	"strconv"
 	"testing"

-	"github.com/cockroachdb/cockroach/pkg/jobs"
-	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 )

-func TestBenchmarkExpectation(t *testing.T) {
-	defer jobs.TestingSetIDsToIgnore(map[jobspb.JobID]struct{}{3001: {}, 3002: {}})()
-	reg.RunExpectations(t)
+// NOTE: If you change the number of shards, you must also update the
+// shard_count in BUILD.bazel to match.
+const shardCount = 4
+
+// Validate that shardCount matches TEST_TOTAL_SHARDS environment variable at init time
+var _ = func() int {
+	totalShardsStr, found := envutil.ExternalEnvString("TEST_TOTAL_SHARDS", 1)
+	if totalShardsStr == "" || !found {
+		return 0
+	}
+	totalShards, err := strconv.Atoi(totalShardsStr)
+	if err != nil {
+		return 0
+	}
+	if totalShards != shardCount {
+		panic("shardCount mismatch: update shard_count in pkg/bench/rttanalysis/BUILD.bazel to match shardCount constant")
+	}
+	return 0
+}()
+
+func TestBenchmarkExpectationShard1(t *testing.T) {
+	reg.RunExpectationsSharded(t, 1, shardCount)
+}
+
+func TestBenchmarkExpectationShard2(t *testing.T) {
+	reg.RunExpectationsSharded(t, 2, shardCount)
+}
+
+func TestBenchmarkExpectationShard3(t *testing.T) {
+	reg.RunExpectationsSharded(t, 3, shardCount)
+}
+
+func TestBenchmarkExpectationShard4(t *testing.T) {
+	reg.RunExpectationsSharded(t, 4, shardCount)
 }
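The init-time guard above only fires when Bazel sharding is active. Below is a stand-alone version of the same check written against the standard library rather than envutil (which is what the real test uses), assuming Bazel's convention of exporting TEST_TOTAL_SHARDS to sharded test binaries.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// shardCount must match shard_count in BUILD.bazel.
const shardCount = 4

// checkShardCount returns an error if Bazel sharding is active but configured
// with a different number of shards than the Go constant.
func checkShardCount() error {
	v, ok := os.LookupEnv("TEST_TOTAL_SHARDS") // exported by Bazel when sharding
	if !ok || v == "" {
		return nil // not running under Bazel sharding; nothing to check
	}
	total, err := strconv.Atoi(v)
	if err != nil {
		return nil // unparsable value; skip the check, as the real guard does
	}
	if total != shardCount {
		return fmt.Errorf("BUILD.bazel declares %d shards but shardCount is %d", total, shardCount)
	}
	return nil
}

func main() {
	if err := checkShardCount(); err != nil {
		panic(err)
	}
	fmt.Println("shard configuration is consistent")
}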

pkg/cli/debug_recover_loss_of_quorum_test.go

Lines changed: 6 additions & 2 deletions
@@ -137,8 +137,8 @@ func TestCollectInfoFromOnlineCluster(t *testing.T) {
 		"recover",
 		"collect-info",
 		"--insecure",
-		"--host",
-		tc.Server(2).AdvRPCAddr(),
+		"--host", tc.Server(2).AdvRPCAddr(),
+		"--cluster-name", tc.ClusterName(),
 		replicaInfoFileName,
 	})

@@ -554,6 +554,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 		"--confirm=y",
 		"--certs-dir=test_certs",
 		"--host=" + tc.Server(0).AdvRPCAddr(),
+		"--cluster-name=" + tc.ClusterName(),
 		"--plan=" + planFile,
 	})
 	require.NoError(t, err, "failed to run make-plan")
@@ -577,6 +578,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 		"debug", "recover", "apply-plan",
 		"--certs-dir=test_certs",
 		"--host=" + tc.Server(0).AdvRPCAddr(),
+		"--cluster-name=" + tc.ClusterName(),
 		"--confirm=y", planFile,
 	})
 	require.NoError(t, err, "failed to run apply plan")
@@ -592,6 +594,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 		"debug", "recover", "verify",
 		"--certs-dir=test_certs",
 		"--host=" + tc.Server(0).AdvRPCAddr(),
+		"--cluster-name=" + tc.ClusterName(),
 		planFile,
 	})
 	require.NoError(t, err, "failed to run verify plan")
@@ -641,6 +644,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 		"debug", "recover", "verify",
 		"--certs-dir=test_certs",
 		"--host=" + tc.Server(0).AdvRPCAddr(),
+		"--cluster-name=" + tc.ClusterName(),
 		planFile,
 	})
 	require.NoError(t, err, "failed to run verify plan")

pkg/cli/testdata/zip/partial1

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 zip
 ----
-debug zip --concurrency=1 --cpu-profile-duration=0s --validate-zip-file=false /dev/null
+debug zip --concurrency=1 --cpu-profile-duration=0s --validate-zip-file=false --cluster-name=<cluster-name> /dev/null
 [cluster] discovering virtual clusters... done
 [cluster] creating output file /dev/null... done
 [cluster] establishing RPC connection to ...

pkg/cli/testdata/zip/partial1_excluded

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 zip
 ----
-debug zip /dev/null --concurrency=1 --exclude-nodes=2 --cpu-profile-duration=0 --validate-zip-file=false
+debug zip --concurrency=1 --exclude-nodes=2 --cpu-profile-duration=0 --validate-zip-file=false --cluster-name=<cluster-name> /dev/null
 [cluster] discovering virtual clusters... done
 [cluster] creating output file /dev/null... done
 [cluster] establishing RPC connection to ...

pkg/cli/testdata/zip/partial2

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 zip
 ----
-debug zip --concurrency=1 --cpu-profile-duration=0 --validate-zip-file=false /dev/null
+debug zip --concurrency=1 --cpu-profile-duration=0 --validate-zip-file=false --cluster-name=<cluster-name> /dev/null
 [cluster] discovering virtual clusters... done
 [cluster] creating output file /dev/null... done
 [cluster] establishing RPC connection to ...

pkg/cli/testdata/zip/testzip_concurrent

Lines changed: 1 addition & 1 deletion
@@ -227,4 +227,4 @@ zip
 [node ?] ? log files found
 [node ?] ? log files found
 [node ?] ? log files found
-debug zip --timeout=30s --cpu-profile-duration=0s --validate-zip-file=false /dev/null
+debug zip --timeout=30s --cpu-profile-duration=0s --validate-zip-file=false --cluster-name=<cluster-name> /dev/null

pkg/cli/zip_test.go

Lines changed: 40 additions & 30 deletions
@@ -421,10 +421,11 @@ func TestConcurrentZip(t *testing.T) {
 	defer func(prevStderr *os.File) { stderr = prevStderr }(stderr)
 	stderr = os.Stdout

-	out, err := c.RunWithCapture("debug zip --timeout=30s --cpu-profile-duration=0s --validate-zip-file=false " + os.DevNull)
-	if err != nil {
-		t.Fatal(err)
-	}
+	out, err := c.RunWithCapture(fmt.Sprintf(
+		"debug zip --timeout=30s --cpu-profile-duration=0s --validate-zip-file=false --cluster-name=%s %s",
+		tc.ClusterName(), os.DevNull,
+	))
+	require.NoError(t, err)

 	// Strip any non-deterministic messages.
 	out = eraseNonDeterministicZipOutput(out)
@@ -437,6 +438,8 @@ func TestConcurrentZip(t *testing.T) {
 	// which the original messages interleve with other messages mean the number
 	// of them after each series is collapsed is also non-derministic.
 	out = regexp.MustCompile(`<dumping SQL tables>\n`).ReplaceAllString(out, "")
+	// Replace the non-deterministic cluster name with a placeholder.
+	out = eraseClusterName(out, tc.ClusterName())

 	// We use datadriven simply to read the golden output file; we don't actually
 	// run any commands. Using datadriven allows TESTFLAGS=-rewrite.
@@ -541,9 +544,8 @@ func TestUnavailableZip(t *testing.T) {
 	tc := testcluster.StartTestCluster(t, 3,
 		base.TestClusterArgs{ServerArgs: base.TestServerArgs{
 			DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
-
-			Insecure: true,
-			Knobs:    base.TestingKnobs{Store: knobs},
+			Insecure:          true,
+			Knobs:             base.TestingKnobs{Store: knobs},
 		}})
 	defer tc.Stopper().Stop(context.Background())

@@ -559,9 +561,10 @@ func TestUnavailableZip(t *testing.T) {
 	defer close(ch)

 	// Run debug zip against node 1.
-	debugZipCommand :=
-		"debug zip --concurrency=1 --cpu-profile-duration=0 " + os.
-			DevNull + " --timeout=.5s"
+	debugZipCommand := fmt.Sprintf(
+		"debug zip --concurrency=1 --cpu-profile-duration=0 --timeout=.5s --cluster-name=%s %s",
+		tc.ClusterName(), os.DevNull,
+	)

 	t.Run("server 1", func(t *testing.T) {
 		c := TestCLI{
@@ -651,6 +654,10 @@ func baseZipOutput(nodeId int) []string {
 	return output
 }

+func eraseClusterName(str, name string) string {
+	return strings.ReplaceAll(str, name, "<cluster-name>")
+}
+
 func eraseNonDeterministicZipOutput(out string) string {
 	re := regexp.MustCompile(`(?m)postgresql://.*$`)
 	out = re.ReplaceAllString(out, `postgresql://...`)
@@ -736,13 +743,15 @@ func TestPartialZip(t *testing.T) {
 	defer func(prevStderr *os.File) { stderr = prevStderr }(stderr)
 	stderr = os.Stdout

-	out, err := c.RunWithCapture("debug zip --concurrency=1 --cpu-profile-duration=0s --validate-zip-file=false " + os.DevNull)
-	if err != nil {
-		t.Fatal(err)
-	}
+	out, err := c.RunWithCapture(fmt.Sprintf(
+		"debug zip --concurrency=1 --cpu-profile-duration=0s --validate-zip-file=false --cluster-name=%s %s",
+		tc.ClusterName(), os.DevNull,
+	))
+	require.NoError(t, err)

 	// Strip any non-deterministic messages.
 	t.Log(out)
+	out = eraseClusterName(out, tc.ClusterName())
 	out = eraseNonDeterministicZipOutput(out)

 	datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "partial1"),
@@ -751,12 +760,13 @@ func TestPartialZip(t *testing.T) {
 		})

 	// Now do it again and exclude the down node explicitly.
-	out, err = c.RunWithCapture("debug zip " + os.DevNull + " --concurrency=1 --exclude-nodes=2 --cpu-profile-duration=0" +
-		" --validate-zip-file=false")
-	if err != nil {
-		t.Fatal(err)
-	}
+	out, err = c.RunWithCapture(fmt.Sprintf(
+		"debug zip --concurrency=1 --exclude-nodes=2 --cpu-profile-duration=0 --validate-zip-file=false --cluster-name=%s %s",
+		tc.ClusterName(), os.DevNull,
+	))
+	require.NoError(t, err)

+	out = eraseClusterName(out, tc.ClusterName())
 	out = eraseNonDeterministicZipOutput(out)
 	datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "partial1_excluded"),
 		func(t *testing.T, td *datadriven.TestData) string {
@@ -767,12 +777,11 @@ func TestPartialZip(t *testing.T) {
 	// skips over it automatically. We specifically use --wait=none because
 	// we're decommissioning a node in a 3-node cluster, so there's no node to
 	// up-replicate the under-replicated ranges to.
-	{
-		_, err := c.RunWithCapture(fmt.Sprintf("node decommission --checks=skip --wait=none %d", 2))
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
+	_, err = c.RunWithCapture(fmt.Sprintf(
+		"node decommission --checks=skip --wait=none --cluster-name=%s %d",
+		tc.ClusterName(), 2,
+	))
+	require.NoError(t, err)

 	// We use .Override() here instead of SET CLUSTER SETTING in SQL to
 	// override the 1m15s minimum placed on the cluster setting. There
@@ -787,12 +796,13 @@ func TestPartialZip(t *testing.T) {
 	datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "partial2"),
 		func(t *testing.T, td *datadriven.TestData) string {
 			f := func() string {
-				out, err := c.RunWithCapture("debug zip --concurrency=1 --cpu-profile-duration=0 --validate-zip-file=false " + os.DevNull)
-				if err != nil {
-					t.Fatal(err)
-				}
-
+				out, err := c.RunWithCapture(fmt.Sprintf(
+					"debug zip --concurrency=1 --cpu-profile-duration=0 --validate-zip-file=false --cluster-name=%s %s",
+					tc.ClusterName(), os.DevNull,
+				))
+				require.NoError(t, err)
 				// Strip any non-deterministic messages.
+				out = eraseClusterName(out, tc.ClusterName())
 				return eraseNonDeterministicZipOutput(out)
 			}
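The eraseClusterName helper added to zip_test.go keeps the golden testdata comparable even though each run now uses a unique cluster name. A tiny usage sketch, with a made-up cluster name:

package main

import (
	"fmt"
	"strings"
)

// eraseClusterName mirrors the helper added in zip_test.go: it swaps the
// run-specific cluster name for the placeholder used by the testdata files.
func eraseClusterName(out, name string) string {
	return strings.ReplaceAll(out, name, "<cluster-name>")
}

func main() {
	name := "testcluster-3f9a1c2d" // made-up example name
	out := "debug zip --concurrency=1 --cluster-name=" + name + " /dev/null"
	// Prints: debug zip --concurrency=1 --cluster-name=<cluster-name> /dev/null
	fmt.Println(eraseClusterName(out, name))
}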