Skip to content

Commit 47bb79c

Browse files
committed
cmd/roachtest: add changefeed roachtests exercising execution_locality
in multi-region clusters Previously, we were not testing to verify that a changefeed with execution_locality enabled creates an acceptable plan and is able to progress. Prior to 24.1, the changefeed would only put aggregators on nodes that were leaseholders for the data. If execution locality is enabled to restrict the changefeed to nodes without leaseholders for that data, then all the ranges would be planned on the gateway node. This roachtest verifies that aggregators are spread across nodes within a region (no one node has a majority of changefeed aggregators). Fixes: #153736 Epic: CRDB-38755 Release note: None
1 parent 3ca17dc commit 47bb79c

File tree

2 files changed

+177
-0
lines changed

2 files changed

+177
-0
lines changed

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ go_library(
276276
"//pkg/settings/cluster",
277277
"//pkg/sql",
278278
"//pkg/sql/catalog/catpb",
279+
"//pkg/sql/execinfrapb",
279280
"//pkg/sql/pgwire/pgcode",
280281
"//pkg/sql/pgwire/pgerror",
281282
"//pkg/sql/sem/tree",

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import (
5959
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
6060
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
6161
roachprodaws "github.com/cockroachdb/cockroach/pkg/roachprod/vm/aws"
62+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
6263
"github.com/cockroachdb/cockroach/pkg/testutils"
6364
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
6465
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -1899,7 +1900,182 @@ func configureDBForMultiTablePTSBenchmark(db *gosql.DB) error {
18991900
return nil
19001901
}
19011902

1903+
func getDiagramProcessors(ctx context.Context, db *gosql.DB) ([]any, error) {
1904+
var diagramURL string
1905+
diagramQuery := `SELECT value
1906+
FROM system.job_info ji
1907+
INNER JOIN system.jobs j ON ji.job_id = j.id
1908+
WHERE j.job_type = 'CHANGEFEED' AND ji.info_key LIKE '~dsp-diag-url-%'`
1909+
if err := db.QueryRowContext(ctx, diagramQuery).Scan(&diagramURL); err != nil {
1910+
return nil, err
1911+
}
1912+
diagram, err := execinfrapb.FromURL(diagramURL)
1913+
if err != nil {
1914+
return nil, err
1915+
}
1916+
diagramJSON, err := json.Marshal(diagram)
1917+
if err != nil {
1918+
return nil, err
1919+
}
1920+
var flow map[string]any
1921+
if err := json.Unmarshal(diagramJSON, &flow); err != nil {
1922+
return nil, err
1923+
}
1924+
processors, ok := flow["processors"].([]any)
1925+
if !ok {
1926+
return nil, fmt.Errorf("processors not found in flow")
1927+
}
1928+
return processors, nil
1929+
}
1930+
1931+
type ChangefeedDistribution struct {
1932+
NodeToSpansWatched map[int]int
1933+
ZoneToSpansWatched map[string]int
1934+
TotalSpansWatched int
1935+
TotalAggregators int
1936+
TotalLeaseHolders int
1937+
TotalRanges int
1938+
NodeToZone map[int]string
1939+
}
1940+
1941+
func getChangefeedDistribution(
1942+
processors []any, nodeToZone map[int]string, t test.Test,
1943+
) ChangefeedDistribution {
1944+
changefeedDistribution := ChangefeedDistribution{
1945+
NodeToSpansWatched: make(map[int]int),
1946+
ZoneToSpansWatched: make(map[string]int),
1947+
TotalSpansWatched: 0,
1948+
TotalAggregators: 0,
1949+
TotalLeaseHolders: 0,
1950+
TotalRanges: 0,
1951+
NodeToZone: nodeToZone,
1952+
}
1953+
for _, p := range processors {
1954+
procMap, ok := p.(map[string]any)
1955+
if !ok {
1956+
t.Fatalf("processor not a map")
1957+
}
1958+
nodeIdx, ok := procMap["nodeIdx"].(float64)
1959+
require.True(t, ok, "node idx not found in processor")
1960+
core, ok := procMap["core"].(map[string]any)
1961+
require.True(t, ok, "core not found in processor")
1962+
title, ok := core["title"].(string)
1963+
require.True(t, ok, "title not found in core")
1964+
if strings.HasPrefix(title, "ChangeAggregator") {
1965+
changefeedDistribution.TotalAggregators++
1966+
details := core["details"].([]any)
1967+
for _, detail := range details {
1968+
if strings.HasPrefix(detail.(string), "Watches") {
1969+
re := regexp.MustCompile(`Watches \[(\d+)\]:`)
1970+
matches := re.FindStringSubmatch(detail.(string))
1971+
if len(matches) > 1 {
1972+
numWatches, err := strconv.Atoi(matches[1])
1973+
require.NoError(t, err)
1974+
changefeedDistribution.NodeToSpansWatched[int(nodeIdx)] += numWatches
1975+
changefeedDistribution.TotalSpansWatched += numWatches
1976+
changefeedDistribution.ZoneToSpansWatched[changefeedDistribution.NodeToZone[int(nodeIdx)]] += numWatches
1977+
1978+
}
1979+
}
1980+
}
1981+
}
1982+
}
1983+
return changefeedDistribution
1984+
}
1985+
1986+
func veryifyLeaseHolderDistribution(
1987+
db *gosql.DB, t test.Test, nodeToZone map[int]string,
1988+
) map[string]int {
1989+
var rows *gosql.Rows
1990+
// Get lease holders for all ranges in tpcc database.
1991+
leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
1992+
FROM crdb_internal.ranges r
1993+
JOIN crdb_internal.tables t ON r.start_pretty like concat('/Table/', t.table_id::STRING,'%')
1994+
WHERE t.database_name = 'tpcc'`
1995+
rows, err := db.Query(leaseHolderQuery)
1996+
zoneToLeaseHolderCount := make(map[string]int)
1997+
require.NoError(t, err)
1998+
defer rows.Close()
1999+
for rows.Next() {
2000+
var startKeyPretty string
2001+
var replicas []uint8
2002+
var replicaLocalities []uint8
2003+
var leaseHolder int
2004+
require.NoError(t, rows.Scan(&startKeyPretty, &replicas, &replicaLocalities, &leaseHolder))
2005+
for indx := range replicas {
2006+
require.NotEqual(t, replicas[indx], 0)
2007+
replicas[indx]--
2008+
}
2009+
leaseHolder--
2010+
zoneToLeaseHolderCount[nodeToZone[leaseHolder]]++
2011+
}
2012+
return zoneToLeaseHolderCount
2013+
}
2014+
19022015
func registerCDC(r registry.Registry) {
2016+
r.Add(registry.TestSpec{
2017+
// This test
2018+
// 1. Creates a cluster with 3 nodes each in us-east and us-west
2019+
// 2. Runs a tpcc workload, then sets tpcc database to primary region us-west
2020+
// 3. Creates a changefeed with execution locality set to us-east
2021+
// 4. Gets the changefeed diagram and creates mappings
2022+
2023+
// This test is used to verify that ranges are evenly distributed across
2024+
// change aggregators in the execution_locality region while targeting tables
2025+
// whose primary region is different. In issue #2955, in that scenario,
2026+
// a single change aggregator (on the gateway node) would watch all the ranges.
2027+
// The above scenario occured with the older bin-packing oracle rather than
2028+
// the bulk oracle.
2029+
Name: "cdc/multi-region-execution-locality-tpcc",
2030+
Owner: registry.OwnerCDC,
2031+
Cluster: r.MakeClusterSpec(7, spec.Geo(), spec.GatherCores(), spec.GCEZones("us-east1-b,us-west1-b")),
2032+
CompatibleClouds: registry.OnlyGCE,
2033+
Suites: registry.Suites(),
2034+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
2035+
nodeToZone := map[int]string{
2036+
0: "us-east1-b",
2037+
1: "us-east1-b",
2038+
2: "us-east1-b",
2039+
3: "us-west1-b",
2040+
4: "us-west1-b",
2041+
5: "us-west1-b",
2042+
}
2043+
ct := newCDCTester(ctx, t, c)
2044+
defer ct.Close()
2045+
2046+
ct.runTPCCWorkload(tpccArgs{warehouses: 100})
2047+
2048+
var err error
2049+
_, err = ct.DB().Exec("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'")
2050+
require.NoError(t, err)
2051+
2052+
feed := ct.newChangefeed(feedArgs{
2053+
sinkType: cloudStorageSink,
2054+
targets: allTpccTargets,
2055+
opts: map[string]string{
2056+
"execution_locality": "'region=us-east1'",
2057+
},
2058+
})
2059+
ct.waitForWorkload()
2060+
feed.waitForCompletion()
2061+
2062+
processors, err := getDiagramProcessors(ctx, ct.DB())
2063+
require.NoError(t, err)
2064+
2065+
changefeedDistribution := getChangefeedDistribution(processors, nodeToZone, t)
2066+
require.Greater(t, changefeedDistribution.TotalAggregators, 1)
2067+
for nodeIdx, spansWatched := range changefeedDistribution.NodeToSpansWatched {
2068+
require.LessOrEqual(t, spansWatched, changefeedDistribution.TotalSpansWatched/2, "nodeIdx %d watched %d spans, total spans watched %d", nodeIdx, spansWatched, changefeedDistribution.TotalSpansWatched)
2069+
}
2070+
require.Equal(t, 1, len(changefeedDistribution.ZoneToSpansWatched))
2071+
require.Equal(t, changefeedDistribution.ZoneToSpansWatched["us-east1-b"], changefeedDistribution.TotalSpansWatched)
2072+
zoneToLeaseHolderCount := veryifyLeaseHolderDistribution(ct.DB(), t, nodeToZone)
2073+
// Majority of lease holders should be in us-west1-b. Some may not, but most should.
2074+
if zoneToLeaseHolderCount["us-east1-b"] != 0 {
2075+
require.Greater(t, zoneToLeaseHolderCount["us-west1-b"]/zoneToLeaseHolderCount["us-east1-b"], 10)
2076+
}
2077+
},
2078+
})
19032079
r.Add(registry.TestSpec{
19042080
Name: "cdc/initial-scan-only",
19052081
Owner: registry.OwnerCDC,

0 commit comments

Comments
 (0)