@@ -20,6 +20,7 @@ import (
20
20
"encoding/pem"
21
21
"fmt"
22
22
"io"
23
+ "maps"
23
24
"math/big"
24
25
"math/rand"
25
26
"net"
@@ -31,6 +32,7 @@ import (
31
32
"path/filepath"
32
33
"regexp"
33
34
"runtime"
35
+ "slices"
34
36
"sort"
35
37
"strconv"
36
38
"strings"
@@ -1947,26 +1949,18 @@ func getDiagramProcessors(ctx context.Context, db *gosql.DB) ([]any, error) {
1947
1949
}
1948
1950
1949
1951
type ChangefeedDistribution struct {
1950
- NodeToSpansWatched map [int ]int
1951
1952
ZoneToSpansWatched map [string ]int
1952
1953
TotalSpansWatched int
1953
1954
TotalAggregators int
1954
- TotalLeaseHolders int
1955
- TotalRanges int
1956
- NodeToZone map [int ]string
1957
1955
}
1958
1956
1959
1957
func getChangefeedDistribution (
1960
1958
processors []any , nodeToZone map [int ]string , t test.Test ,
1961
1959
) ChangefeedDistribution {
1962
1960
changefeedDistribution := ChangefeedDistribution {
1963
- NodeToSpansWatched : make (map [int ]int ),
1964
1961
ZoneToSpansWatched : make (map [string ]int ),
1965
1962
TotalSpansWatched : 0 ,
1966
1963
TotalAggregators : 0 ,
1967
- TotalLeaseHolders : 0 ,
1968
- TotalRanges : 0 ,
1969
- NodeToZone : nodeToZone ,
1970
1964
}
1971
1965
for _ , p := range processors {
1972
1966
procMap , ok := p .(map [string ]any )
@@ -1989,10 +1983,8 @@ func getChangefeedDistribution(
1989
1983
if len (matches ) > 1 {
1990
1984
numWatches , err := strconv .Atoi (matches [1 ])
1991
1985
require .NoError (t , err )
1992
- changefeedDistribution .NodeToSpansWatched [int (nodeIdx )] += numWatches
1993
1986
changefeedDistribution .TotalSpansWatched += numWatches
1994
- changefeedDistribution .ZoneToSpansWatched [changefeedDistribution .NodeToZone [int (nodeIdx )]] += numWatches
1995
-
1987
+ changefeedDistribution .ZoneToSpansWatched [nodeToZone [int (nodeIdx )]] += numWatches
1996
1988
}
1997
1989
}
1998
1990
}
@@ -2001,42 +1993,36 @@ func getChangefeedDistribution(
2001
1993
return changefeedDistribution
2002
1994
}
2003
1995
2004
- func veryifyLeaseHolderDistribution (
2005
- db * gosql.DB , t test.Test , nodeToZone map [int ]string ,
2006
- ) map [string ]int {
2007
- var rows * gosql.Rows
2008
- // Get lease holders for all ranges in tpcc database.
2009
- leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
2010
- FROM crdb_internal.ranges r
2011
- JOIN crdb_internal.tables t ON r.start_pretty like concat('/Table/', t.table_id::STRING,'%')
2012
- WHERE t.database_name = 'tpcc'`
2013
- rows , err := db .Query (leaseHolderQuery )
2014
- zoneToLeaseHolderCount := make (map [string ]int )
2015
- require .NoError (t , err )
2016
- defer rows .Close ()
2017
- for rows .Next () {
2018
- var startKeyPretty string
2019
- var replicas []uint8
2020
- var replicaLocalities []uint8
2021
- var leaseHolder int
2022
- require .NoError (t , rows .Scan (& startKeyPretty , & replicas , & replicaLocalities , & leaseHolder ))
2023
- for indx := range replicas {
2024
- require .NotEqual (t , replicas [indx ], 0 )
2025
- replicas [indx ]--
1996
+ func verifyLeaseHolderLocality (db * gosql.DB , t test.Test , primaryRegion string ) {
1997
+ leaseHolderQuery := `SELECT NOT EXISTS (
1998
+ SELECT 1
1999
+ FROM [SHOW CLUSTER RANGES WITH TABLES, DETAILS]
2000
+ WHERE database_name = 'tpcc'
2001
+ AND (lease_holder_locality IS DISTINCT FROM $1::STRING OR lease_holder_locality IS NULL)
2002
+ )`
2003
+ t .L ().Printf ("Waiting for all lease holders to be in region %s" , primaryRegion )
2004
+ start := timeutil .Now ()
2005
+ ok := false
2006
+ for {
2007
+ if timeutil .Since (start ) > 5 * time .Minute {
2008
+ t .Fatalf ("Timeout waiting for lease holders to be in region %s; waited for %s" , primaryRegion , timeutil .Since (start ))
2026
2009
}
2027
- leaseHolder --
2028
- zoneToLeaseHolderCount [nodeToZone [leaseHolder ]]++
2010
+ require .NoError (t , db .QueryRow (leaseHolderQuery , primaryRegion ).Scan (& ok ))
2011
+ if ok {
2012
+ break
2013
+ }
2014
+ time .Sleep (time .Second )
2029
2015
}
2030
- return zoneToLeaseHolderCount
2031
2016
}
2032
2017
2033
2018
func registerCDC (r registry.Registry ) {
2034
2019
r .Add (registry.TestSpec {
2035
2020
// This test
2036
- // 1. Creates a cluster with 3 nodes each in us-east and us-west
2037
- // 2. Runs a tpcc workload, then sets tpcc database to primary region us-west
2038
- // 3. Creates a changefeed with execution locality set to us-east
2039
- // 4. Gets the changefeed diagram and creates mappings
2021
+ // 1. Creates a cluster with 3 nodes each in us-east and us-west;
2022
+ // 2. Runs a tpcc workload, then congigures tpcc database to have lease holders in region us-west;
2023
+ // 3. Creates a changefeed with execution locality set to us-east;
2024
+ // 4. Gets the changefeed diagram and creates mappings;
2025
+ // 5. Verifies that spans are assigned to multiple change aggregators in region us-east.
2040
2026
2041
2027
// This test is used to verify that ranges are evenly distributed across
2042
2028
// change aggregators in the execution_locality region while targeting tables
@@ -2048,7 +2034,7 @@ func registerCDC(r registry.Registry) {
2048
2034
Owner : registry .OwnerCDC ,
2049
2035
Cluster : r .MakeClusterSpec (7 , spec .Geo (), spec .GatherCores (), spec .GCEZones ("us-east1-b,us-west1-b" )),
2050
2036
CompatibleClouds : registry .OnlyGCE ,
2051
- Suites : registry .Suites (),
2037
+ Suites : registry .Suites (registry . Nightly ),
2052
2038
Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
2053
2039
nodeToZone := map [int ]string {
2054
2040
0 : "us-east1-b" ,
@@ -2061,17 +2047,24 @@ func registerCDC(r registry.Registry) {
2061
2047
ct := newCDCTester (ctx , t , c )
2062
2048
defer ct .Close ()
2063
2049
2064
- ct .runTPCCWorkload (tpccArgs {warehouses : 100 })
2050
+ ct .runTPCCWorkload (tpccArgs {warehouses : 20 })
2065
2051
2066
2052
var err error
2067
- _ , err = ct .DB ().Exec ("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'" )
2053
+ _ , err = ct .DB ().Exec (`ALTER DATABASE tpcc
2054
+ CONFIGURE ZONE USING
2055
+ constraints = '{+region=us-west1: 1, +region=us-east1: 1}',
2056
+ lease_preferences = '[[+region=us-west1]]', num_replicas = 3` )
2068
2057
require .NoError (t , err )
2069
2058
2059
+ // Verify lease holders are in us-west1-b.
2060
+ verifyLeaseHolderLocality (ct .DB (), t , "cloud=gce,region=us-west1,zone=us-west1-b" )
2061
+
2070
2062
feed := ct .newChangefeed (feedArgs {
2071
2063
sinkType : cloudStorageSink ,
2072
2064
targets : allTpccTargets ,
2073
2065
opts : map [string ]string {
2074
2066
"execution_locality" : "'region=us-east1'" ,
2067
+ "initial_scan" : "'only'" ,
2075
2068
},
2076
2069
})
2077
2070
ct .waitForWorkload ()
@@ -2080,18 +2073,12 @@ func registerCDC(r registry.Registry) {
2080
2073
processors , err := getDiagramProcessors (ctx , ct .DB ())
2081
2074
require .NoError (t , err )
2082
2075
2076
+ // Verify changefeed aggregators are distributed across nodes in region us-east.
2083
2077
changefeedDistribution := getChangefeedDistribution (processors , nodeToZone , t )
2084
2078
require .Greater (t , changefeedDistribution .TotalAggregators , 1 )
2085
- for nodeIdx , spansWatched := range changefeedDistribution .NodeToSpansWatched {
2086
- require .LessOrEqual (t , spansWatched , changefeedDistribution .TotalSpansWatched / 2 , "nodeIdx %d watched %d spans, total spans watched %d" , nodeIdx , spansWatched , changefeedDistribution .TotalSpansWatched )
2087
- }
2088
- require .Equal (t , 1 , len (changefeedDistribution .ZoneToSpansWatched ))
2079
+ require .ElementsMatch (t , []string {"us-east1-b" }, slices .Collect (maps .Keys (changefeedDistribution .ZoneToSpansWatched )))
2089
2080
require .Equal (t , changefeedDistribution .ZoneToSpansWatched ["us-east1-b" ], changefeedDistribution .TotalSpansWatched )
2090
- zoneToLeaseHolderCount := veryifyLeaseHolderDistribution (ct .DB (), t , nodeToZone )
2091
- // Majority of lease holders should be in us-west1-b. Some may not, but most should.
2092
- if zoneToLeaseHolderCount ["us-east1-b" ] != 0 {
2093
- require .Greater (t , zoneToLeaseHolderCount ["us-west1-b" ]/ zoneToLeaseHolderCount ["us-east1-b" ], 10 )
2094
- }
2081
+ require .Greater (t , changefeedDistribution .TotalSpansWatched , 0 )
2095
2082
},
2096
2083
})
2097
2084
r .Add (registry.TestSpec {
0 commit comments