Skip to content

Commit 18ed0b4

Browse files
committed
closedts: add the pacing to the closed timestamp lead policy calculation
This commit adds the pacing interval to the closed timestamp lead policy calculation, this is important because we pace side transport messages, and that means that some nodes might not receive the closed timestamp update by an extra duration (the pacing interval). Fixes: #148211 Release note: None
1 parent 081128d commit 18ed0b4

File tree

7 files changed

+42
-14
lines changed

7 files changed

+42
-14
lines changed

pkg/kv/kvserver/closedts/policy.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ const (
2222
// global reads. It accounts for network latency, clock offset, and both Raft
2323
// and side-transport propagation delays.
2424
func computeLeadTimeForGlobalReads(
25-
networkRTT time.Duration, maxClockOffset time.Duration, sideTransportCloseInterval time.Duration,
25+
networkRTT time.Duration,
26+
maxClockOffset time.Duration,
27+
sideTransportCloseInterval time.Duration,
28+
sideTransportPacingInterval time.Duration,
2629
) time.Duration {
2730
// The LEAD_FOR_GLOBAL_READS calculation is more complex. Instead of the
2831
// policy defining an offset from the publisher's perspective, the
@@ -61,10 +64,13 @@ func computeLeadTimeForGlobalReads(
6164
// # the worst-case.
6265
// side_propagation_time = max_network_rtt * 0.5 + side_transport_close_interval
6366
//
67+
// # We pace sending side-transport updates up to side_transport_pacing_refresh_interval.
68+
// # We have to also include it into our calculation.
69+
//
6470
// # Combine, we get the following result
6571
// closed_ts_at_sender = now + max_offset + max(
6672
// max_network_rtt * 1.5 + raft_overhead,
67-
// max_network_rtt * 0.5 + side_transport_close_interval,
73+
// max_network_rtt * 0.5 + side_transport_close_interval + side_transport_pacing_refresh_interval,
6874
// )
6975
//
7076
// By default, this leads to a closed timestamp target that leads the
@@ -86,7 +92,7 @@ func computeLeadTimeForGlobalReads(
8692
raftTransportPropTime := (networkRTT*3)/2 + raftTransportOverhead
8793

8894
// See side_propagation_time.
89-
sideTransportPropTime := networkRTT/2 + sideTransportCloseInterval
95+
sideTransportPropTime := networkRTT/2 + sideTransportCloseInterval + sideTransportPacingInterval
9096

9197
// See propagation_time.
9298
maxTransportPropTime := max(sideTransportPropTime, raftTransportPropTime)
@@ -106,6 +112,7 @@ func TargetForPolicy(
106112
lagTargetDuration time.Duration,
107113
leadTargetOverride time.Duration,
108114
sideTransportCloseInterval time.Duration,
115+
sideTransportPacingInterval time.Duration,
109116
policy ctpb.RangeClosedTimestampPolicy,
110117
) hlc.Timestamp {
111118
var targetOffsetTime time.Duration
@@ -121,7 +128,7 @@ func TargetForPolicy(
121128
break
122129
}
123130
targetOffsetTime = computeLeadTimeForGlobalReads(computeNetworkRTTBasedOnPolicy(policy),
124-
maxClockOffset, sideTransportCloseInterval)
131+
maxClockOffset, sideTransportCloseInterval, sideTransportPacingInterval)
125132
default:
126133
panic("unexpected RangeClosedTimestampPolicy")
127134
}

pkg/kv/kvserver/closedts/policy_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ func TestTargetForPolicy(t *testing.T) {
2828
maxClockOffset := millis(500)
2929

3030
for _, tc := range []struct {
31-
lagTargetNanos time.Duration
32-
leadTargetOverride time.Duration
33-
sideTransportCloseInterval time.Duration
34-
rangePolicy ctpb.RangeClosedTimestampPolicy
35-
expClosedTSTarget hlc.Timestamp
31+
lagTargetNanos time.Duration
32+
leadTargetOverride time.Duration
33+
sideTransportCloseInterval time.Duration
34+
sideTransportPacingInterval time.Duration
35+
rangePolicy ctpb.RangeClosedTimestampPolicy
36+
expClosedTSTarget hlc.Timestamp
3637
}{
3738
{
3839
lagTargetNanos: secs(3),
@@ -52,6 +53,16 @@ func TestTargetForPolicy(t *testing.T) {
5253
millis(275) /* sideTransportPropTime */ +
5354
millis(25) /* bufferTime */).Nanoseconds(), 0),
5455
},
56+
{
57+
sideTransportCloseInterval: millis(200),
58+
sideTransportPacingInterval: millis(10),
59+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
60+
expClosedTSTarget: now.
61+
Add((maxClockOffset +
62+
millis(275) /* sideTransportPropTime */ +
63+
millis(10) /* sideTransportPacing */ +
64+
millis(25) /* bufferTime */).Nanoseconds(), 0),
65+
},
5566
{
5667
sideTransportCloseInterval: millis(50),
5768
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
@@ -61,10 +72,11 @@ func TestTargetForPolicy(t *testing.T) {
6172
millis(25) /* bufferTime */).Nanoseconds(), 0),
6273
},
6374
{
64-
leadTargetOverride: millis(1234),
65-
sideTransportCloseInterval: millis(200),
66-
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
67-
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
75+
leadTargetOverride: millis(1234),
76+
sideTransportCloseInterval: millis(200),
77+
sideTransportPacingInterval: millis(10),
78+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
79+
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
6880
},
6981
} {
7082
t.Run("", func(t *testing.T) {
@@ -74,6 +86,7 @@ func TestTargetForPolicy(t *testing.T) {
7486
tc.lagTargetNanos,
7587
tc.leadTargetOverride,
7688
tc.sideTransportCloseInterval,
89+
tc.sideTransportPacingInterval,
7790
tc.rangePolicy,
7891
)
7992
require.Equal(t, tc.expClosedTSTarget, target)

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,13 +368,15 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
368368
lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
369369
leadTargetOverride := closedts.LeadForGlobalReadsOverride.Get(&s.st.SV)
370370
sideTransportCloseInterval := closedts.SideTransportCloseInterval.Get(&s.st.SV)
371+
sideTransportPacingInterval := closedts.SideTransportPacingRefreshInterval.Get(&s.st.SV)
371372
for pol := ctpb.RangeClosedTimestampPolicy(0); pol < ctpb.RangeClosedTimestampPolicy(numPolicies); pol++ {
372373
target := closedts.TargetForPolicy(
373374
now,
374375
maxClockOffset,
375376
lagTargetDuration,
376377
leadTargetOverride,
377378
sideTransportCloseInterval,
379+
sideTransportPacingInterval,
378380
pol,
379381
)
380382
s.trackedMu.lastClosed[pol] = target

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func expGroupUpdates(s *Sender, now hlc.ClockTimestamp) []ctpb.Update_GroupUpdat
190190
closedts.TargetDuration.Get(&s.st.SV),
191191
closedts.LeadForGlobalReadsOverride.Get(&s.st.SV),
192192
closedts.SideTransportCloseInterval.Get(&s.st.SV),
193+
closedts.SideTransportPacingRefreshInterval.Get(&s.st.SV),
193194
pol,
194195
)
195196
}

pkg/kv/kvserver/replica_closedts.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (r *Replica) closedTimestampTargetRLocked() hlc.Timestamp {
155155
closedts.TargetDuration.Get(&r.ClusterSettings().SV),
156156
closedts.LeadForGlobalReadsOverride.Get(&r.ClusterSettings().SV),
157157
closedts.SideTransportCloseInterval.Get(&r.ClusterSettings().SV),
158+
closedts.SideTransportPacingRefreshInterval.Get(&r.ClusterSettings().SV),
158159
closedTimestampPolicy(r.descRLocked(), *r.cachedClosedTimestampPolicy.Load()),
159160
)
160161
}

pkg/kv/kvserver/replica_proposal_buf_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (t *testProposer) closedTimestampTarget() hlc.Timestamp {
174174
1*time.Second,
175175
0,
176176
200*time.Millisecond,
177+
10*time.Millisecond,
177178
t.rangePolicy,
178179
)
179180
}
@@ -921,6 +922,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
921922
nowMinusTwiceClosedLag := nowTS.Add(-2*closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0)
922923
nowPlusGlobalReadLead := nowTS.Add((maxOffset +
923924
275*time.Millisecond /* sideTransportPropTime */ +
925+
10*time.Millisecond /* sideTransportPacing */ +
924926
25*time.Millisecond /* bufferTime */).Nanoseconds(), 0)
925927
expiredLeaseTimestamp := nowTS.Add(-1000, 0)
926928
someClosedTS := nowTS.Add(-2000, 0)

pkg/kv/txn.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,12 +969,14 @@ func (txn *Txn) DeadlineLikelySufficient() bool {
969969
lagTargetDuration := closedts.TargetDuration.Get(sv)
970970
leadTargetOverride := closedts.LeadForGlobalReadsOverride.Get(sv)
971971
sideTransportCloseInterval := closedts.SideTransportCloseInterval.Get(sv)
972+
sideTransportPacingInterval := closedts.SideTransportPacingRefreshInterval.Get(sv)
973+
972974
// Pass the DefaultMaxNetworkRTT regardless of leadTargetAutoTune because we
973975
// don't have a good way to estimate the network RTT here. We choose to be
974976
// more conservative as this is just for an optimization if the deadline is
975977
// far in the future. Missing the optimization is not a big deal.
976978
return closedts.TargetForPolicy(now, maxClockOffset,
977-
lagTargetDuration, leadTargetOverride, sideTransportCloseInterval,
979+
lagTargetDuration, leadTargetOverride, sideTransportCloseInterval, sideTransportPacingInterval,
978980
ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO).Add(int64(time.Second), 0)
979981
}
980982

0 commit comments

Comments
 (0)