@@ -40,19 +40,25 @@ type mockSpanConfig struct {
 
 type mockReplica struct {
 	// Note that all fields below are protected by mu.
-	mu     syncutil.Mutex
-	conf   mockSpanConfig
-	policy ctpb.RangeClosedTimestampPolicy
+	mu             syncutil.Mutex
+	conf           mockSpanConfig
+	policy         ctpb.RangeClosedTimestampPolicy
+	furthestNodeID roachpb.NodeID
 }
 
-func (m *mockReplica) RefreshPolicy(_ map[roachpb.NodeID]time.Duration) {
+func (m *mockReplica) RefreshPolicy(latencies map[roachpb.NodeID]time.Duration) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	if m.conf.isGlobalRead {
-		m.policy = ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO
-	} else {
+	if !m.conf.isGlobalRead {
 		m.policy = ctpb.LAG_BY_CLUSTER_SETTING
+		return
+	}
+	latency, ok := latencies[m.furthestNodeID]
+	if !ok {
+		m.policy = ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO
+		return
 	}
+	m.policy = closedts.FindBucketBasedOnNetworkRTT(latency)
 }
 
 func (m *mockReplica) GetPolicy() ctpb.RangeClosedTimestampPolicy {
@@ -61,6 +67,12 @@ func (m *mockReplica) GetPolicy() ctpb.RangeClosedTimestampPolicy {
 	return m.policy
 }
 
+func (m *mockReplica) SetFurthestNodeID(nodeID roachpb.NodeID) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.furthestNodeID = nodeID
+}
+
 func (m *mockReplica) BlockReplica() (unblock func()) {
 	m.mu.Lock()
 	var once sync.Once
@@ -243,3 +255,65 @@ func TestPolicyRefresherOnLatencyIntervalUpdate(t *testing.T) {
 		return errors.New("expected latency update")
 	})
 }
+
+// TestPolicyRefresherWithLatencies tests that the policy refresher correctly
+// updates policies based on latency information from different nodes.
+func TestPolicyRefresherWithLatencies(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	st := cluster.MakeTestingClusterSettings()
+	closedts.LeadForGlobalReadsAutoTuneEnabled.Override(ctx, &st.SV, true)
+
+	// Create a policy refresher with test latencies.
+	latencies := map[roachpb.NodeID]time.Duration{
+		1: 10 * time.Millisecond,
+		2: 50 * time.Millisecond,
+		3: 90 * time.Millisecond,
+	}
+
+	r := &mockReplica{}
+	// Configure replica to not use global reads span config initially.
+	r.SetSpanConfig(false)
+
+	getLeaseholders := func() []Replica { return []Replica{r} }
+	getLatencies := func() map[roachpb.NodeID]time.Duration { return latencies }
+
+	pr := NewPolicyRefresher(stopper, st, getLeaseholders, getLatencies, nil)
+	require.NotNil(t, pr)
+	require.Nil(t, pr.getCurrentLatencies())
+
+	// Initially, the policy should be LAG_BY_CLUSTER_SETTING.
+	require.Equal(t, r.GetPolicy(), ctpb.LAG_BY_CLUSTER_SETTING)
+
+	// Enable global reads and set furthest node to 1. We should get the
+	// LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO policy before the cache update.
+	r.SetSpanConfig(true)
+	r.SetFurthestNodeID(1)
+	pr.refreshPolicies([]Replica{r})
+	require.Equal(t, r.GetPolicy(), ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO)
+
+	// Update the latency cache and verify we now get the <20ms policy for node 1.
+	pr.updateLatencyCache()
+	require.NotNil(t, pr.getCurrentLatencies())
+	pr.refreshPolicies([]Replica{r})
+	require.Equal(t, r.GetPolicy(), ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS)
+
+	// Set the furthest node to 2 and verify we get the <60ms policy.
+	r.SetFurthestNodeID(2)
+	pr.refreshPolicies([]Replica{r})
+	require.Equal(t, r.GetPolicy(), ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS)
+
+	// Set the furthest node to 3 and verify we get the <100ms policy.
+	r.SetFurthestNodeID(3)
+	pr.refreshPolicies([]Replica{r})
+	require.Equal(t, r.GetPolicy(), ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_100MS)
+
+	// Update node 3's latency to 300ms and verify we get the >=300ms policy.
+	latencies[3] = 300 * time.Millisecond
+	pr.updateLatencyCache()
+	pr.refreshPolicies([]Replica{r})
+	require.Equal(t, r.GetPolicy(), ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS)
+}