Skip to content

Commit 7a8a7c9

Browse files
committed
kvserver: add PolicyRefresher.TestingKnobs
This commit adds PolicyRefresher.TestingKnobs, which enables testing race scenarios where the sender hasn't observed a cluster version upgrade when creating the policy map, but the PolicyRefresher later does. Epic: none Release note: none
1 parent 89e3036 commit 7a8a7c9

File tree

7 files changed

+54
-9
lines changed

7 files changed

+54
-9
lines changed

pkg/base/testing_knobs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,5 @@ type TestingKnobs struct {
5757
TableMetadata ModuleTestingKnobs
5858
LicenseTestingKnobs ModuleTestingKnobs
5959
VecIndexTestingKnobs ModuleTestingKnobs
60+
PolicyRefresherTestingKnobs ModuleTestingKnobs
6061
}

pkg/kv/kvserver/closedts/policyrefresher/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "policyrefresher",
5-
srcs = ["policy_refresher.go"],
5+
srcs = [
6+
"config.go",
7+
"policy_refresher.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/policyrefresher",
710
visibility = ["//visibility:public"],
811
deps = [
pkg/kv/kvserver/closedts/policyrefresher/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package policyrefresher
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/roachpb"
12+
)
13+
14+
// TestingKnobs contains testing knobs for the policy refresher.
15+
type TestingKnobs struct {
16+
// InjectedLatencies, if set, is a callback that returns a map of node IDs to
17+
// latencies that will be used by the policy refresher instead of observed
18+
// latencies from rpc context without checking for cluster settings or
19+
// version compatibility.
20+
InjectedLatencies func() map[roachpb.NodeID]time.Duration
21+
}
22+
23+
// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
24+
func (*TestingKnobs) ModuleTestingKnobs() {}

pkg/kv/kvserver/closedts/policyrefresher/policy_refresher.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
type PolicyRefresher struct {
3030
stopper *stop.Stopper
3131
settings *cluster.Settings
32+
knobs *TestingKnobs
3233

3334
// getLeaseholderReplicas returns the set of replicas that are currently
3435
// leaseholders of the node.
@@ -64,6 +65,7 @@ func NewPolicyRefresher(
6465
settings *cluster.Settings,
6566
getLeaseholderReplicas func() []Replica,
6667
getNodeLatencies func() map[roachpb.NodeID]time.Duration,
68+
knobs *TestingKnobs,
6769
) *PolicyRefresher {
6870
if getLeaseholderReplicas == nil || getNodeLatencies == nil {
6971
log.Fatalf(context.Background(), "getLeaseholderReplicas and getNodeLatencies must be non-nil")
@@ -72,6 +74,7 @@ func NewPolicyRefresher(
7274
refresher := &PolicyRefresher{
7375
stopper: stopper,
7476
settings: settings,
77+
knobs: knobs,
7578
getLeaseholderReplicas: getLeaseholderReplicas,
7679
getNodeLatencies: getNodeLatencies,
7780
refreshNotificationCh: make(chan struct{}, 1),
@@ -127,6 +130,10 @@ func (pr *PolicyRefresher) updateLatencyCache() {
127130
// getCurrentLatencies returns the current latency information if auto-tuning is
128131
// enabled and the cluster has been fully upgraded to v25.2, or nil otherwise. If
// the InjectedLatencies testing knob is set, its result is returned instead.
129132
func (pr *PolicyRefresher) getCurrentLatencies() map[roachpb.NodeID]time.Duration {
133+
// Testing only: injected latencies bypass the setting and version checks below.
134+
if pr.knobs != nil && pr.knobs.InjectedLatencies != nil {
135+
return pr.knobs.InjectedLatencies()
136+
}
130137
if !closedts.LeadForGlobalReadsAutoTuneEnabled.Get(&pr.settings.SV) || !pr.settings.Version.IsActive(context.TODO(), clusterversion.V25_2) {
131138
return nil
132139
}

pkg/kv/kvserver/closedts/policyrefresher/policy_refresher_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ func newNoopPolicyRefresher(stopper *stop.Stopper, settings *cluster.Settings) *
2929
func() []Replica { return nil },
3030
func() map[roachpb.NodeID]time.Duration {
3131
return nil
32-
})
32+
},
33+
nil,
34+
)
3335
}
3436

3537
type mockSpanConfig struct {
@@ -134,7 +136,7 @@ func TestPolicyRefreshOnRefreshIntervalUpdate(t *testing.T) {
134136
getLeaseholders := func() []Replica { return []Replica{r} }
135137
getLatencies := func() map[roachpb.NodeID]time.Duration { return nil }
136138

137-
pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies)
139+
pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies, nil)
138140
require.NotNil(t, pr)
139141

140142
// Start the refresher.
@@ -222,7 +224,7 @@ func TestPolicyRefresherOnLatencyIntervalUpdate(t *testing.T) {
222224
closedts.RangeClosedTimestampPolicyLatencyRefreshInterval.Override(
223225
ctx, &st.SV, 1*time.Hour)
224226

225-
pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies)
227+
pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies, nil)
226228
require.NotNil(t, pr)
227229
pr.Run(ctx)
228230

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,10 +415,8 @@ func TestSenderWithLatencyTracker(t *testing.T) {
415415
}
416416
}
417417

418-
// Add a leaseholder with replicas in different regions.
419-
r := newMockReplica(15, ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, 1, 2, 3)
420418
s, stopper := newMockSender(connFactory)
421-
policyRefresher := policyrefresher.NewPolicyRefresher(stopper, st, s.GetLeaseholders, getLatencyFn)
419+
policyRefresher := policyrefresher.NewPolicyRefresher(stopper, st, s.GetLeaseholders, getLatencyFn, nil)
422420
defer stopper.Stop(ctx)
423421
policyRefresher.Run(ctx)
424422

@@ -439,6 +437,9 @@ func TestSenderWithLatencyTracker(t *testing.T) {
439437
require.Nil(t, up.Removed)
440438
require.Nil(t, up.AddedOrUpdated)
441439

440+
// Add a leaseholder with replicas in different regions.
441+
r := newMockReplica(15, ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, 1, 2, 3)
442+
442443
// Verify policy updates when adding a leaseholder with far-away replicas.
443444
s.RegisterLeaseholder(ctx, r, 1)
444445
testutils.SucceedsSoon(t, func() error {

pkg/server/server.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,8 +678,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
678678

679679
ctSender := sidetransport.NewSender(stopper, st, clock, kvNodeDialer)
680680
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
681-
policyRefresher := policyrefresher.NewPolicyRefresher(stopper, st, ctSender.GetLeaseholders,
682-
rpcContext.RemoteClocks.AllLatencies)
681+
var policyRefresher *policyrefresher.PolicyRefresher
682+
{
683+
var knobs *policyrefresher.TestingKnobs
684+
if policyRefresherKnobs := cfg.TestingKnobs.PolicyRefresherTestingKnobs; policyRefresherKnobs != nil {
685+
knobs = policyRefresherKnobs.(*policyrefresher.TestingKnobs)
686+
}
687+
policyRefresher = policyrefresher.NewPolicyRefresher(stopper, st, ctSender.GetLeaseholders,
688+
rpcContext.RemoteClocks.AllLatencies, knobs)
689+
}
683690

684691
// The Executor will be further initialized later, as we create more
685692
// of the server's components. There's a circular dependency - many things

0 commit comments

Comments
 (0)