Commit 13866fc

craig[bot] and wenyihu6 committed
Merge #144106
144106: kvserver: add mixed version race condition test for policy refresher r=arulajmani a=wenyihu6

**kvserver: add PolicyRefresher.TestingKnobs**

This commit adds PolicyRefresher.TestingKnobs, which enables testing race scenarios where the sender hasn't observed a cluster version upgrade when creating the policy map, but the PolicyRefresher later does.

Epic: none
Release note: none

---

**kvserver: add mixed version race condition test for policy refresher**

This commit adds TestReplicaClosedTSPolicyWithPolicyRefresherInMixedVersionCluster, which verifies that the closed timestamp policy refresher behaves correctly in a mixed-version cluster. In particular, a race condition like the following might occur:

1. The side transport prepares a policy map before the cluster upgrade is complete.
2. The cluster upgrade completes.
3. The policy refresher sees the upgrade and quickly updates replica policies to use latency-based policies.
4. A replica tries to use a latency-based policy, but the policy map from step 1 does not include it yet.

The logic in replica.getTargetByPolicy handles this race condition by falling back to non-latency-based policies when the map provided by the side transport sender only includes non-latency-based policies.

This test simulates the race condition by using a testing knob that allows the policy refresher to use latency-based policies on replicas while the rest of the system is still on an older version. It verifies that even if the policy refresher sees an upgrade to V25_2 and replicas start holding latency-based policies, replicas will correctly fall back to non-latency-based policies if the sender hasn't yet sent the updated latency-based policies.

Part of: #143888

Release note: none

Co-authored-by: wenyihu6 <[email protected]>
2 parents 86356f9 + 8f6c489 commit 13866fc
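For context, the fallback described in the commit message lives in replica.getTargetByPolicy, which is not part of this diff. The sketch below is illustrative only: the function name targetByPolicySketch and the slice-indexed layout are assumptions, not the actual CockroachDB implementation. It shows the shape of the fallback, assuming the pre-upgrade map always carries the non-latency-based policies.

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// targetByPolicySketch is a hypothetical stand-in for the fallback in
// replica.getTargetByPolicy. closedTimestampsByPolicy is the per-policy slice
// provided by the side transport sender; a sender that has not yet observed
// the upgrade sends a shorter slice without the latency-based buckets.
func targetByPolicySketch(
	closedTimestampsByPolicy []hlc.Timestamp,
	policy ctpb.RangeClosedTimestampPolicy,
) hlc.Timestamp {
	if int(policy) < len(closedTimestampsByPolicy) {
		// The sender's map already covers this policy, latency-based or not.
		return closedTimestampsByPolicy[policy]
	}
	// The sender built its map before observing the version upgrade, so the
	// latency-based buckets are missing; fall back to the non-latency-based
	// global-reads policy (assumed present in pre-upgrade maps).
	return closedTimestampsByPolicy[ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO]
}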

File tree

9 files changed (+192, -9 lines)

pkg/base/testing_knobs.go

Lines changed: 1 addition & 0 deletions
@@ -57,4 +57,5 @@ type TestingKnobs struct {
 	TableMetadata               ModuleTestingKnobs
 	LicenseTestingKnobs         ModuleTestingKnobs
 	VecIndexTestingKnobs        ModuleTestingKnobs
+	PolicyRefresherTestingKnobs ModuleTestingKnobs
 }

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -429,6 +429,7 @@ go_test(
         "//pkg/kv/kvserver/benignerror",
         "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/closedts/ctpb",
+        "//pkg/kv/kvserver/closedts/policyrefresher",
         "//pkg/kv/kvserver/closedts/tracker",
         "//pkg/kv/kvserver/concurrency",
         "//pkg/kv/kvserver/concurrency/isolation",

pkg/kv/kvserver/closedts/policyrefresher/BUILD.bazel

Lines changed: 4 additions & 1 deletion
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

 go_library(
     name = "policyrefresher",
-    srcs = ["policy_refresher.go"],
+    srcs = [
+        "config.go",
+        "policy_refresher.go",
+    ],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/policyrefresher",
     visibility = ["//visibility:public"],
     deps = [
pkg/kv/kvserver/closedts/policyrefresher/config.go

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package policyrefresher
+
+import (
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+)
+
+// TestingKnobs contains testing knobs for the policy refresher.
+type TestingKnobs struct {
+	// InjectedLatencies, if set, returns a map of node IDs to latencies that
+	// the policy refresher will use instead of the latencies observed from
+	// the RPC context, bypassing the cluster setting and version
+	// compatibility checks.
+	InjectedLatencies func() map[roachpb.NodeID]time.Duration
+}
+
+// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
+func (*TestingKnobs) ModuleTestingKnobs() {}

pkg/kv/kvserver/closedts/policyrefresher/policy_refresher.go

Lines changed: 7 additions & 0 deletions
@@ -29,6 +29,7 @@ import (
 type PolicyRefresher struct {
 	stopper  *stop.Stopper
 	settings *cluster.Settings
+	knobs    *TestingKnobs

 	// getLeaseholderReplicas returns the set of replicas that are currently
 	// leaseholders of the node.
@@ -64,6 +65,7 @@ func NewPolicyRefresher(
 	settings *cluster.Settings,
 	getLeaseholderReplicas func() []Replica,
 	getNodeLatencies func() map[roachpb.NodeID]time.Duration,
+	knobs *TestingKnobs,
 ) *PolicyRefresher {
 	if getLeaseholderReplicas == nil || getNodeLatencies == nil {
 		log.Fatalf(context.Background(), "getLeaseholderReplicas and getNodeLatencies must be non-nil")
@@ -72,6 +74,7 @@ func NewPolicyRefresher(
 	refresher := &PolicyRefresher{
 		stopper:                stopper,
 		settings:               settings,
+		knobs:                  knobs,
 		getLeaseholderReplicas: getLeaseholderReplicas,
 		getNodeLatencies:       getNodeLatencies,
 		refreshNotificationCh:  make(chan struct{}, 1),
@@ -127,6 +130,10 @@ func (pr *PolicyRefresher) updateLatencyCache() {
 // getCurrentLatencies returns the current latency information if auto-tuning is
 // enabled and the cluster has been fully upgraded to v25.2, or nil otherwise.
 func (pr *PolicyRefresher) getCurrentLatencies() map[roachpb.NodeID]time.Duration {
+	// Testing knobs only.
+	if pr.knobs != nil && pr.knobs.InjectedLatencies != nil {
+		return pr.knobs.InjectedLatencies()
+	}
 	if !closedts.LeadForGlobalReadsAutoTuneEnabled.Get(&pr.settings.SV) || !pr.settings.Version.IsActive(context.TODO(), clusterversion.V25_2) {
 		return nil
 	}

pkg/kv/kvserver/closedts/policyrefresher/policy_refresher_test.go

Lines changed: 5 additions & 3 deletions
@@ -29,7 +29,9 @@ func newNoopPolicyRefresher(stopper *stop.Stopper, settings *cluster.Settings) *
 		func() []Replica { return nil },
 		func() map[roachpb.NodeID]time.Duration {
 			return nil
-		})
+		},
+		nil,
+	)
 }

 type mockSpanConfig struct {
@@ -134,7 +136,7 @@ func TestPolicyRefreshOnRefreshIntervalUpdate(t *testing.T) {
 	getLeaseholders := func() []Replica { return []Replica{r} }
 	getLatencies := func() map[roachpb.NodeID]time.Duration { return nil }

-	pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies)
+	pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies, nil)
 	require.NotNil(t, pr)

 	// Start the refresher.
@@ -222,7 +224,7 @@ func TestPolicyRefresherOnLatencyIntervalUpdate(t *testing.T) {
 	closedts.RangeClosedTimestampPolicyLatencyRefreshInterval.Override(
 		ctx, &st.SV, 1*time.Hour)

-	pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies)
+	pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies, nil)
 	require.NotNil(t, pr)
 	pr.Run(ctx)

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 4 additions & 3 deletions
@@ -415,10 +415,8 @@ func TestSenderWithLatencyTracker(t *testing.T) {
 		}
 	}

-	// Add a leaseholder with replicas in different regions.
-	r := newMockReplica(15, ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, 1, 2, 3)
 	s, stopper := newMockSender(connFactory)
-	policyRefresher := policyrefresher.NewPolicyRefresher(stopper, st, s.GetLeaseholders, getLatencyFn)
+	policyRefresher := policyrefresher.NewPolicyRefresher(stopper, st, s.GetLeaseholders, getLatencyFn, nil)
 	defer stopper.Stop(ctx)
 	policyRefresher.Run(ctx)

@@ -439,6 +437,9 @@ func TestSenderWithLatencyTracker(t *testing.T) {
 	require.Nil(t, up.Removed)
 	require.Nil(t, up.AddedOrUpdated)

+	// Add a leaseholder with replicas in different regions.
+	r := newMockReplica(15, ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, 1, 2, 3)
+
 	// Verify policy updates when adding a leaseholder with far-away replicas.
 	s.RegisterLeaseholder(ctx, r, 1)
 	testutils.SucceedsSoon(t, func() error {

pkg/kv/kvserver/replica_closedts_test.go

Lines changed: 137 additions & 0 deletions
@@ -14,12 +14,14 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/policyrefresher"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -33,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
@@ -1201,3 +1204,137 @@ func TestRefreshPolicyWithVariousLatencies(t *testing.T) {
 		})
 	}
 }
+
+// TestReplicaClosedTSPolicyWithPolicyRefresherInMixedVersionCluster verifies
+// that the closed timestamp policy refresher behaves correctly in a
+// mixed-version cluster.
+//
+// In particular, a race condition like the following might occur:
+//  1. The side transport prepares a policy map before the cluster upgrade is
+//     complete.
+//  2. The cluster upgrade completes.
+//  3. The policy refresher sees the upgrade and quickly updates replica
+//     policies to use latency-based policies.
+//  4. A replica tries to use a latency-based policy, but the policy map from
+//     step 1 does not include it yet.
+//
+// The logic in replica.getTargetByPolicy handles this race condition by
+// falling back to non-latency-based policies when the map provided by the
+// side transport sender only includes non-latency-based policies.
+//
+// This test simulates the race condition by using a testing knob that allows
+// the policy refresher to use latency-based policies on replicas while the
+// rest of the system is still on an older version. It verifies that even if
+// the policy refresher sees an upgrade to V25_2 and replicas start holding
+// latency-based policies, replicas will correctly fall back to
+// non-latency-based policies if the sender hasn't yet sent the updated
+// latency-based policies.
+func TestReplicaClosedTSPolicyWithPolicyRefresherInMixedVersionCluster(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	prevVer := clusterversion.V25_1.Version()
+	st := cluster.MakeTestingClusterSettingsWithVersions(prevVer, prevVer, true)
+	// Helper to check whether a policy is one of the newly introduced
+	// latency-based policies.
+	isLatencyBasedPolicy := func(policy ctpb.RangeClosedTimestampPolicy) bool {
+		return policy >= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS &&
+			policy <= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS
+	}
+
+	// Set small intervals for faster testing.
+	closedts.RangeClosedTimestampPolicyRefreshInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+	closedts.RangeClosedTimestampPolicyLatencyRefreshInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+	closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+
+	type latencyMap struct {
+		mu syncutil.Mutex
+		m  map[roachpb.NodeID]time.Duration
+	}
+
+	latencies := latencyMap{m: make(map[roachpb.NodeID]time.Duration)}
+	upgradeForPolicyRefresher := func() {
+		latencies.mu.Lock()
+		defer latencies.mu.Unlock()
+		latencies.m = map[roachpb.NodeID]time.Duration{
+			1: 10 * time.Millisecond,
+			2: 20 * time.Millisecond,
+			3: 50 * time.Millisecond,
+		}
+	}
+
+	tc := testcluster.StartTestCluster(t, 3,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					PolicyRefresherTestingKnobs: &policyrefresher.TestingKnobs{
+						InjectedLatencies: func() map[roachpb.NodeID]time.Duration {
+							latencies.mu.Lock()
+							defer latencies.mu.Unlock()
+							return latencies.m
+						},
+					},
+				},
+				Settings: st,
+			},
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	key := tc.ScratchRange(t)
+	// Split the range at the table prefix and replicate it across all nodes.
+	tc.SplitRangeOrFatal(t, key)
+	tc.AddVotersOrFatal(t, key, tc.Target(1), tc.Target(2))
+
+	// Get the store and replica for testing.
+	store := tc.GetFirstStoreFromServer(t, 0)
+	replica := store.LookupReplica(roachpb.RKey(key))
+	require.NotNil(t, replica)
+	spanConfig, err := replica.LoadSpanConfig(ctx)
+	require.NoError(t, err)
+	require.NotNil(t, spanConfig)
+	spanConfig.GlobalReads = true
+	replica.SetSpanConfig(*spanConfig, roachpb.Span{Key: key})
+
+	hasLatencyBasedPolicies := func(snapshot *ctpb.Update) bool {
+		// Check whether latency-based closed timestamps are being sent.
+		latencyBasedPolicyClosedTimestamps := len(snapshot.ClosedTimestamps) == int(roachpb.MAX_CLOSED_TIMESTAMP_POLICY)
+		// Check whether any range has chosen a latency-based policy.
+		hasLatencyBasedPolicyForAnyRange := func() bool {
+			for _, policy := range snapshot.AddedOrUpdated {
+				if isLatencyBasedPolicy(policy.Policy) {
+					return true
+				}
+			}
+			return false
+		}
+		return !latencyBasedPolicyClosedTimestamps && !hasLatencyBasedPolicyForAnyRange()
+	}
+
+	// Verify that no latency-based policies are sent initially.
+	require.Never(t, func() bool {
+		snapshot := store.GetStoreConfig().ClosedTimestampSender.GetSnapshot()
+		expected := !hasLatencyBasedPolicies(snapshot) || len(snapshot.AddedOrUpdated) == 0
+		return !expected
+	}, 5*time.Second, 50*time.Millisecond)

+	// Upgrade the cluster version as seen by the policy refresher only.
+	upgradeForPolicyRefresher()
+
+	// Replicas should now start holding latency-based policies.
+	testutils.SucceedsSoon(t, func() error {
+		leaseholders := store.GetStoreConfig().ClosedTimestampSender.GetLeaseholders()
+		for _, lh := range leaseholders {
+			if policy := lh.(*kvserver.Replica).GetCachedClosedTimestampPolicyForTesting(); isLatencyBasedPolicy(policy) {
+				return nil
+			}
+		}
+		return errors.New("expected some leaseholder to have a latency-based policy but none had one")
+	})
+
+	// The sender has not seen the cluster version upgrade yet. Replicas should
+	// fall back to non-latency-based policies when side transport senders
+	// consult leaseholders about the policies to be sent.
+	require.Never(t, func() bool {
+		snapshot := store.GetStoreConfig().ClosedTimestampSender.GetSnapshot()
+		expected := !hasLatencyBasedPolicies(snapshot) || len(snapshot.AddedOrUpdated) == 0
+		return !expected
+	}, 10*time.Second, 50*time.Millisecond)
+}

pkg/server/server.go

Lines changed: 9 additions & 2 deletions
@@ -678,8 +678,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf

 	ctSender := sidetransport.NewSender(stopper, st, clock, kvNodeDialer)
 	ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
-	policyRefresher := policyrefresher.NewPolicyRefresher(stopper, st, ctSender.GetLeaseholders,
-		rpcContext.RemoteClocks.AllLatencies)
+	var policyRefresher *policyrefresher.PolicyRefresher
+	{
+		var knobs *policyrefresher.TestingKnobs
+		if policyRefresherKnobs := cfg.TestingKnobs.PolicyRefresherTestingKnobs; policyRefresherKnobs != nil {
+			knobs = policyRefresherKnobs.(*policyrefresher.TestingKnobs)
+		}
+		policyRefresher = policyrefresher.NewPolicyRefresher(stopper, st, ctSender.GetLeaseholders,
+			rpcContext.RemoteClocks.AllLatencies, knobs)
+	}

 	// The Executor will be further initialized later, as we create more
 	// of the server's components. There's a circular dependency - many things
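Outside of the server wiring above, the knob can also be passed straight to NewPolicyRefresher, as the unit tests in this diff do with nil. Below is a minimal sketch of that path, not code from this commit: the function name, stopper, settings, latency values, and nil-returning callbacks (mirroring the no-op refresher in policy_refresher_test.go) are all illustrative.

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/policyrefresher"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

func examplePolicyRefresherWithKnobs(ctx context.Context) {
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	st := cluster.MakeTestingClusterSettings()

	// InjectedLatencies bypasses the cluster-setting and version checks in
	// getCurrentLatencies, so the refresher behaves as if it had already
	// observed the upgrade. The latency value is illustrative.
	knobs := &policyrefresher.TestingKnobs{
		InjectedLatencies: func() map[roachpb.NodeID]time.Duration {
			return map[roachpb.NodeID]time.Duration{1: 10 * time.Millisecond}
		},
	}

	pr := policyrefresher.NewPolicyRefresher(
		stopper, st,
		func() []policyrefresher.Replica { return nil },        // leaseholder source
		func() map[roachpb.NodeID]time.Duration { return nil }, // observed latencies; unused while the knob is set
		knobs,
	)
	pr.Run(ctx)
}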
