Skip to content

Commit 9e89769

Browse files
craig[bot]iskettaneh
andcommitted
Merge #154481
154481: sidetransport: pace sidetransport sender messages r=iskettaneh a=iskettaneh This PR paces sending sidetransport updates to all nodes. This way, we will have a lower number of runnable goroutines at any given time. Before this change, you can see how at fixed intervals, multiple goroutines unblock at the exact same time: <img width="951" height="489" alt="image" src="https://github.com/user-attachments/assets/6b225ed1-17a7-4a2d-9794-8b175818a200" /> After this change, that unblocking happens in a slightly staggered fashion: <img width="957" height="584" alt="image" src="https://github.com/user-attachments/assets/7fe7b21f-dbd9-454c-ad9e-2e698db86754" /> This showed a significant reduction in the number of runnable goroutines. More results can be found [in this document](https://docs.google.com/document/d/1mzl3vyPD8KASi2c14tJ8iRxbtGtGg7OEQvkdYk-Fhpk/edit?tab=t.0). Fixes: #148211 Release note: None Co-authored-by: iskettaneh <[email protected]>
2 parents 1b0d0ec + 18ed0b4 commit 9e89769

File tree

11 files changed

+367
-23
lines changed

11 files changed

+367
-23
lines changed

docs/generated/settings/settings-for-tenants.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ kv.closed_timestamp.follower_reads.enabled (alias: kv.closed_timestamp.follower_
8484
kv.closed_timestamp.lead_for_global_reads_auto_tune.enabled boolean false if enabled, observed network latency between leaseholders and their furthest follower will be used to adjust closed timestamp policies for rangesranges configured to serve global reads. kv.closed_timestamp.lead_for_global_reads_override takes precedence if set. system-visible
8585
kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps system-visible
8686
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport system-visible
87+
kv.closed_timestamp.side_transport_pacing_refresh_interval duration 10ms the refresh interval for the task pacer that controls pacing of sending sidetransport updates to avoid overloading the system when many connections are waiting system-visible
88+
kv.closed_timestamp.side_transport_pacing_smear_interval duration 1ms the smear interval for the task pacer that controls the amount of time each paced batch is going to take when broadcasting sidetransport updates system-visible
8789
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration system-visible
8890
kv.dist_sender.circuit_breaker.cancellation.enabled boolean true when enabled, in-flight requests will be cancelled when the circuit breaker trips application
8991
kv.dist_sender.circuit_breaker.cancellation.write_grace_period duration 10s how long after the circuit breaker trips to cancel write requests (these can't retry internally, so should be long enough to allow quorum/lease recovery) application

docs/generated/settings/settings.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@
100100
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-auto-tune-enabled" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_auto_tune.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, observed network latency between leaseholders and their furthest follower will be used to adjust closed timestamp policies for rangesranges configured to serve global reads. kv.closed_timestamp.lead_for_global_reads_override takes precedence if set.</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
101101
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
102102
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
103+
<tr><td><div id="setting-kv-closed-timestamp-side-transport-pacing-refresh-interval" class="anchored"><code>kv.closed_timestamp.side_transport_pacing_refresh_interval</code></div></td><td>duration</td><td><code>10ms</code></td><td>the refresh interval for the task pacer that controls pacing of sending sidetransport updates to avoid overloading the system when many connections are waiting</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
104+
<tr><td><div id="setting-kv-closed-timestamp-side-transport-pacing-smear-interval" class="anchored"><code>kv.closed_timestamp.side_transport_pacing_smear_interval</code></div></td><td>duration</td><td><code>1ms</code></td><td>the smear interval for the task pacer that controls the amount of time each paced batch is going to take when broadcasting sidetransport updates</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
103105
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
104106
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when enabled, in-flight requests will be cancelled when the circuit breaker trips</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
105107
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-write-grace-period" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.write_grace_period</code></div></td><td>duration</td><td><code>10s</code></td><td>how long after the circuit breaker trips to cancel write requests (these can&#39;t retry internally, so should be long enough to allow quorum/lease recovery)</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>

pkg/kv/kvserver/closedts/policy.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ const (
2222
// global reads. It accounts for network latency, clock offset, and both Raft
2323
// and side-transport propagation delays.
2424
func computeLeadTimeForGlobalReads(
25-
networkRTT time.Duration, maxClockOffset time.Duration, sideTransportCloseInterval time.Duration,
25+
networkRTT time.Duration,
26+
maxClockOffset time.Duration,
27+
sideTransportCloseInterval time.Duration,
28+
sideTransportPacingInterval time.Duration,
2629
) time.Duration {
2730
// The LEAD_FOR_GLOBAL_READS calculation is more complex. Instead of the
2831
// policy defining an offset from the publisher's perspective, the
@@ -61,10 +64,13 @@ func computeLeadTimeForGlobalReads(
6164
// # the worst-case.
6265
// side_propagation_time = max_network_rtt * 0.5 + side_transport_close_interval
6366
//
67+
// # We pace sending side-transport updates up to side_transport_pacing_refresh_interval.
68+
// # We have to also include it into our calculation.
69+
//
6470
// # Combine, we get the following result
6571
// closed_ts_at_sender = now + max_offset + max(
6672
// max_network_rtt * 1.5 + raft_overhead,
67-
// max_network_rtt * 0.5 + side_transport_close_interval,
73+
// max_network_rtt * 0.5 + side_transport_close_interval + side_transport_pacing_refresh_interval,
6874
// )
6975
//
7076
// By default, this leads to a closed timestamp target that leads the
@@ -86,7 +92,7 @@ func computeLeadTimeForGlobalReads(
8692
raftTransportPropTime := (networkRTT*3)/2 + raftTransportOverhead
8793

8894
// See side_propagation_time.
89-
sideTransportPropTime := networkRTT/2 + sideTransportCloseInterval
95+
sideTransportPropTime := networkRTT/2 + sideTransportCloseInterval + sideTransportPacingInterval
9096

9197
// See propagation_time.
9298
maxTransportPropTime := max(sideTransportPropTime, raftTransportPropTime)
@@ -106,6 +112,7 @@ func TargetForPolicy(
106112
lagTargetDuration time.Duration,
107113
leadTargetOverride time.Duration,
108114
sideTransportCloseInterval time.Duration,
115+
sideTransportPacingInterval time.Duration,
109116
policy ctpb.RangeClosedTimestampPolicy,
110117
) hlc.Timestamp {
111118
var targetOffsetTime time.Duration
@@ -121,7 +128,7 @@ func TargetForPolicy(
121128
break
122129
}
123130
targetOffsetTime = computeLeadTimeForGlobalReads(computeNetworkRTTBasedOnPolicy(policy),
124-
maxClockOffset, sideTransportCloseInterval)
131+
maxClockOffset, sideTransportCloseInterval, sideTransportPacingInterval)
125132
default:
126133
panic("unexpected RangeClosedTimestampPolicy")
127134
}

pkg/kv/kvserver/closedts/policy_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ func TestTargetForPolicy(t *testing.T) {
2828
maxClockOffset := millis(500)
2929

3030
for _, tc := range []struct {
31-
lagTargetNanos time.Duration
32-
leadTargetOverride time.Duration
33-
sideTransportCloseInterval time.Duration
34-
rangePolicy ctpb.RangeClosedTimestampPolicy
35-
expClosedTSTarget hlc.Timestamp
31+
lagTargetNanos time.Duration
32+
leadTargetOverride time.Duration
33+
sideTransportCloseInterval time.Duration
34+
sideTransportPacingInterval time.Duration
35+
rangePolicy ctpb.RangeClosedTimestampPolicy
36+
expClosedTSTarget hlc.Timestamp
3637
}{
3738
{
3839
lagTargetNanos: secs(3),
@@ -52,6 +53,16 @@ func TestTargetForPolicy(t *testing.T) {
5253
millis(275) /* sideTransportPropTime */ +
5354
millis(25) /* bufferTime */).Nanoseconds(), 0),
5455
},
56+
{
57+
sideTransportCloseInterval: millis(200),
58+
sideTransportPacingInterval: millis(10),
59+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
60+
expClosedTSTarget: now.
61+
Add((maxClockOffset +
62+
millis(275) /* sideTransportPropTime */ +
63+
millis(10) /* sideTransportPacing */ +
64+
millis(25) /* bufferTime */).Nanoseconds(), 0),
65+
},
5566
{
5667
sideTransportCloseInterval: millis(50),
5768
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
@@ -61,10 +72,11 @@ func TestTargetForPolicy(t *testing.T) {
6172
millis(25) /* bufferTime */).Nanoseconds(), 0),
6273
},
6374
{
64-
leadTargetOverride: millis(1234),
65-
sideTransportCloseInterval: millis(200),
66-
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
67-
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
75+
leadTargetOverride: millis(1234),
76+
sideTransportCloseInterval: millis(200),
77+
sideTransportPacingInterval: millis(10),
78+
rangePolicy: ctpb.LEAD_FOR_GLOBAL_READS_WITH_NO_LATENCY_INFO,
79+
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
6880
},
6981
} {
7082
t.Run("", func(t *testing.T) {
@@ -74,6 +86,7 @@ func TestTargetForPolicy(t *testing.T) {
7486
tc.lagTargetNanos,
7587
tc.leadTargetOverride,
7688
tc.sideTransportCloseInterval,
89+
tc.sideTransportPacingInterval,
7790
tc.rangePolicy,
7891
)
7992
require.Equal(t, tc.expClosedTSTarget, target)

pkg/kv/kvserver/closedts/setting.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,28 @@ var PolicySwitchWhenLatencyExceedsBucketFraction = settings.RegisterFloatSetting
9898
"exceeded before the closed timestamp policy will be changed",
9999
0.2,
100100
)
101+
102+
// SideTransportPacingRefreshInterval controls the task pacer refresh interval
103+
// for pacing broadcast updates in the side transport. This determines how long
104+
// the pacer takes to complete signaling all waiting connections.
105+
var SideTransportPacingRefreshInterval = settings.RegisterDurationSetting(
106+
settings.SystemVisible,
107+
"kv.closed_timestamp.side_transport_pacing_refresh_interval",
108+
"the refresh interval for the task pacer that controls pacing of sending "+
109+
"sidetransport updates to avoid overloading the system when many connections are waiting",
110+
10*time.Millisecond,
111+
settings.WithPublic,
112+
)
113+
114+
// SideTransportPacingSmearInterval controls the time interval between batches
115+
// when broadcasting side transport updates. The pacer uses this to spread out
116+
// work over time, sleeping between batches to avoid creating spikes in runnable
117+
// goroutines when many connections are waiting for updates.
118+
var SideTransportPacingSmearInterval = settings.RegisterDurationSetting(
119+
settings.SystemVisible,
120+
"kv.closed_timestamp.side_transport_pacing_smear_interval",
121+
"the smear interval for the task pacer that controls the amount of time "+
122+
"each paced batch is going to take when broadcasting sidetransport updates",
123+
1*time.Millisecond,
124+
settings.WithPublic,
125+
)

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go_library(
2828
"//pkg/util/netutil",
2929
"//pkg/util/stop",
3030
"//pkg/util/syncutil",
31+
"//pkg/util/taskpacer",
3132
"//pkg/util/timeutil",
3233
"@com_github_cockroachdb_errors//:errors",
3334
],
@@ -51,6 +52,7 @@ go_test(
5152
"//pkg/rpc/rpcbase",
5253
"//pkg/settings/cluster",
5354
"//pkg/testutils",
55+
"//pkg/testutils/skip",
5456
"//pkg/util/hlc",
5557
"//pkg/util/leaktest",
5658
"//pkg/util/log",

0 commit comments

Comments
 (0)