
Commit 55bde38

kvserver: add kv.closed_timestamp.policy_switch_latency_bucket_exceed_threshold

This commit introduces a new cluster setting:
kv.closed_timestamp.policy_switch_latency_bucket_exceed_threshold. It defines
the fraction of the closed timestamp policy bucket width that must be exceeded
before a policy switch is triggered. This helps prevent aggressive policy
changes for ranges near bucket boundaries, reducing excessive updates sent via
side transport.

Part of: #143890
Release note: none
Epic: none

1 parent dc26ba0

File tree

4 files changed: +305 -2 lines changed


pkg/kv/kvserver/closedts/policy_calculation.go

Lines changed: 83 additions & 1 deletion
@@ -14,8 +14,90 @@ import (
 )

 // FindBucketBasedOnNetworkRTT maps a network RTT to a closed timestamp policy
-// bucket.
+// bucket with zero dampening.
 func FindBucketBasedOnNetworkRTT(networkRTT time.Duration) ctpb.RangeClosedTimestampPolicy {
+	return FindBucketBasedOnNetworkRTTWithDampening(ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO, networkRTT, 0)
+}
+
+// FindBucketBasedOnNetworkRTTWithDampening calculates a new closed timestamp policy
+// based on the old policy, the network RTT, and a boundary percentage.
+//
+// 1. If the old policy or the new policy is LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
+//    the new policy is returned.
+//
+// 2. If the new policy jumps to a non-adjacent bucket, the new policy is returned.
+//
+// 3. If dampening is 0 or the policy is unchanged, the new policy is returned.
+//
+// 4. Otherwise, the new policy is returned if and only if the network RTT has
+//    crossed the boundary of the new policy.
+//
+// Policy change diagram with 20% boundary between two adjacent buckets:
+//
+// Example: boundaryPercent = 20%
+// Case 1: Moving to higher latency bucket (old policy < new policy)
+// RTT (ms) |----------20ms----------|----------40ms----------|
+// Policy   |      <20ms bucket      |      <40ms bucket      |
+//
+//                                                            ^
+//                                                            |--4ms--| RTT must be >=
+//                                                            |  (40ms + 20ms*20%) = 44ms
+//                                                            |  to move to the <60ms bucket
+//
+// Case 2: Moving to lower latency bucket (old policy > new policy)
+// RTT (ms) |----------20ms----------|----------40ms----------|
+// Policy   |      <20ms bucket      |      <40ms bucket      |
+//
+//                                   ^
+//         RTT must go below |--4ms--|
+//         (20ms - 20ms*20%) = 16ms  |
+//         to move to <20ms bucket   |
+func FindBucketBasedOnNetworkRTTWithDampening(
+	oldPolicy ctpb.RangeClosedTimestampPolicy, networkRTT time.Duration, boundaryPercent float64,
+) ctpb.RangeClosedTimestampPolicy {
+	// Calculate the new policy based on network RTT.
+	newPolicy := findBucketBasedOnNetworkRTT(networkRTT)
+
+	if newPolicy == ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO ||
+		oldPolicy == ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO || boundaryPercent == 0 {
+		return newPolicy
+	}
+
+	// Apply the new policy if the policy is unchanged, or if there's a
+	// non-adjacent bucket jump.
+	if newPolicy == oldPolicy || math.Abs(float64(newPolicy-oldPolicy)) > 1 {
+		return newPolicy
+	}
+
+	// Calculate the bucket number by subtracting the base policy and adjusting
+	// for zero-based indexing.
+	bucket := int(newPolicy) - int(ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO) - 1
+	intervalNanos := float64(closedTimestampPolicyBucketWidth.Nanoseconds())
+	switch {
+	case oldPolicy < newPolicy:
+		// The new policy has a higher latency threshold. Only switch to the
+		// higher latency bucket if the RTT exceeds the bucket boundary.
+		higherLatencyBucketThreshold := time.Duration((float64(bucket) + boundaryPercent) * intervalNanos)
+		if networkRTT >= higherLatencyBucketThreshold {
+			return newPolicy
+		}
+		return oldPolicy
+	case oldPolicy > newPolicy:
+		// The new policy has a lower latency threshold. Only switch to the lower
+		// latency bucket if the RTT is below the bucket boundary.
+		lowerLatencyBucketThreshold := time.Duration((float64(bucket) + 1 - boundaryPercent) * intervalNanos)
+		if networkRTT < lowerLatencyBucketThreshold {
+			return newPolicy
+		}
+		return oldPolicy
+	default:
+		panic("unexpected condition")
+	}
+}
+
+// findBucketBasedOnNetworkRTT maps a network RTT to a closed timestamp policy
+// bucket.
+func findBucketBasedOnNetworkRTT(networkRTT time.Duration) ctpb.RangeClosedTimestampPolicy {
 	// If maxLatency is negative (i.e. no peer latency is provided), return
 	// LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO
 	if networkRTT < 0 {
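
To make the threshold arithmetic above concrete, here is a minimal standalone
sketch of the two boundary computations, assuming the 20ms bucket width shown
in the diagram (closedTimestampPolicyBucketWidth itself is defined outside
this diff) and the zero-based bucket indexes used by the code:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Assumption: a 20ms bucket width, as in the diagram above.
        const bucketWidth = 20 * time.Millisecond
        const boundaryPercent = 0.2
        intervalNanos := float64(bucketWidth.Nanoseconds())

        // Moving up from the <40ms bucket into the <60ms bucket: the <60ms
        // bucket has zero-based index 2, so the RTT must reach
        // (2 + 0.2) * 20ms = 44ms before the policy switches.
        higher := time.Duration((2 + boundaryPercent) * intervalNanos)
        fmt.Println(higher) // 44ms

        // Moving down from the <40ms bucket into the <20ms bucket: the <20ms
        // bucket has index 0, so the RTT must drop below
        // (0 + 1 - 0.2) * 20ms = 16ms before the policy switches.
        lower := time.Duration((0 + 1 - boundaryPercent) * intervalNanos)
        fmt.Println(lower) // 16ms
    }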

pkg/kv/kvserver/closedts/policy_calculation_test.go

Lines changed: 204 additions & 0 deletions
@@ -360,6 +360,15 @@ func TestNetworkRTTAndPolicyCalculations(t *testing.T) {
 				"expected policy %v for RTT %v, got %v",
 				tc.expectedPolicy, tc.networkRTT, policy)

+			// Test RTT -> Policy with 0 percent dampening. We expect the same outcome
+			// as FindBucketBasedOnNetworkRTT regardless of oldPolicy.
+			for oldPolicy := ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO; oldPolicy <= ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS; oldPolicy++ {
+				newPolicy := FindBucketBasedOnNetworkRTTWithDampening(oldPolicy, tc.networkRTT, 0)
+				require.Equal(t, tc.expectedPolicy, newPolicy,
+					"expected policy %v for RTT %v, got %v",
+					tc.expectedPolicy, tc.networkRTT, newPolicy)
+			}
+
 			// Test Policy -> RTT conversion.
 			rtt := computeNetworkRTTBasedOnPolicy(policy)
 			require.Equal(t, tc.expectedRTT, rtt,
@@ -368,3 +377,198 @@ func TestNetworkRTTAndPolicyCalculations(t *testing.T) {
 		})
 	}
 }
+
+// TestRefreshPolicyWithDampening tests that the dampening logic backing
+// Replica.RefreshPolicy works as expected with different dampening fractions.
+func TestRefreshPolicyWithDampening(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testCases := []struct {
+		name              string
+		dampeningFraction float64
+		oldPolicy         ctpb.RangeClosedTimestampPolicy
+		networkRTT        time.Duration
+		expectedPolicy    ctpb.RangeClosedTimestampPolicy
+	}{
+		{
+			name:              "from no latency info to low latency",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
+			networkRTT:        10 * time.Millisecond,
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS,
+		},
+		{
+			name:              "from low latency to no latency info",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS,
+			networkRTT:        -1 * time.Millisecond,
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
+		},
+		{
+			name:              "latency increases but below the lower bound threshold",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// 42ms is above 40ms but below the 40ms+20ms*0.2=44ms boundary.
+			networkRTT:     42 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		{
+			name:              "latency increases and above the lower bound threshold",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// 44ms is at the 40ms+20ms*0.2=44ms boundary (the switch uses >=).
+			networkRTT:     44 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+		},
+		{
+			name:              "latency increases to next bucket and above its upper bound threshold",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS,
+			// 38ms is above 20ms+20ms*0.2=24ms and above the 40ms-20ms*0.2=36ms threshold.
+			networkRTT:     38 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		{
+			name:              "latency drops to previous bucket but above the upper bound threshold",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// 18ms is below 20ms but above the 20ms-20ms*0.2=16ms boundary.
+			networkRTT:     18 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		{
+			name:              "latency drops to previous bucket and below the upper bound threshold",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// 14ms is below 20ms and below the 20ms-20ms*0.2=16ms boundary.
+			networkRTT:     14 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS,
+		},
+		{
+			name:              "latency drops to previous bucket and below the lower bound threshold",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// 3ms is below 20ms, below the 20ms-20ms*0.2=16ms boundary, and
+			// below 20ms*0.2=4ms.
+			networkRTT:     3 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_20MS,
+		},
+		{
+			name:              "boundary case at 300ms",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_300MS,
+			// 300ms is below the 300ms+20ms*0.2=304ms boundary.
+			networkRTT:     300 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_300MS,
+		},
+		{
+			name:              "boundary case at 320ms",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_300MS,
+			// 320ms is above the 300ms+20ms*0.2=304ms boundary.
+			networkRTT:     320 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS,
+		},
+		{
+			name:              "jump to higher bucket case at 600ms",
+			dampeningFraction: 0.2,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_300MS,
+			// 600ms is above the 300ms+20ms*0.2=304ms boundary.
+			networkRTT:     600 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS,
+		},
+		// Zero Dampening Cases (Most Sensitive)
+		{
+			name:              "zero dampening - tiny increase",
+			dampeningFraction: 0.0,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			networkRTT:        40 * time.Millisecond, // Tiny increase
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+		},
+		{
+			name:              "zero dampening - tiny decrease",
+			dampeningFraction: 0.0,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+			networkRTT:        39 * time.Millisecond,
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		// 100% Dampening Cases (Most Conservative)
+		{
+			name:              "full dampening - significant increase",
+			dampeningFraction: 1.0,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			networkRTT:        58 * time.Millisecond,
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		{
+			name:              "full dampening - multi-bucket jump",
+			dampeningFraction: 1.0,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			networkRTT:        60 * time.Millisecond,
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_80MS,
+		},
+		// 0.001 Dampening Cases (Very Sensitive but not quite zero)
+		{
+			name:              "0.001 dampening - small increase",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// Just barely above 40ms + (20ms * 0.001) = 40.02ms.
+			networkRTT:     41 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+		},
+		{
+			name:              "0.001 dampening - small decrease",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+			// Just barely below 40ms - (20ms * 0.001) = 39.98ms.
+			networkRTT:     39 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		{
+			name:              "0.001 dampening - no change on small increase",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			// Just below 40ms + (20ms * 0.001) = 40.02ms.
+			networkRTT:     40 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+		{
+			name:              "0.001 dampening - no change on small decrease",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+			// Just above 40ms - (20ms * 0.001) = 39.98ms.
+			networkRTT:     time.Duration(39.99 * float64(time.Millisecond)),
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_60MS,
+		},
+		{
+			name:              "0.001 dampening - boundary at 300ms",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_300MS,
+			// Just barely above 300ms + (20ms * 0.001) = 300.02ms.
+			networkRTT:     301 * time.Millisecond,
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_EQUAL_OR_GREATER_THAN_300MS,
+		},
+		{
+			name:              "0.001 dampening - multi-bucket jump to higher latency",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+			networkRTT:        100 * time.Millisecond,
+			expectedPolicy:    ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_120MS,
+		},
+		{
+			name:              "0.001 dampening - multi-bucket jump to lower latency",
+			dampeningFraction: 0.001,
+			oldPolicy:         ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_80MS,
+			// Above 40ms - (20ms * 0.001) = 39.98ms, but it is a multi-bucket jump.
+			networkRTT:     time.Duration(39.99 * float64(time.Millisecond)),
+			expectedPolicy: ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			newPolicy := FindBucketBasedOnNetworkRTTWithDampening(tc.oldPolicy, tc.networkRTT, tc.dampeningFraction)
+			require.Equal(t, tc.expectedPolicy, newPolicy)
+		})
+	}
+}
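
The table above exercises single calls; the payoff of the dampening shows up
across a sequence of refreshes. Here is a minimal illustrative sketch (the RTT
sequence is invented; the 20ms bucket width is assumed from the diagram) of how
feeding each result back in as the next old policy suppresses flapping around
the 40ms boundary:

    // With a 0.2 dampening fraction, the policy only moves up once the RTT
    // reaches (2 + 0.2) * 20ms = 44ms, so jitter below 44ms causes no switch.
    policy := ctpb.LEAD_FOR_GLOBAL_READS_LATENCY_LESS_THAN_40MS
    for _, rtt := range []time.Duration{
        39 * time.Millisecond, // stays: same bucket
        41 * time.Millisecond, // stays: raw bucket is <60ms, but 41ms < 44ms
        39 * time.Millisecond, // stays: same bucket
        43 * time.Millisecond, // stays: 43ms < 44ms
        45 * time.Millisecond, // switches to <60ms: 45ms >= 44ms
    } {
        policy = FindBucketBasedOnNetworkRTTWithDampening(policy, rtt, 0.2)
    }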

pkg/kv/kvserver/closedts/setting.go

Lines changed: 13 additions & 0 deletions
@@ -85,3 +85,16 @@ var LeadForGlobalReadsAutoTuneEnabled = settings.RegisterBoolSetting(
 	metamorphic.ConstantWithTestBool("kv.closed_timestamp.lead_for_global_reads_auto_tune.enabled", false),
 	settings.WithPublic,
 )
+
+// PolicySwitchWhenLatencyExceedsBucketFraction determines the threshold for
+// changing the closed timestamp policy based on observed latency between
+// leaseholders and their furthest follower. This is used to prevent frequent
+// changes in the closed timestamp policy when the latency is close to the
+// boundary of the policy bucket. Setting this to 0 disables the dampening.
+var PolicySwitchWhenLatencyExceedsBucketFraction = settings.RegisterFloatSetting(
+	settings.SystemOnly,
+	"kv.closed_timestamp.policy_switch_latency_bucket_exceed_threshold",
+	"the fraction of the closed timestamp policy bucket width which needs to be "+
+		"exceeded before the closed timestamp policy will be changed",
+	0.2,
+)
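
As a usage sketch (Override and Get follow the general cluster-settings API
shape and are assumed here rather than shown in this diff), a test that wants
the undampened behavior could zero the setting and read it back:

    ctx := context.Background()
    st := cluster.MakeTestingClusterSettings()
    // 0 disables the dampening, making the dampened lookup degrade to the
    // plain bucket lookup (see FindBucketBasedOnNetworkRTTWithDampening).
    closedts.PolicySwitchWhenLatencyExceedsBucketFraction.Override(ctx, &st.SV, 0)
    frac := closedts.PolicySwitchWhenLatencyExceedsBucketFraction.Get(&st.SV) // 0
    _ = frac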

pkg/kv/kvserver/replica.go

Lines changed: 5 additions & 1 deletion
@@ -1373,7 +1373,11 @@ func (r *Replica) RefreshPolicy(latencies map[roachpb.NodeID]time.Duration) {
 			}
 			maxLatency = max(maxLatency, peerLatency)
 		}
-		return closedts.FindBucketBasedOnNetworkRTT(maxLatency)
+		return closedts.FindBucketBasedOnNetworkRTTWithDampening(
+			ctpb.RangeClosedTimestampPolicy(r.cachedClosedTimestampPolicy.Load()),
+			maxLatency,
+			closedts.PolicySwitchWhenLatencyExceedsBucketFraction.Get(&r.store.GetStoreConfig().Settings.SV),
+		)
 	}
 	r.cachedClosedTimestampPolicy.Store(int32(policy()))
 }
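
Note the feedback loop here: the value most recently stored in
r.cachedClosedTimestampPolicy is loaded back as the old policy on the next
refresh, which is what gives the dampening its hysteresis. With the setting at
its default of 0.2, a range whose measured RTT hovers near a bucket edge keeps
its current policy until the RTT moves a fifth of a bucket width past the
boundary.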
