@@ -14,12 +14,14 @@ import (
1414 "time"
1515
1616 "github.com/cockroachdb/cockroach/pkg/base"
17+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
1718 "github.com/cockroachdb/cockroach/pkg/keys"
1819 "github.com/cockroachdb/cockroach/pkg/kv"
1920 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2021 "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2122 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
2223 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
24+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/policyrefresher"
2325 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
2426 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
2527 "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -33,6 +35,7 @@ import (
3335 "github.com/cockroachdb/cockroach/pkg/util/hlc"
3436 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
3537 "github.com/cockroachdb/cockroach/pkg/util/log"
38+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
3639 "github.com/cockroachdb/errors"
3740 "github.com/stretchr/testify/require"
3841 "golang.org/x/sync/errgroup"
@@ -1201,3 +1204,137 @@ func TestRefreshPolicyWithVariousLatencies(t *testing.T) {
 		})
 	}
 }
+
+// TestReplicaClosedTSPolicyWithPolicyRefresherInMixedVersionCluster verifies
+// that the closed timestamp policy refresher behaves correctly in a
+// mixed-version cluster.
+//
+// In particular, a race like the following can occur:
+// 1. The side transport prepares a policy map before the cluster upgrade is
+//    complete.
+// 2. The cluster upgrade completes.
+// 3. The policy refresher sees the upgrade and quickly updates replica
+//    policies to use latency-based policies.
+// 4. A replica tries to use a latency-based policy, but the policy map from
+//    step 1 doesn't include it yet.
+//
+// The logic in replica.getTargetByPolicy handles this race by falling back to
+// non-latency-based policies whenever the map provided by the side transport
+// sender does not include latency-based ones.
+//
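+// Roughly, the fallback behaves like the sketch below. This is illustrative
+// only; the map and policy names are placeholders, not the real identifiers
+// used by replica.getTargetByPolicy.
+//
+//	target, ok := closedTimestampsByPolicy[latencyBasedPolicy]
+//	if !ok {
+//		// The sender isn't publishing latency-based entries yet.
+//		target = closedTimestampsByPolicy[nonLatencyBasedFallback]
+//	}
+//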
+// This test simulates the race by using a testing knob that lets the policy
+// refresher apply latency-based policies to replicas while the rest of the
+// system is still on an older version. It verifies that even if the policy
+// refresher sees an upgrade to V25_2 and replicas start holding latency-based
+// policies, the replicas correctly fall back to non-latency-based policies if
+// the sender hasn't yet started sending the latency-based ones.
+func TestReplicaClosedTSPolicyWithPolicyRefresherInMixedVersionCluster(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
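+	// Start the cluster at V25_1 so that, from the side transport sender's
+	// point of view, the upgrade that introduces latency-based policies has not
+	// happened yet.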
+	prevVer := clusterversion.V25_1.Version()
+	st := cluster.MakeTestingClusterSettingsWithVersions(prevVer, prevVer, true)
+	// Helper function to check if a policy is a newly introduced latency-based
+	// policy.
+	isLatencyBasedPolicy := func(policy ctpb.RangeClosedTimestampPolicy) bool {
+		return policy >= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS &&
+			policy <= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS
+	}
+
+	// Set small intervals for faster testing.
+	closedts.RangeClosedTimestampPolicyRefreshInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+	closedts.RangeClosedTimestampPolicyLatencyRefreshInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+	closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+
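+	// latencyMap holds the node-to-node latencies injected into the policy
+	// refresher through the testing knob below; the mutex guards concurrent
+	// reads by the refresher against the update in upgradeForPolicyRefresher.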
+	type latencyMap struct {
+		mu syncutil.Mutex
+		m  map[roachpb.NodeID]time.Duration
+	}
+
+	latencies := latencyMap{m: make(map[roachpb.NodeID]time.Duration)}
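+
+	// Populating the injected latencies stands in for the policy refresher
+	// observing the completed upgrade: once latency information is available,
+	// it starts assigning latency-based policies to replicas.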
+	upgradeForPolicyRefresher := func() {
+		latencies.mu.Lock()
+		defer latencies.mu.Unlock()
+		latencies.m = map[roachpb.NodeID]time.Duration{
+			1: 10 * time.Millisecond,
+			2: 20 * time.Millisecond,
+			3: 50 * time.Millisecond,
+		}
+	}
+
+	tc := testcluster.StartTestCluster(t, 3,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					PolicyRefresherTestingKnobs: &policyrefresher.TestingKnobs{
+						InjectedLatencies: func() map[roachpb.NodeID]time.Duration {
+							latencies.mu.Lock()
+							defer latencies.mu.Unlock()
+							return latencies.m
+						},
+					},
+				},
+				Settings: st,
+			},
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	key := tc.ScratchRange(t)
+	// Split the range at the table prefix and replicate it across all nodes.
+	tc.SplitRangeOrFatal(t, key)
+	tc.AddVotersOrFatal(t, key, tc.Target(1), tc.Target(2))
+
+	// Get the store and replica for testing.
+	store := tc.GetFirstStoreFromServer(t, 0)
+	replica := store.LookupReplica(roachpb.RKey(key))
+	require.NotNil(t, replica)
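+	// Enable global reads on the range so its closed timestamp policy comes
+	// from the LEAD_FOR_GLOBAL_READS family, which is what the latency-based
+	// policies refine.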
+	spanConfig, err := replica.LoadSpanConfig(ctx)
+	require.NoError(t, err)
+	require.NotNil(t, spanConfig)
+	spanConfig.GlobalReads = true
+	replica.SetSpanConfig(*spanConfig, roachpb.Span{Key: key})
+
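+	// hasLatencyBasedPolicies reports whether the given side transport snapshot
+	// reflects the newly introduced latency-based closed timestamp policies.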
+	hasLatencyBasedPolicies := func(snapshot *ctpb.Update) bool {
+		// Verify no latency-based policies are being sent.
+		latencyBasedPolicyClosedTimestamps := len(snapshot.ClosedTimestamps) == int(roachpb.MAX_CLOSED_TIMESTAMP_POLICY)
+		// Verify no latency-based policies are chosen by ranges.
+		hasLatencyBasedPolicyForAllRanges := func() bool {
+			for _, policy := range snapshot.AddedOrUpdated {
+				if isLatencyBasedPolicy(policy.Policy) {
+					return true
+				}
+			}
+			return false
+		}
+		return !latencyBasedPolicyClosedTimestamps && !hasLatencyBasedPolicyForAllRanges()
+	}
+
+	// Verify that no latency-based policies are sent initially.
+	require.Never(t, func() bool {
+		snapshot := store.GetStoreConfig().ClosedTimestampSender.GetSnapshot()
+		expected := !hasLatencyBasedPolicies(snapshot) || len(snapshot.AddedOrUpdated) == 0
+		return !expected
+	}, 5*time.Second, 50*time.Millisecond)
+
+	// Upgrade the cluster version for the policy refresher.
+	upgradeForPolicyRefresher()
+
+	// Replicas should now start holding latency-based policies.
+	testutils.SucceedsSoon(t, func() error {
+		leaseholders := store.GetStoreConfig().ClosedTimestampSender.GetLeaseholders()
+		for _, lh := range leaseholders {
+			if policy := lh.(*kvserver.Replica).GetCachedClosedTimestampPolicyForTesting(); isLatencyBasedPolicy(policy) {
+				return nil
+			}
+		}
+		return errors.New("expected some leaseholder to have a latency-based policy but none had one")
+	})
+
+	// The sender does not see the cluster version upgrade yet. Replicas should
+	// fall back to non-latency-based policies when the side transport sender
+	// consults leaseholders about the policies to be sent.
+	require.Never(t, func() bool {
+		snapshot := store.GetStoreConfig().ClosedTimestampSender.GetSnapshot()
+		expected := !hasLatencyBasedPolicies(snapshot) || len(snapshot.AddedOrUpdated) == 0
+		return !expected
+	}, 10*time.Second, 50*time.Millisecond)
+}