Commit 5feef23

craig[bot], log-head, spilchen, aerfrei, tbg committed
150859: cmd/roachtest: add changefeed roachtests exercising execution_locality in multi-region clusters r=aerfrei a=log-head

Previously, we were not testing that a changefeed with execution_locality enabled creates an acceptable plan and is able to make progress. Prior to 24.1, the changefeed would only place aggregators on nodes that were leaseholders for the data. If execution locality was enabled to restrict the changefeed to nodes without leaseholders for that data, all the ranges would be planned on the gateway node. This roachtest verifies that aggregators are spread across nodes within a region (no one node has a majority of the changefeed aggregators).

Epic: CRDB-38755
Fixes: #153736
Release note: None

153721: bench/rttanalysis: shard TestBenchmarkExpectation to avoid timeouts r=spilchen a=spilchen

The TestBenchmarkExpectation benchmark has been frequently timing out after 15 minutes. This appears to be caused by slow CI machines rather than issues with the test logic itself. To address this, the test is now split into four shards. Each shard is executed separately and receives the full 15-minute timeout budget, which should reduce the likelihood of timeout failures.

Fixes: #148384
Release note: none
Epic: none

153750: changefeedccl: explicitly set per-table PTS cluster setting in tests r=andyyang890 a=aerfrei

Previously, the per-table protected timestamp cluster setting was only ever explicitly set to true in tests, which matched the default value. As a result, when tests attempted to disable the setting, it remained enabled, leading to unexpected behavior and assertion failures. This change ensures the cluster setting is always explicitly set to the value provided by the test, avoiding reliance on defaults and making test behavior consistent and predictable.

Fixes: #153088
Epic: CRDB-1421
Release note: None

153776: asim: fix lower_bound r=wenyihu6 a=tbg

The lower_bound argument wasn't being removed from the args, so using it necessarily tripped the test.

Epic: CRDB-49117

153783: sql: fix a couple of edge case bugs with CHECK EXTERNAL CONNECTION r=yuzefovich a=yuzefovich

This commit fixes a couple of edge case bugs in the CHECK EXTERNAL CONNECTION implementation. Namely:

- Previously, the execution goroutine (created in `startExec`) would use a tracing span that had already been finished, which is not allowed. This is now fixed by deriving a new tracing span.
- Also, we previously called `MoveToDraining` from an auxiliary goroutine of the cloud check processor when context cancellation was observed. This raced with the `MoveToDraining` call from the main goroutine in `Next`. The implicit contract of this helper method is that it is only called from the main goroutine; otherwise it can lead to "MoveToDraining called in state × with err" errors. This is now fixed by removing the call from the auxiliary goroutine, since it is not actually needed.

Additionally, this commit simplifies dealing with the `rows` channel a bit. I decided to omit the release note given that we recently merged a fix that contained one.

Fixes: #153378
Release note: None

153785: remove stray cpu.out file r=rickystewart a=stevendanna

Epic: none
Release note: None

Co-authored-by: Matthew Lougheed <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Aerin Freilich <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
7 parents: 3c9f2ee + 47bb79c + 9fecc53 + 685cd95 + 0ea687f + 92e2bcc + 7c00e62
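
The tracing fix in 153783 is easiest to see in isolation, since the changefeedccl diff itself is not among the files shown below. The following is a minimal, hypothetical sketch — the `Span` type, `childSpan` helper, and worker goroutine are stand-ins for the real processor code, not CockroachDB's tracing API. The bug was a worker goroutine recording into a span its caller had already finished; the fix is to derive a fresh span that the worker owns and finishes itself.

package main

import (
	"fmt"
	"sync"
)

// Span is a toy stand-in for a tracing span (the real code uses
// pkg/util/tracing). Recording into a finished span is an error.
type Span struct {
	mu       sync.Mutex
	name     string
	finished bool
}

func (s *Span) Finish() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.finished = true
}

func (s *Span) Record(msg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.finished {
		// This is the class of error fixed in 153783: the execution
		// goroutine kept using the caller's span after it was finished.
		return fmt.Errorf("use of finished span %q", s.name)
	}
	fmt.Printf("%s: %s\n", s.name, msg)
	return nil
}

// childSpan stands in for deriving a new span from the parent's trace.
func childSpan(parent *Span, op string) *Span {
	return &Span{name: parent.name + "/" + op}
}

func main() {
	parent := &Span{name: "check-external-connection"}

	// The fix: the worker gets its own child span and finishes it itself,
	// so the caller finishing its span cannot invalidate the worker's.
	child := childSpan(parent, "exec")
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer child.Finish()
		_ = child.Record("running cloud check") // safe even after parent.Finish()
	}()

	parent.Finish() // caller tears down its span; the worker is unaffected
	<-done
}

The same ownership rule motivates the `MoveToDraining` half of the fix: state transitions belong to the main goroutine, so the auxiliary goroutine must not call it.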

File tree

12 files changed (+340, -27 lines)


cpu.out (deleted)

-11.6 KB. Binary file not shown.

pkg/bench/rttanalysis/BUILD.bazel

Lines changed: 4 additions & 1 deletion
@@ -13,6 +13,8 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/base",
+        "//pkg/jobs",
+        "//pkg/jobs/jobspb",
         "//pkg/kv/kvclient/kvcoord",
         "//pkg/sql",
         "//pkg/sql/parser",
@@ -56,9 +58,9 @@ go_test(
     data = glob(["testdata/**"]),
     embed = [":rttanalysis"],
     exec_properties = {"test.Pool": "large"},
+    shard_count = 4,
     deps = [
         "//pkg/base",
-        "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
@@ -69,6 +71,7 @@ go_test(
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",
         "//pkg/testutils/testcluster",
+        "//pkg/util/envutil",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
     ],

pkg/bench/rttanalysis/registry.go

Lines changed: 59 additions & 6 deletions
@@ -9,6 +9,8 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
@@ -51,15 +53,66 @@ func (r *Registry) Run(b *testing.B) {
 // benchmarks can be filtered by passing the usual test filters underneath
 // this test's name.
 //
-// It takes a long time and thus is skipped under stress, race
-// and short.
+// It takes a long time and thus is skipped under duress and short.
 func (r *Registry) RunExpectations(t *testing.T) {
-	skip.UnderStress(t)
-	skip.UnderRace(t)
+	r.RunExpectationsSharded(t, 1, 1)
+}
+
+// RunExpectationsSharded runs all the benchmarks for one iteration
+// and validates that the number of RPCs meets the expectation. If run
+// with the --rewrite flag, it will rewrite the run benchmarks. The
+// benchmarks can be filtered by passing the usual test filters underneath
+// this test's name.
+//
+// It takes a long time and thus is skipped under duress and short.
+//
+// When shard and totalShards are provided (> 1), only the subset of benchmarks
+// assigned to the specific shard will be run, enabling parallel execution.
+// Test groups are distributed across shards using round-robin assignment.
+func (r *Registry) RunExpectationsSharded(t *testing.T, shard, totalShards int) {
+	defer jobs.TestingSetIDsToIgnore(map[jobspb.JobID]struct{}{3001: {}, 3002: {}})()
+	skip.UnderDuress(t)
 	skip.UnderShort(t)
-	skip.UnderDeadlock(t)
 
-	runBenchmarkExpectationTests(t, r)
+	// If totalShards is 1, run all tests; otherwise shard them.
+	var registryToUse *Registry
+	if totalShards <= 1 {
+		// Run all test groups.
+		registryToUse = r
+	} else {
+		// Create a registry with only the test groups assigned to this shard.
+		shardRegistry := &Registry{
+			numNodes: r.numNodes,
+			cc:       r.cc,
+			r:        make(map[string][]RoundTripBenchTestCase),
+		}
+
+		// Distribute test groups across shards using round-robin assignment.
+		// First, get all group names and sort them for consistent ordering.
+		groupNames := make([]string, 0, len(r.r))
+		for groupName := range r.r {
+			groupNames = append(groupNames, groupName)
+		}
+		// Sort for deterministic assignment across runs.
+		for i := 0; i < len(groupNames); i++ {
+			for j := i + 1; j < len(groupNames); j++ {
+				if groupNames[i] > groupNames[j] {
+					groupNames[i], groupNames[j] = groupNames[j], groupNames[i]
+				}
+			}
+		}
+
+		// Assign groups to shards using round-robin.
+		for i, groupName := range groupNames {
+			assignedShard := (i % totalShards) + 1
+			if assignedShard == shard {
+				shardRegistry.r[groupName] = r.r[groupName]
+			}
+		}
+		registryToUse = shardRegistry
+	}
+
+	runBenchmarkExpectationTests(t, registryToUse)
 }
 
 // Register registers a set of test cases to a given benchmark name. It is
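
The sharding logic above is self-contained enough to exercise on its own. Below is a minimal standalone sketch of the round-robin assignment rule; the group names are made up, and the nested comparison loops in the diff are a hand-rolled sort for which `sort.Strings` gives the same deterministic ordering:

package main

import (
	"fmt"
	"sort"
)

// assignShards distributes sorted group names across totalShards shards
// round-robin, mirroring RunExpectationsSharded's assignment rule:
// group i goes to shard (i % totalShards) + 1.
func assignShards(groups []string, totalShards int) map[int][]string {
	sort.Strings(groups) // deterministic ordering across runs
	byShard := make(map[int][]string)
	for i, g := range groups {
		shard := (i % totalShards) + 1
		byShard[shard] = append(byShard[shard], g)
	}
	return byShard
}

func main() {
	// Hypothetical benchmark group names.
	groups := []string{"AlterTable", "CreateRole", "DropTable", "Grant", "Jobs", "ORM", "Truncate", "VirtualTableQueries"}
	byShard := assignShards(groups, 4)
	for shard := 1; shard <= 4; shard++ {
		fmt.Printf("shard %d: %v\n", shard, byShard[shard])
	}
}

Because the assignment is (i % totalShards) + 1 over a sorted list, every group lands on exactly one shard and the split is stable across runs, which is what lets the four shards divide the work without coordinating.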

pkg/bench/rttanalysis/validate_benchmark_data_test.go

Lines changed: 36 additions & 5 deletions
@@ -6,13 +6,44 @@
 package rttanalysis
 
 import (
+	"strconv"
 	"testing"
 
-	"github.com/cockroachdb/cockroach/pkg/jobs"
-	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 )
 
-func TestBenchmarkExpectation(t *testing.T) {
-	defer jobs.TestingSetIDsToIgnore(map[jobspb.JobID]struct{}{3001: {}, 3002: {}})()
-	reg.RunExpectations(t)
+// NOTE: If you change the number of shards, you must also update the
+// shard_count in BUILD.bazel to match.
+const shardCount = 4
+
+// Validate at init time that shardCount matches the TEST_TOTAL_SHARDS
+// environment variable.
+var _ = func() int {
+	totalShardsStr, found := envutil.ExternalEnvString("TEST_TOTAL_SHARDS", 1)
+	if totalShardsStr == "" || !found {
+		return 0
+	}
+	totalShards, err := strconv.Atoi(totalShardsStr)
+	if err != nil {
+		return 0
+	}
+	if totalShards != shardCount {
+		panic("shardCount mismatch: update shard_count in pkg/bench/rttanalysis/BUILD.bazel to match shardCount constant")
+	}
+	return 0
+}()
+
+func TestBenchmarkExpectationShard1(t *testing.T) {
+	reg.RunExpectationsSharded(t, 1, shardCount)
+}
+
+func TestBenchmarkExpectationShard2(t *testing.T) {
+	reg.RunExpectationsSharded(t, 2, shardCount)
+}
+
+func TestBenchmarkExpectationShard3(t *testing.T) {
+	reg.RunExpectationsSharded(t, 3, shardCount)
+}
+
+func TestBenchmarkExpectationShard4(t *testing.T) {
+	reg.RunExpectationsSharded(t, 4, shardCount)
 }
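
The init-time guard exists because the `shardCount` constant and the `shard_count` attribute in BUILD.bazel can drift apart silently. A simplified, standard-library-only sketch of the same check follows; the real test goes through `pkg/util/envutil`, and `TEST_TOTAL_SHARDS` is set by the Bazel test runner:

package main

import (
	"fmt"
	"os"
	"strconv"
)

const shardCount = 4 // must match shard_count in BUILD.bazel

// validateShardEnv is a simplified, standard-library version of the
// init-time guard above.
func validateShardEnv() error {
	s, ok := os.LookupEnv("TEST_TOTAL_SHARDS") // set by the Bazel test runner
	if !ok || s == "" {
		return nil // not running under Bazel sharding; nothing to check
	}
	total, err := strconv.Atoi(s)
	if err != nil {
		return nil // unparseable value; the real guard also ignores this
	}
	if total != shardCount {
		return fmt.Errorf("shard mismatch: BUILD.bazel says %d shards, test expects %d", total, shardCount)
	}
	return nil
}

func main() {
	if err := validateShardEnv(); err != nil {
		panic(err)
	}
	fmt.Println("shard configuration is consistent")
}

Outside Bazel the variable is unset and the guard is a no-op, so a single shard can still be run directly with, e.g., go test ./pkg/bench/rttanalysis -run 'TestBenchmarkExpectationShard1$'.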

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -276,6 +276,7 @@ go_library(
         "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/catalog/catpb",
+        "//pkg/sql/execinfrapb",
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
         "//pkg/sql/sem/tree",

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 178 additions & 4 deletions
@@ -59,6 +59,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
 	roachprodaws "github.com/cockroachdb/cockroach/pkg/roachprod/vm/aws"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -1819,10 +1820,8 @@ func runCDCMultiTablePTSBenchmark(
 		numRanges = params.numRanges
 	}
 
-	if params.perTablePTS {
-		if _, err := db.Exec("SET CLUSTER SETTING changefeed.protected_timestamp.per_table.enabled = true"); err != nil {
-			t.Fatalf("failed to set per-table protected timestamps: %v", err)
-		}
+	if _, err := db.Exec("SET CLUSTER SETTING changefeed.protected_timestamp.per_table.enabled = $1", params.perTablePTS); err != nil {
+		t.Fatalf("failed to set per-table protected timestamps: %v", err)
 	}
 
 	initCmd := fmt.Sprintf("./cockroach workload init bank --rows=%d --ranges=%d --tables=%d {pgurl%s}",
@@ -1912,7 +1911,182 @@ func configureDBForMultiTablePTSBenchmark(db *gosql.DB) error {
 	return nil
 }
 
+func getDiagramProcessors(ctx context.Context, db *gosql.DB) ([]any, error) {
+	var diagramURL string
+	diagramQuery := `SELECT value
+		FROM system.job_info ji
+		INNER JOIN system.jobs j ON ji.job_id = j.id
+		WHERE j.job_type = 'CHANGEFEED' AND ji.info_key LIKE '~dsp-diag-url-%'`
+	if err := db.QueryRowContext(ctx, diagramQuery).Scan(&diagramURL); err != nil {
+		return nil, err
+	}
+	diagram, err := execinfrapb.FromURL(diagramURL)
+	if err != nil {
+		return nil, err
+	}
+	diagramJSON, err := json.Marshal(diagram)
+	if err != nil {
+		return nil, err
+	}
+	var flow map[string]any
+	if err := json.Unmarshal(diagramJSON, &flow); err != nil {
+		return nil, err
+	}
+	processors, ok := flow["processors"].([]any)
+	if !ok {
+		return nil, fmt.Errorf("processors not found in flow")
+	}
+	return processors, nil
+}
+
+type ChangefeedDistribution struct {
+	NodeToSpansWatched map[int]int
+	ZoneToSpansWatched map[string]int
+	TotalSpansWatched  int
+	TotalAggregators   int
+	TotalLeaseHolders  int
+	TotalRanges        int
+	NodeToZone         map[int]string
+}
+
+func getChangefeedDistribution(
+	processors []any, nodeToZone map[int]string, t test.Test,
+) ChangefeedDistribution {
+	changefeedDistribution := ChangefeedDistribution{
+		NodeToSpansWatched: make(map[int]int),
+		ZoneToSpansWatched: make(map[string]int),
+		TotalSpansWatched:  0,
+		TotalAggregators:   0,
+		TotalLeaseHolders:  0,
+		TotalRanges:        0,
+		NodeToZone:         nodeToZone,
+	}
+	for _, p := range processors {
+		procMap, ok := p.(map[string]any)
+		if !ok {
+			t.Fatalf("processor not a map")
+		}
+		nodeIdx, ok := procMap["nodeIdx"].(float64)
+		require.True(t, ok, "node idx not found in processor")
+		core, ok := procMap["core"].(map[string]any)
+		require.True(t, ok, "core not found in processor")
+		title, ok := core["title"].(string)
+		require.True(t, ok, "title not found in core")
+		if strings.HasPrefix(title, "ChangeAggregator") {
+			changefeedDistribution.TotalAggregators++
+			details := core["details"].([]any)
+			for _, detail := range details {
+				if strings.HasPrefix(detail.(string), "Watches") {
+					re := regexp.MustCompile(`Watches \[(\d+)\]:`)
+					matches := re.FindStringSubmatch(detail.(string))
+					if len(matches) > 1 {
+						numWatches, err := strconv.Atoi(matches[1])
+						require.NoError(t, err)
+						changefeedDistribution.NodeToSpansWatched[int(nodeIdx)] += numWatches
+						changefeedDistribution.TotalSpansWatched += numWatches
+						changefeedDistribution.ZoneToSpansWatched[changefeedDistribution.NodeToZone[int(nodeIdx)]] += numWatches
+					}
+				}
+			}
+		}
+	}
+	return changefeedDistribution
+}
+
+func verifyLeaseHolderDistribution(
+	db *gosql.DB, t test.Test, nodeToZone map[int]string,
+) map[string]int {
+	var rows *gosql.Rows
+	// Get lease holders for all ranges in the tpcc database.
+	leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
+		FROM crdb_internal.ranges r
+		JOIN crdb_internal.tables t ON r.start_pretty LIKE concat('/Table/', t.table_id::STRING, '%')
+		WHERE t.database_name = 'tpcc'`
+	rows, err := db.Query(leaseHolderQuery)
+	zoneToLeaseHolderCount := make(map[string]int)
+	require.NoError(t, err)
+	defer rows.Close()
+	for rows.Next() {
+		var startKeyPretty string
+		var replicas []uint8
+		var replicaLocalities []uint8
+		var leaseHolder int
+		require.NoError(t, rows.Scan(&startKeyPretty, &replicas, &replicaLocalities, &leaseHolder))
+		for indx := range replicas {
+			require.NotEqual(t, replicas[indx], 0)
+			replicas[indx]--
+		}
+		leaseHolder--
+		zoneToLeaseHolderCount[nodeToZone[leaseHolder]]++
+	}
+	return zoneToLeaseHolderCount
+}
+
 func registerCDC(r registry.Registry) {
+	r.Add(registry.TestSpec{
+		// This test:
+		// 1. Creates a cluster with 3 nodes each in us-east and us-west.
+		// 2. Runs a tpcc workload, then sets the tpcc database's primary region to us-west.
+		// 3. Creates a changefeed with execution locality set to us-east.
+		// 4. Gets the changefeed diagram and creates mappings.
+		//
+		// It verifies that ranges are evenly distributed across
+		// change aggregators in the execution_locality region while targeting
+		// tables whose primary region is different. In issue #2955, in that
+		// scenario, a single change aggregator (on the gateway node) would
+		// watch all the ranges. That scenario occurred with the older
+		// bin-packing oracle rather than the bulk oracle.
+		Name:             "cdc/multi-region-execution-locality-tpcc",
+		Owner:            registry.OwnerCDC,
+		Cluster:          r.MakeClusterSpec(7, spec.Geo(), spec.GatherCores(), spec.GCEZones("us-east1-b,us-west1-b")),
+		CompatibleClouds: registry.OnlyGCE,
+		Suites:           registry.Suites(),
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			nodeToZone := map[int]string{
+				0: "us-east1-b",
+				1: "us-east1-b",
+				2: "us-east1-b",
+				3: "us-west1-b",
+				4: "us-west1-b",
+				5: "us-west1-b",
+			}
+			ct := newCDCTester(ctx, t, c)
+			defer ct.Close()
+
+			ct.runTPCCWorkload(tpccArgs{warehouses: 100})
+
+			var err error
+			_, err = ct.DB().Exec("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'")
+			require.NoError(t, err)
+
+			feed := ct.newChangefeed(feedArgs{
+				sinkType: cloudStorageSink,
+				targets:  allTpccTargets,
+				opts: map[string]string{
+					"execution_locality": "'region=us-east1'",
+				},
+			})
+			ct.waitForWorkload()
+			feed.waitForCompletion()
+
+			processors, err := getDiagramProcessors(ctx, ct.DB())
+			require.NoError(t, err)
+
+			changefeedDistribution := getChangefeedDistribution(processors, nodeToZone, t)
+			require.Greater(t, changefeedDistribution.TotalAggregators, 1)
+			for nodeIdx, spansWatched := range changefeedDistribution.NodeToSpansWatched {
+				require.LessOrEqual(t, spansWatched, changefeedDistribution.TotalSpansWatched/2,
+					"nodeIdx %d watched %d spans, total spans watched %d",
+					nodeIdx, spansWatched, changefeedDistribution.TotalSpansWatched)
+			}
+			require.Equal(t, 1, len(changefeedDistribution.ZoneToSpansWatched))
+			require.Equal(t, changefeedDistribution.ZoneToSpansWatched["us-east1-b"], changefeedDistribution.TotalSpansWatched)
+			zoneToLeaseHolderCount := verifyLeaseHolderDistribution(ct.DB(), t, nodeToZone)
+			// The majority of lease holders should be in us-west1-b. Some may
+			// not be, but most should.
+			if zoneToLeaseHolderCount["us-east1-b"] != 0 {
+				require.Greater(t, zoneToLeaseHolderCount["us-west1-b"]/zoneToLeaseHolderCount["us-east1-b"], 10)
+			}
+		},
+	})
 	r.Add(registry.TestSpec{
 		Name:  "cdc/initial-scan-only",
 		Owner: registry.OwnerCDC,
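
The heart of the new roachtest's assertion is small: pull the span count out of each ChangeAggregator's "Watches [N]:" detail string, then require that no single node watches a majority of spans. Here is a standalone sketch of just that parsing and check, with fabricated detail strings standing in for the real DistSQL diagram output:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var watchesRE = regexp.MustCompile(`Watches \[(\d+)\]:`)

// spansWatched extracts the span count from a ChangeAggregator detail
// string, as the roachtest does, returning 0 if it does not match.
func spansWatched(detail string) int {
	m := watchesRE.FindStringSubmatch(detail)
	if len(m) < 2 {
		return 0
	}
	n, _ := strconv.Atoi(m[1])
	return n
}

func main() {
	// Fabricated per-node aggregator details for illustration.
	details := map[int]string{
		0: "Watches [34]: ...",
		1: "Watches [33]: ...",
		2: "Watches [33]: ...",
	}
	nodeToSpans := make(map[int]int)
	total := 0
	for node, d := range details {
		n := spansWatched(d)
		nodeToSpans[node] += n
		total += n
	}
	// The roachtest's balance assertion: no node may watch more than half
	// of all spans, i.e. no single-aggregator plan on the gateway node.
	for node, n := range nodeToSpans {
		if n > total/2 {
			panic(fmt.Sprintf("node %d watches %d of %d spans", node, n, total))
		}
	}
	fmt.Println("spans are balanced across aggregators")
}

With the pre-24.1 behavior described in the commit message, the map would instead put all spans on the gateway node, and the total/2 check is exactly what catches that.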

pkg/kv/kvserver/asim/tests/helpers_test.go

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ func scanThreshold(t *testing.T, d *datadriven.TestData) (th assertion.Threshold
 		th.ThresholdType = assertion.UpperBound
 		return th
 	}
-	scanArg(t, d, "lower_bound", &th.Value)
+	scanMustExist(t, d, "lower_bound", &th.Value)
 	th.ThresholdType = assertion.LowerBound
 	return th
 }

pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -655,6 +655,7 @@ go_test(
         "backfill_test.go",
         "builtin_mem_usage_test.go",
         "builtin_test.go",
+        "check_external_connection_test.go",
         "check_test.go",
         "closed_session_cache_test.go",
         "comment_on_column_test.go",
