@@ -20,6 +20,7 @@ import (
20
20
"encoding/pem"
21
21
"fmt"
22
22
"io"
23
+ "maps"
23
24
"math/big"
24
25
"math/rand"
25
26
"net"
@@ -31,6 +32,7 @@ import (
31
32
"path/filepath"
32
33
"regexp"
33
34
"runtime"
35
+ "slices"
34
36
"sort"
35
37
"strconv"
36
38
"strings"
@@ -1951,26 +1953,18 @@ func getDiagramProcessors(ctx context.Context, db *gosql.DB) ([]any, error) {
1951
1953
}
1952
1954
1953
1955
type ChangefeedDistribution struct {
1954
- NodeToSpansWatched map [int ]int
1955
1956
ZoneToSpansWatched map [string ]int
1956
1957
TotalSpansWatched int
1957
1958
TotalAggregators int
1958
- TotalLeaseHolders int
1959
- TotalRanges int
1960
- NodeToZone map [int ]string
1961
1959
}
1962
1960
1963
1961
func getChangefeedDistribution (
1964
1962
processors []any , nodeToZone map [int ]string , t test.Test ,
1965
1963
) ChangefeedDistribution {
1966
1964
changefeedDistribution := ChangefeedDistribution {
1967
- NodeToSpansWatched : make (map [int ]int ),
1968
1965
ZoneToSpansWatched : make (map [string ]int ),
1969
1966
TotalSpansWatched : 0 ,
1970
1967
TotalAggregators : 0 ,
1971
- TotalLeaseHolders : 0 ,
1972
- TotalRanges : 0 ,
1973
- NodeToZone : nodeToZone ,
1974
1968
}
1975
1969
for _ , p := range processors {
1976
1970
procMap , ok := p .(map [string ]any )
@@ -1993,10 +1987,8 @@ func getChangefeedDistribution(
1993
1987
if len (matches ) > 1 {
1994
1988
numWatches , err := strconv .Atoi (matches [1 ])
1995
1989
require .NoError (t , err )
1996
- changefeedDistribution .NodeToSpansWatched [int (nodeIdx )] += numWatches
1997
1990
changefeedDistribution .TotalSpansWatched += numWatches
1998
- changefeedDistribution .ZoneToSpansWatched [changefeedDistribution .NodeToZone [int (nodeIdx )]] += numWatches
1999
-
1991
+ changefeedDistribution .ZoneToSpansWatched [nodeToZone [int (nodeIdx )]] += numWatches
2000
1992
}
2001
1993
}
2002
1994
}
@@ -2005,42 +1997,36 @@ func getChangefeedDistribution(
2005
1997
return changefeedDistribution
2006
1998
}
2007
1999
2008
- func veryifyLeaseHolderDistribution (
2009
- db * gosql.DB , t test.Test , nodeToZone map [int ]string ,
2010
- ) map [string ]int {
2011
- var rows * gosql.Rows
2012
- // Get lease holders for all ranges in tpcc database.
2013
- leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
2014
- FROM crdb_internal.ranges r
2015
- JOIN crdb_internal.tables t ON r.start_pretty like concat('/Table/', t.table_id::STRING,'%')
2016
- WHERE t.database_name = 'tpcc'`
2017
- rows , err := db .Query (leaseHolderQuery )
2018
- zoneToLeaseHolderCount := make (map [string ]int )
2019
- require .NoError (t , err )
2020
- defer rows .Close ()
2021
- for rows .Next () {
2022
- var startKeyPretty string
2023
- var replicas []uint8
2024
- var replicaLocalities []uint8
2025
- var leaseHolder int
2026
- require .NoError (t , rows .Scan (& startKeyPretty , & replicas , & replicaLocalities , & leaseHolder ))
2027
- for indx := range replicas {
2028
- require .NotEqual (t , replicas [indx ], 0 )
2029
- replicas [indx ]--
2000
+ func verifyLeaseHolderLocality (db * gosql.DB , t test.Test , primaryRegion string ) {
2001
+ leaseHolderQuery := `SELECT NOT EXISTS (
2002
+ SELECT 1
2003
+ FROM [SHOW CLUSTER RANGES WITH TABLES, DETAILS]
2004
+ WHERE database_name = 'tpcc'
2005
+ AND (lease_holder_locality IS DISTINCT FROM $1::STRING OR lease_holder_locality IS NULL)
2006
+ )`
2007
+ t .L ().Printf ("Waiting for all lease holders to be in region %s" , primaryRegion )
2008
+ start := timeutil .Now ()
2009
+ ok := false
2010
+ for {
2011
+ if timeutil .Since (start ) > 5 * time .Minute {
2012
+ t .Fatalf ("Timeout waiting for lease holders to be in region %s; waited for %s" , primaryRegion , timeutil .Since (start ))
2030
2013
}
2031
- leaseHolder --
2032
- zoneToLeaseHolderCount [nodeToZone [leaseHolder ]]++
2014
+ require .NoError (t , db .QueryRow (leaseHolderQuery , primaryRegion ).Scan (& ok ))
2015
+ if ok {
2016
+ break
2017
+ }
2018
+ time .Sleep (time .Second )
2033
2019
}
2034
- return zoneToLeaseHolderCount
2035
2020
}
2036
2021
2037
2022
func registerCDC (r registry.Registry ) {
2038
2023
r .Add (registry.TestSpec {
2039
2024
// This test
2040
- // 1. Creates a cluster with 3 nodes each in us-east and us-west
2041
- // 2. Runs a tpcc workload, then sets tpcc database to primary region us-west
2042
- // 3. Creates a changefeed with execution locality set to us-east
2043
- // 4. Gets the changefeed diagram and creates mappings
2025
+ // 1. Creates a cluster with 3 nodes each in us-east and us-west;
2026
+ // 2. Runs a tpcc workload, then congigures tpcc database to have lease holders in region us-west;
2027
+ // 3. Creates a changefeed with execution locality set to us-east;
2028
+ // 4. Gets the changefeed diagram and creates mappings;
2029
+ // 5. Verifies that spans are assigned to multiple change aggregators in region us-east.
2044
2030
2045
2031
// This test is used to verify that ranges are evenly distributed across
2046
2032
// change aggregators in the execution_locality region while targeting tables
@@ -2052,7 +2038,7 @@ func registerCDC(r registry.Registry) {
2052
2038
Owner : registry .OwnerCDC ,
2053
2039
Cluster : r .MakeClusterSpec (7 , spec .Geo (), spec .GatherCores (), spec .GCEZones ("us-east1-b,us-west1-b" )),
2054
2040
CompatibleClouds : registry .OnlyGCE ,
2055
- Suites : registry .Suites (),
2041
+ Suites : registry .Suites (registry . Nightly ),
2056
2042
Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
2057
2043
nodeToZone := map [int ]string {
2058
2044
0 : "us-east1-b" ,
@@ -2065,17 +2051,24 @@ func registerCDC(r registry.Registry) {
2065
2051
ct := newCDCTester (ctx , t , c )
2066
2052
defer ct .Close ()
2067
2053
2068
- ct .runTPCCWorkload (tpccArgs {warehouses : 100 })
2054
+ ct .runTPCCWorkload (tpccArgs {warehouses : 20 })
2069
2055
2070
2056
var err error
2071
- _ , err = ct .DB ().Exec ("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'" )
2057
+ _ , err = ct .DB ().Exec (`ALTER DATABASE tpcc
2058
+ CONFIGURE ZONE USING
2059
+ constraints = '{+region=us-west1: 1, +region=us-east1: 1}',
2060
+ lease_preferences = '[[+region=us-west1]]', num_replicas = 3` )
2072
2061
require .NoError (t , err )
2073
2062
2063
+ // Verify lease holders are in us-west1-b.
2064
+ verifyLeaseHolderLocality (ct .DB (), t , "cloud=gce,region=us-west1,zone=us-west1-b" )
2065
+
2074
2066
feed := ct .newChangefeed (feedArgs {
2075
2067
sinkType : cloudStorageSink ,
2076
2068
targets : allTpccTargets ,
2077
2069
opts : map [string ]string {
2078
2070
"execution_locality" : "'region=us-east1'" ,
2071
+ "initial_scan" : "'only'" ,
2079
2072
},
2080
2073
})
2081
2074
ct .waitForWorkload ()
@@ -2084,18 +2077,12 @@ func registerCDC(r registry.Registry) {
2084
2077
processors , err := getDiagramProcessors (ctx , ct .DB ())
2085
2078
require .NoError (t , err )
2086
2079
2080
+ // Verify changefeed aggregators are distributed across nodes in region us-east.
2087
2081
changefeedDistribution := getChangefeedDistribution (processors , nodeToZone , t )
2088
2082
require .Greater (t , changefeedDistribution .TotalAggregators , 1 )
2089
- for nodeIdx , spansWatched := range changefeedDistribution .NodeToSpansWatched {
2090
- require .LessOrEqual (t , spansWatched , changefeedDistribution .TotalSpansWatched / 2 , "nodeIdx %d watched %d spans, total spans watched %d" , nodeIdx , spansWatched , changefeedDistribution .TotalSpansWatched )
2091
- }
2092
- require .Equal (t , 1 , len (changefeedDistribution .ZoneToSpansWatched ))
2083
+ require .ElementsMatch (t , []string {"us-east1-b" }, slices .Collect (maps .Keys (changefeedDistribution .ZoneToSpansWatched )))
2093
2084
require .Equal (t , changefeedDistribution .ZoneToSpansWatched ["us-east1-b" ], changefeedDistribution .TotalSpansWatched )
2094
- zoneToLeaseHolderCount := veryifyLeaseHolderDistribution (ct .DB (), t , nodeToZone )
2095
- // Majority of lease holders should be in us-west1-b. Some may not, but most should.
2096
- if zoneToLeaseHolderCount ["us-east1-b" ] != 0 {
2097
- require .Greater (t , zoneToLeaseHolderCount ["us-west1-b" ]/ zoneToLeaseHolderCount ["us-east1-b" ], 10 )
2098
- }
2085
+ require .Greater (t , changefeedDistribution .TotalSpansWatched , 0 )
2099
2086
},
2100
2087
})
2101
2088
r .Add (registry.TestSpec {
0 commit comments