@@ -20,6 +20,7 @@ import (
 	"encoding/pem"
 	"fmt"
 	"io"
+	"maps"
 	"math/big"
 	"math/rand"
 	"net"
@@ -31,6 +32,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"runtime"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
@@ -1957,26 +1959,18 @@ func getDiagramProcessors(ctx context.Context, db *gosql.DB) ([]any, error) {
 }
 
 type ChangefeedDistribution struct {
-	NodeToSpansWatched map[int]int
 	ZoneToSpansWatched map[string]int
 	TotalSpansWatched  int
 	TotalAggregators   int
-	TotalLeaseHolders  int
-	TotalRanges        int
-	NodeToZone         map[int]string
 }
 
 func getChangefeedDistribution(
 	processors []any, nodeToZone map[int]string, t test.Test,
 ) ChangefeedDistribution {
 	changefeedDistribution := ChangefeedDistribution{
-		NodeToSpansWatched: make(map[int]int),
 		ZoneToSpansWatched: make(map[string]int),
 		TotalSpansWatched:  0,
 		TotalAggregators:   0,
-		TotalLeaseHolders:  0,
-		TotalRanges:        0,
-		NodeToZone:         nodeToZone,
 	}
 	for _, p := range processors {
 		procMap, ok := p.(map[string]any)
@@ -1999,10 +1993,8 @@ func getChangefeedDistribution(
 			if len(matches) > 1 {
 				numWatches, err := strconv.Atoi(matches[1])
 				require.NoError(t, err)
-				changefeedDistribution.NodeToSpansWatched[int(nodeIdx)] += numWatches
 				changefeedDistribution.TotalSpansWatched += numWatches
-				changefeedDistribution.ZoneToSpansWatched[changefeedDistribution.NodeToZone[int(nodeIdx)]] += numWatches
-
+				changefeedDistribution.ZoneToSpansWatched[nodeToZone[int(nodeIdx)]] += numWatches
 			}
 		}
 	}
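
An aside on the parsing above: getChangefeedDistribution walks the changefeed's DistSQL diagram, reads each processor's node index, and totals the `Watches: N` counts per zone. A minimal standalone sketch of that aggregation, assuming a simplified processor shape (a map with a `nodeIdx` number and a `details` string list; the real diagram JSON produced by getDiagramProcessors may differ):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// watchesRE mirrors the pattern the test matches against processor details.
var watchesRE = regexp.MustCompile(`Watches: (\d+)`)

// spansPerZone totals watched spans per zone, like getChangefeedDistribution.
func spansPerZone(processors []any, nodeToZone map[int]string) map[string]int {
	zoneToSpans := make(map[string]int)
	for _, p := range processors {
		procMap, ok := p.(map[string]any)
		if !ok {
			continue
		}
		nodeIdx, ok := procMap["nodeIdx"].(float64) // JSON numbers decode as float64
		if !ok {
			continue
		}
		details, _ := procMap["details"].([]any)
		for _, d := range details {
			s, _ := d.(string)
			if m := watchesRE.FindStringSubmatch(s); len(m) > 1 {
				if n, err := strconv.Atoi(m[1]); err == nil {
					zoneToSpans[nodeToZone[int(nodeIdx)]] += n
				}
			}
		}
	}
	return zoneToSpans
}

func main() {
	processors := []any{
		map[string]any{"nodeIdx": float64(0), "details": []any{"Watches: 7"}},
		map[string]any{"nodeIdx": float64(1), "details": []any{"Watches: 5"}},
	}
	// Output: map[us-east1-b:12]
	fmt.Println(spansPerZone(processors, map[int]string{0: "us-east1-b", 1: "us-east1-b"}))
}
```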
@@ -2011,42 +2003,36 @@ func getChangefeedDistribution(
 	return changefeedDistribution
 }
 
-func veryifyLeaseHolderDistribution(
-	db *gosql.DB, t test.Test, nodeToZone map[int]string,
-) map[string]int {
-	var rows *gosql.Rows
-	// Get lease holders for all ranges in tpcc database.
-	leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
-	FROM crdb_internal.ranges r
-	JOIN crdb_internal.tables t ON r.start_pretty like concat('/Table/', t.table_id::STRING,'%')
-	WHERE t.database_name = 'tpcc'`
-	rows, err := db.Query(leaseHolderQuery)
-	zoneToLeaseHolderCount := make(map[string]int)
-	require.NoError(t, err)
-	defer rows.Close()
-	for rows.Next() {
-		var startKeyPretty string
-		var replicas []uint8
-		var replicaLocalities []uint8
-		var leaseHolder int
-		require.NoError(t, rows.Scan(&startKeyPretty, &replicas, &replicaLocalities, &leaseHolder))
-		for indx := range replicas {
-			require.NotEqual(t, replicas[indx], 0)
-			replicas[indx]--
+func verifyLeaseHolderLocality(db *gosql.DB, t test.Test, primaryRegion string) {
+	leaseHolderQuery := `SELECT NOT EXISTS (
+		SELECT 1
+		FROM [SHOW CLUSTER RANGES WITH TABLES, DETAILS]
+		WHERE database_name = 'tpcc'
+		AND (lease_holder_locality IS DISTINCT FROM $1::STRING OR lease_holder_locality IS NULL)
+	)`
+	t.L().Printf("Waiting for all lease holders to be in region %s", primaryRegion)
+	start := timeutil.Now()
+	ok := false
+	for {
+		if timeutil.Since(start) > 5*time.Minute {
+			t.Fatalf("Timeout waiting for lease holders to be in region %s; waited for %s", primaryRegion, timeutil.Since(start))
 		}
-		leaseHolder--
-		zoneToLeaseHolderCount[nodeToZone[leaseHolder]]++
+		require.NoError(t, db.QueryRow(leaseHolderQuery, primaryRegion).Scan(&ok))
+		if ok {
+			break
+		}
+		time.Sleep(time.Second)
 	}
-	return zoneToLeaseHolderCount
 }
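
The new verifyLeaseHolderLocality is a poll-until-true loop with a hard deadline: re-run the `SHOW CLUSTER RANGES` predicate once per second until every tpcc lease holder reports the expected locality, or fail after five minutes. The same pattern, reduced to the standard library (the test's timeutil package is a thin wrapper around time; the helper name below is hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// waitUntil polls cond once per second until it returns true, cond
// reports an error, or timeout elapses.
func waitUntil(timeout time.Duration, cond func() (bool, error)) error {
	start := time.Now()
	for {
		if time.Since(start) > timeout {
			return fmt.Errorf("condition not met after %s", time.Since(start))
		}
		ok, err := cond()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		time.Sleep(time.Second)
	}
}

func main() {
	ready := time.Now().Add(2 * time.Second)
	err := waitUntil(5*time.Second, func() (bool, error) {
		return time.Now().After(ready), nil
	})
	fmt.Println(err) // <nil>: the condition became true before the deadline
}
```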
 
 func registerCDC(r registry.Registry) {
 	r.Add(registry.TestSpec{
 		// This test
-		// 1. Creates a cluster with 3 nodes each in us-east and us-west
-		// 2. Runs a tpcc workload, then sets tpcc database to primary region us-west
-		// 3. Creates a changefeed with execution locality set to us-east
-		// 4. Gets the changefeed diagram and creates mappings
+		// 1. Creates a cluster with 3 nodes each in us-east and us-west;
+		// 2. Runs a tpcc workload, then configures the tpcc database to have lease holders in region us-west;
+		// 3. Creates a changefeed with execution locality set to us-east;
+		// 4. Gets the changefeed diagram and creates mappings;
+		// 5. Verifies that spans are assigned to multiple change aggregators in region us-east.
 
 		// This test is used to verify that ranges are evenly distributed across
 		// change aggregators in the execution_locality region while targeting tables
@@ -2058,7 +2044,7 @@ func registerCDC(r registry.Registry) {
 		Owner:            registry.OwnerCDC,
 		Cluster:          r.MakeClusterSpec(7, spec.Geo(), spec.GatherCores(), spec.GCEZones("us-east1-b,us-west1-b")),
 		CompatibleClouds: registry.OnlyGCE,
-		Suites:           registry.Suites(),
+		Suites:           registry.Suites(registry.Nightly),
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 			nodeToZone := map[int]string{
 				0: "us-east1-b",
@@ -2071,17 +2057,24 @@ func registerCDC(r registry.Registry) {
 			ct := newCDCTester(ctx, t, c)
 			defer ct.Close()
 
-			ct.runTPCCWorkload(tpccArgs{warehouses: 100})
+			ct.runTPCCWorkload(tpccArgs{warehouses: 20})
 
 			var err error
-			_, err = ct.DB().Exec("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'")
+			_, err = ct.DB().Exec(`ALTER DATABASE tpcc
+				CONFIGURE ZONE USING
+				constraints = '{+region=us-west1: 1, +region=us-east1: 1}',
+				lease_preferences = '[[+region=us-west1]]', num_replicas = 3`)
 			require.NoError(t, err)
 
+			// Verify lease holders are in us-west1-b.
+			verifyLeaseHolderLocality(ct.DB(), t, "cloud=gce,region=us-west1,zone=us-west1-b")
+
 			feed := ct.newChangefeed(feedArgs{
 				sinkType: cloudStorageSink,
 				targets:  allTpccTargets,
 				opts: map[string]string{
 					"execution_locality": "'region=us-east1'",
+					"initial_scan":       "'only'",
 				},
 			})
 			ct.waitForWorkload()
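
The CONFIGURE ZONE statement replaces the earlier SET PRIMARY REGION: it pins one replica to each region and steers lease holders toward us-west1 via lease_preferences, while execution_locality keeps the changefeed's processors in us-east1, forcing cross-region reads. A hedged sketch of the same two statements issued through plain database/sql (the connection string, the lib/pq driver, and the 'external://sink' URI are placeholders, not from the test, which goes through ct.DB() and ct.newChangefeed):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver
)

func main() {
	// Placeholder connection string for a local CockroachDB node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// One replica constrained to each region; lease holders preferred in us-west1.
	if _, err := db.Exec(`ALTER DATABASE tpcc CONFIGURE ZONE USING
		constraints = '{+region=us-west1: 1, +region=us-east1: 1}',
		lease_preferences = '[[+region=us-west1]]', num_replicas = 3`); err != nil {
		log.Fatal(err)
	}

	// Changefeed planning restricted to us-east1 nodes; initial_scan = 'only'
	// emits the current table contents and then stops.
	if _, err := db.Exec(`CREATE CHANGEFEED FOR TABLE tpcc.warehouse
		INTO 'external://sink'
		WITH execution_locality = 'region=us-east1', initial_scan = 'only'`); err != nil {
		log.Fatal(err)
	}
}
```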
@@ -2090,18 +2083,12 @@ func registerCDC(r registry.Registry) {
 			processors, err := getDiagramProcessors(ctx, ct.DB())
 			require.NoError(t, err)
 
+			// Verify changefeed aggregators are distributed across nodes in region us-east.
 			changefeedDistribution := getChangefeedDistribution(processors, nodeToZone, t)
 			require.Greater(t, changefeedDistribution.TotalAggregators, 1)
-			for nodeIdx, spansWatched := range changefeedDistribution.NodeToSpansWatched {
-				require.LessOrEqual(t, spansWatched, changefeedDistribution.TotalSpansWatched/2, "nodeIdx %d watched %d spans, total spans watched %d", nodeIdx, spansWatched, changefeedDistribution.TotalSpansWatched)
-			}
-			require.Equal(t, 1, len(changefeedDistribution.ZoneToSpansWatched))
+			require.ElementsMatch(t, []string{"us-east1-b"}, slices.Collect(maps.Keys(changefeedDistribution.ZoneToSpansWatched)))
 			require.Equal(t, changefeedDistribution.ZoneToSpansWatched["us-east1-b"], changefeedDistribution.TotalSpansWatched)
-			zoneToLeaseHolderCount := veryifyLeaseHolderDistribution(ct.DB(), t, nodeToZone)
-			// Majority of lease holders should be in us-west1-b. Some may not, but most should.
-			if zoneToLeaseHolderCount["us-east1-b"] != 0 {
-				require.Greater(t, zoneToLeaseHolderCount["us-west1-b"]/zoneToLeaseHolderCount["us-east1-b"], 10)
-			}
+			require.Greater(t, changefeedDistribution.TotalSpansWatched, 0)
 		},
 	})
 	r.Add(registry.TestSpec{
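
This last hunk also explains the new `maps` and `slices` imports at the top of the diff: `maps.Keys` returns an iterator (Go 1.23+), and `slices.Collect` materializes it into a slice that `require.ElementsMatch` can compare order-independently. The idiom in isolation:

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

func main() {
	zoneToSpansWatched := map[string]int{"us-east1-b": 42}
	// maps.Keys yields an iter.Seq[string]; slices.Collect drains it into a slice.
	zones := slices.Collect(maps.Keys(zoneToSpansWatched))
	slices.Sort(zones) // map iteration order is randomized; sort for stable output
	fmt.Println(zones) // [us-east1-b]
}
```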