Commit 081128d

sidetransport: pace sidetransport sender messages
Before this commit, on every side-transport sending interval we would wake up up to N goroutines to send the side-transport messages, where N is the number of nodes in the cluster. This adds overhead on the Go scheduler as the cluster grows. This commit addresses that by adding a small pacing delay while waking up the goroutines to send messages.

Fixes: #148211

Release note: None
1 parent 410a90c commit 081128d
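
As background for the diffs below: rather than signaling every waiting connection goroutine at once, the sender now wakes them in small batches spread across a refresh window, sleeping for a smear interval between batches so that only a bounded number of goroutines become runnable at any instant. The following is a minimal, self-contained Go sketch of that idea only; it is not the pkg/util/taskpacer API, and paceWakeups, its parameters, and the example numbers are illustrative assumptions.

package main

import (
	"fmt"
	"time"
)

// paceWakeups spreads total wakeups over roughly one refresh interval by
// waking a batch every smear interval instead of waking everyone at once.
// With refresh=10ms and smear=1ms this yields about 10 batches, so only
// about total/10 goroutines become runnable at a time.
func paceWakeups(total int, refresh, smear time.Duration, wake func(n int)) {
	if total <= 0 {
		return
	}
	if refresh <= 0 || smear <= 0 || smear >= refresh {
		// Pacing effectively disabled: wake everyone immediately.
		wake(total)
		return
	}
	batches := int(refresh / smear)
	batchSize := (total + batches - 1) / batches // ceil(total/batches)
	for remaining := total; remaining > 0; {
		n := batchSize
		if n > remaining {
			n = remaining
		}
		wake(n)
		remaining -= n
		if remaining > 0 {
			time.Sleep(smear)
		}
	}
}

func main() {
	start := time.Now()
	paceWakeups(100, 10*time.Millisecond, time.Millisecond, func(n int) {
		fmt.Printf("%6v: waking %d waiters\n",
			time.Since(start).Round(time.Millisecond), n)
	})
}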

6 files changed: +209 -10 lines


docs/generated/settings/settings-for-tenants.txt

Lines changed: 2 additions & 0 deletions
@@ -84,6 +84,8 @@ kv.closed_timestamp.follower_reads.enabled (alias: kv.closed_timestamp.follower_
 kv.closed_timestamp.lead_for_global_reads_auto_tune.enabled boolean false if enabled, observed network latency between leaseholders and their furthest follower will be used to adjust closed timestamp policies for ranges configured to serve global reads. kv.closed_timestamp.lead_for_global_reads_override takes precedence if set. system-visible
 kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps system-visible
 kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport system-visible
+kv.closed_timestamp.side_transport_pacing_refresh_interval duration 10ms the refresh interval for the task pacer that controls pacing of sending sidetransport updates to avoid overloading the system when many connections are waiting system-visible
+kv.closed_timestamp.side_transport_pacing_smear_interval duration 1ms the smear interval for the task pacer that controls the amount of time each paced batch is going to take when broadcasting sidetransport updates system-visible
 kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration system-visible
 kv.dist_sender.circuit_breaker.cancellation.enabled boolean true when enabled, in-flight requests will be cancelled when the circuit breaker trips application
 kv.dist_sender.circuit_breaker.cancellation.write_grace_period duration 10s how long after the circuit breaker trips to cancel write requests (these can't retry internally, so should be long enough to allow quorum/lease recovery) application

docs/generated/settings/settings.html

Lines changed: 2 additions & 0 deletions
@@ -100,6 +100,8 @@
 <tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-auto-tune-enabled" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_auto_tune.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, observed network latency between leaseholders and their furthest follower will be used to adjust closed timestamp policies for ranges configured to serve global reads. kv.closed_timestamp.lead_for_global_reads_override takes precedence if set.</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
+<tr><td><div id="setting-kv-closed-timestamp-side-transport-pacing-refresh-interval" class="anchored"><code>kv.closed_timestamp.side_transport_pacing_refresh_interval</code></div></td><td>duration</td><td><code>10ms</code></td><td>the refresh interval for the task pacer that controls pacing of sending sidetransport updates to avoid overloading the system when many connections are waiting</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
+<tr><td><div id="setting-kv-closed-timestamp-side-transport-pacing-smear-interval" class="anchored"><code>kv.closed_timestamp.side_transport_pacing_smear_interval</code></div></td><td>duration</td><td><code>1ms</code></td><td>the smear interval for the task pacer that controls the amount of time each paced batch is going to take when broadcasting sidetransport updates</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when enabled, in-flight requests will be cancelled when the circuit breaker trips</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-write-grace-period" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.write_grace_period</code></div></td><td>duration</td><td><code>10s</code></td><td>how long after the circuit breaker trips to cancel write requests (these can&#39;t retry internally, so should be long enough to allow quorum/lease recovery)</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>

pkg/kv/kvserver/closedts/setting.go

Lines changed: 25 additions & 0 deletions
@@ -98,3 +98,28 @@ var PolicySwitchWhenLatencyExceedsBucketFraction = settings.RegisterFloatSetting
 	"exceeded before the closed timestamp policy will be changed",
 	0.2,
 )
+
+// SideTransportPacingRefreshInterval controls the task pacer refresh interval
+// for pacing broadcast updates in the side transport. This determines how long
+// the pacer takes to complete signaling all waiting connections.
+var SideTransportPacingRefreshInterval = settings.RegisterDurationSetting(
+	settings.SystemVisible,
+	"kv.closed_timestamp.side_transport_pacing_refresh_interval",
+	"the refresh interval for the task pacer that controls pacing of sending "+
+		"sidetransport updates to avoid overloading the system when many connections are waiting",
+	10*time.Millisecond,
+	settings.WithPublic,
+)
+
+// SideTransportPacingSmearInterval controls the time interval between batches
+// when broadcasting side transport updates. The pacer uses this to spread out
+// work over time, sleeping between batches to avoid creating spikes in runnable
+// goroutines when many connections are waiting for updates.
+var SideTransportPacingSmearInterval = settings.RegisterDurationSetting(
+	settings.SystemVisible,
+	"kv.closed_timestamp.side_transport_pacing_smear_interval",
+	"the smear interval for the task pacer that controls the amount of time "+
+		"each paced batch is going to take when broadcasting sidetransport updates",
+	1*time.Millisecond,
+	settings.WithPublic,
+)
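
The two settings registered above default to a 10ms refresh interval and a 1ms smear interval. Assuming the pacer divides the refresh window into smear-sized batches (a simplification of taskpacer's behavior, used here only to relate the two defaults), a single broadcast works out roughly as in this sketch; the waiter count is hypothetical:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults of the settings registered above.
	refresh := 10 * time.Millisecond // side_transport_pacing_refresh_interval
	smear := 1 * time.Millisecond    // side_transport_pacing_smear_interval
	waiters := 200                   // hypothetical: outbound connections in a 200-node cluster

	batches := int(refresh / smear)               // 10 batches per broadcast
	perBatch := (waiters + batches - 1) / batches // about 20 signals per 1ms batch
	fmt.Printf("%d batches of ~%d signals, spread over ~%v\n", batches, perBatch, refresh)
}

So instead of all 200 waiters becoming runnable at once, roughly 20 are woken every millisecond.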

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ go_library(
         "//pkg/util/netutil",
         "//pkg/util/stop",
         "//pkg/util/syncutil",
+        "//pkg/util/taskpacer",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
     ],
@@ -51,6 +52,7 @@ go_test(
         "//pkg/rpc/rpcbase",
         "//pkg/settings/cluster",
         "//pkg/testutils",
+        "//pkg/testutils/skip",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 71 additions & 10 deletions
@@ -33,6 +33,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/netutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/taskpacer"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
@@ -116,6 +117,21 @@ type connTestingKnobs struct {
 	sleepOnErrOverride time.Duration
 }
 
+// pacerConfig implements taskpacer.Config using cluster settings.
+type pacerConfig struct {
+	st *cluster.Settings
+}
+
+// GetRefresh implements the taskpacer.Config interface
+func (c *pacerConfig) GetRefresh() time.Duration {
+	return closedts.SideTransportPacingRefreshInterval.Get(&c.st.SV)
+}
+
+// GetSmear implements the taskpacer.Config interface
+func (c *pacerConfig) GetSmear() time.Duration {
+	return closedts.SideTransportPacingSmearInterval.Get(&c.st.SV)
+}
+
 // trackedRange contains the information that the side-transport last published
 // about a particular range.
 type trackedRange struct {
@@ -212,7 +228,7 @@ func newSenderWithConnFactory(
 		st:          st,
 		clock:       clock,
 		connFactory: connFactory,
-		buf:         newUpdatesBuf(),
+		buf:         newUpdatesBuf(st),
 	}
 	s.trackedMu.tracked = make(map[roachpb.RangeID]trackedRange)
 	s.trackedMu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp)
@@ -237,6 +253,12 @@ func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) {
 	}
 	closedts.SideTransportCloseInterval.SetOnChange(&s.st.SV, confChanged)
 
+	// Set up callback for pacing refresh interval changes.
+	pacingConfChanged := func(ctx context.Context) {
+		s.buf.updatePacer(s.st)
+	}
+	closedts.SideTransportPacingRefreshInterval.SetOnChange(&s.st.SV, pacingConfChanged)
+
 	_ /* err */ = s.stopper.RunAsyncTask(ctx, "closedts side-transport publisher",
 		func(ctx context.Context) {
 			defer func() {
@@ -540,6 +562,10 @@ type updatesBuf struct {
 	mu struct {
 		syncutil.Mutex
 
+		// pacer controls the timing of broadcast updates to avoid overloading the
+		// system.
+		pacer *taskpacer.Pacer
+
 		// We use two condition variables that we atomically swap to avoid signaling
 		// the same goroutine multiple times. Without this, a goroutine could:
 		// 1. Wake up from a signal.
@@ -580,14 +606,22 @@ type updatesBuf struct {
 // little while and not have to send a snapshot when it resumes.
 const updatesBufSize = 50
 
-func newUpdatesBuf() *updatesBuf {
+func newUpdatesBuf(st *cluster.Settings) *updatesBuf {
 	buf := &updatesBuf{}
 	buf.mu.updated1.L = &buf.mu
 	buf.mu.updated2.L = &buf.mu
 	buf.mu.data = make([]*ctpb.Update, updatesBufSize)
+	buf.mu.pacer = taskpacer.New(&pacerConfig{st: st})
 	return buf
 }
 
+// updatePacer atomically replaces the task pacer with a new one using the current cluster settings.
+func (b *updatesBuf) updatePacer(st *cluster.Settings) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.mu.pacer = taskpacer.New(&pacerConfig{st: st})
+}
+
 // Push adds a new update to the back of the buffer.
 func (b *updatesBuf) Push(ctx context.Context, update *ctpb.Update) {
 	b.mu.Lock()
@@ -632,20 +666,39 @@ func (b *updatesBuf) Push(ctx context.Context, update *ctpb.Update) {
 	// Notify everybody who might have been waiting for this message - we expect
 	// all the connections to be blocked waiting.
 	b.mu.Unlock()
-	b.PaceBroadcastUpdate(condVar, numWaiters)
+	b.PaceBroadcastUpdate(ctx, condVar, numWaiters)
 }
 
-// PaceBroadcastUpdate paces the conditional variable signaling to avoid
-// overloading the system.
-func (b *updatesBuf) PaceBroadcastUpdate(condVar *sync.Cond, numWaiters *int) {
-	// TODO(ibrahim): We can just hook this with the taskPacer.
+// PaceBroadcastUpdate paces the conditional variable signaling to avoid overloading the system.
+func (b *updatesBuf) PaceBroadcastUpdate(ctx context.Context, condVar *sync.Cond, numWaiters *int) {
 	b.mu.Lock()
 	originalNumWaiters := *numWaiters
-	for originalNumWaiters > 0 {
-		condVar.Signal()
-		originalNumWaiters--
+	if originalNumWaiters <= 0 {
+		b.mu.Unlock()
+		return
 	}
+
+	// Get the current pacer (which uses the cluster setting refresh interval).
+	pacer := b.mu.pacer
 	b.mu.Unlock()
+
+	pacer.StartTask(timeutil.Now())
+
+	workLeft := originalNumWaiters
+	for workLeft > 0 {
+		todo, by := pacer.Pace(timeutil.Now(), workLeft)
+
+		b.mu.Lock()
+		for i := 0; i < todo && workLeft > 0; i++ {
+			condVar.Signal()
+			workLeft--
+		}
+		b.mu.Unlock()
+
+		if workLeft > 0 && timeutil.Now().Before(by) {
+			time.Sleep(by.Sub(timeutil.Now()))
+		}
+	}
 }
 
 func (b *updatesBuf) lastIdxLocked() int {
@@ -743,6 +796,14 @@ func (b *updatesBuf) Close() {
 	b.mu.updated2.Broadcast()
 }
 
+// TestingGetTotalNumWaiters returns the total number of goroutines waiting
+// on the buffer. For testing purposes only.
+func (b *updatesBuf) TestingGetTotalNumWaiters() int {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.mu.numWaiters1 + b.mu.numWaiters2
+}
+
 // connFactory is capable of creating new connections to specific nodes.
 type connFactory interface {
 	new(*Sender, roachpb.NodeID) conn

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 107 additions & 0 deletions
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -927,3 +928,109 @@ func TestAllUpdatesBufAreSignalled(t *testing.T) {
 	wg.Wait()
 	require.Equal(t, int64(numGoroutines*numMsgs), counter.Load())
 }
+
+// TestPaceUpdateSignalling verifies that the task pacer properly spaces out
+// signal calls after an update on the updatesBuf.
+func TestPaceUpdateSignalling(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Flaky under duress.
+	skip.UnderDuress(t)
+
+	// Create a mock sender to get access to the buffer.
+	ctx := context.Background()
+	connFactory := &mockConnFactory{}
+	st := cluster.MakeTestingClusterSettings()
+	closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 250*time.Millisecond)
+	s, stopper := newMockSenderWithSt(connFactory, st)
+	defer stopper.Stop(ctx)
+
+	// numWaiters controls how many goroutines will wait on updatesBuf.
+	numWaiters := 1000
+
+	// testPacing publishes an update while numWaiters goroutines are waiting on
+	// s.buf.GetBySeq, records when each goroutine receives its item, and calls
+	// assertionFunc with the spread between the first and last receive times so
+	// the caller can verify the pacing behavior.
+	//
+	// seqNum is the sequence number to wait for.
+	testPacing := func(seqNum ctpb.SeqNum, assertionFunc func(timeSpread time.Duration)) {
+		// Track the times when goroutines receive items from the buffer.
+		var receiveTimes []time.Time
+		var mu syncutil.Mutex
+
+		// Create numWaiters goroutines that wait on s.buf.GetBySeq and record
+		// receive times.
+		done := make(chan struct{}, numWaiters)
+		for i := 0; i < numWaiters; i++ {
+			go func() {
+				// Wait for the specified sequence number.
+				_, ok := s.buf.GetBySeq(ctx, seqNum)
+				require.Equal(t, true, ok)
+				if ok {
+					mu.Lock()
+					receiveTimes = append(receiveTimes, time.Now())
+					mu.Unlock()
+				}
+				done <- struct{}{}
+			}()
+		}
+
+		// Wait until all goroutines are waiting on the buffer.
+		testutils.SucceedsSoon(t, func() error {
+			if s.buf.TestingGetTotalNumWaiters() < numWaiters {
+				return errors.New("not all goroutines are waiting yet")
+			}
+			return nil
+		})
+
+		// Publish an item to the buffer, which should trigger the paced signalling.
+		s.publish(ctx)
+
+		// Wait for all goroutines to finish.
+		for i := 0; i < numWaiters; i++ {
+			<-done
+		}
+
+		// Verify that all goroutines received the message.
+		require.Len(t, receiveTimes, numWaiters)
+
+		// Find min and max receive times.
+		minTime := receiveTimes[0]
+		maxTime := receiveTimes[0]
+		for _, t := range receiveTimes[1:] {
+			if t.Before(minTime) {
+				minTime = t
+			}
+			if t.After(maxTime) {
+				maxTime = t
+			}
+		}
+
+		// Verify that the time spread matches expectations.
+		timeSpread := maxTime.Sub(minTime)
+		assertionFunc(timeSpread)
+	}
+
+	// Test with 250ms pacing interval - expect at least 125ms spread just to be
+	// conservative. In practice, it should be closer to 250ms.
+	closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 250*time.Millisecond)
+	testPacing(1 /* seqNum */, func(timeSpread time.Duration) {
+		require.GreaterOrEqual(t, timeSpread, 125*time.Millisecond)
+	})
+
+	// Change to 100ms pacing interval - expect at least 50ms spread.
+	closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 100*time.Millisecond)
+	testPacing(2 /* seqNum */, func(timeSpread time.Duration) {
+		require.GreaterOrEqual(t, timeSpread, 50*time.Millisecond)
+	})
+
+	// Change to 0ms (disabled) pacing interval - expect all goroutines to be
+	// woken within a few milliseconds of each other.
+	closedts.SideTransportPacingRefreshInterval.Override(ctx, &st.SV, 0)
+	testPacing(3 /* seqNum */, func(timeSpread time.Duration) {
+		require.LessOrEqual(t, timeSpread, 25*time.Millisecond)
+	})
+}
