Skip to content

Commit 6958680

Browse files
authored
Merge pull request #152456 from iskettaneh/backport25.2-144187
release-25.2: gossip: batch both client and server gossip messages
2 parents fef451b + 79a172a commit 6958680

File tree

9 files changed

+157
-14
lines changed

9 files changed

+157
-14
lines changed

docs/generated/metrics/metrics.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@
170170
<tr><td>STORAGE</td><td>gossip.connections.refused</td><td>Number of refused incoming gossip connections</td><td>Connections</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
171171
<tr><td>STORAGE</td><td>gossip.infos.received</td><td>Number of received gossip Info objects</td><td>Infos</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
172172
<tr><td>STORAGE</td><td>gossip.infos.sent</td><td>Number of sent gossip Info objects</td><td>Infos</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
173+
<tr><td>STORAGE</td><td>gossip.messages.received</td><td>Number of received gossip messages</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
174+
<tr><td>STORAGE</td><td>gossip.messages.sent</td><td>Number of sent gossip messages</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
173175
<tr><td>STORAGE</td><td>intentage</td><td>Cumulative age of locks</td><td>Age</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
174176
<tr><td>STORAGE</td><td>intentbytes</td><td>Number of bytes in intent KV pairs</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
175177
<tr><td>STORAGE</td><td>intentcount</td><td>Count of intent keys</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>

pkg/gossip/client.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,10 @@ func (c *client) sendGossip(g *Gossip, stream Gossip_GossipClient, firstReq bool
219219
infosSent := int64(len(delta))
220220
c.clientMetrics.BytesSent.Inc(bytesSent)
221221
c.clientMetrics.InfosSent.Inc(infosSent)
222+
c.clientMetrics.MessagesSent.Inc(1)
222223
c.nodeMetrics.BytesSent.Inc(bytesSent)
223224
c.nodeMetrics.InfosSent.Inc(infosSent)
225+
c.nodeMetrics.MessagesSent.Inc(1)
224226

225227
if log.V(1) {
226228
ctx := c.AnnotateCtx(stream.Context())
@@ -247,8 +249,10 @@ func (c *client) handleResponse(ctx context.Context, g *Gossip, reply *Response)
247249
infosReceived := int64(len(reply.Delta))
248250
c.clientMetrics.BytesReceived.Inc(bytesReceived)
249251
c.clientMetrics.InfosReceived.Inc(infosReceived)
252+
c.clientMetrics.MessagesReceived.Inc(1)
250253
c.nodeMetrics.BytesReceived.Inc(bytesReceived)
251254
c.nodeMetrics.InfosReceived.Inc(infosReceived)
255+
c.nodeMetrics.MessagesReceived.Inc(1)
252256

253257
// Combine remote node's infostore delta with ours.
254258
if reply.Delta != nil {
@@ -326,6 +330,7 @@ func (c *client) gossip(
326330

327331
errCh := make(chan error, 1)
328332
initCh := make(chan struct{}, 1)
333+
329334
// This wait group is used to allow the caller to wait until gossip
330335
// processing is terminated.
331336
wg.Add(1)
@@ -390,6 +395,9 @@ func (c *client) gossip(
390395
case <-initTimer.C:
391396
maybeRegister()
392397
case <-sendGossipChan:
398+
// We need to send the gossip delta to the remote server. Wait a bit to
399+
// batch the updates in one message.
400+
batchAndConsume(sendGossipChan, infosBatchDelay)
393401
if err := c.sendGossip(g, stream, count == 0); err != nil {
394402
return err
395403
}

pkg/gossip/gossip.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ const (
118118
// we didn't need to tighten the last time we checked.
119119
gossipTightenInterval = time.Second
120120

121+
// infosBatchDelay controls how long we wait to batch infos before
122+
// sending them.
123+
infosBatchDelay = 10 * time.Millisecond
124+
121125
unknownNodeID roachpb.NodeID = 0
122126
)
123127

@@ -141,6 +145,18 @@ var (
141145
Measurement: "Connections",
142146
Unit: metric.Unit_COUNT,
143147
}
148+
MetaMessagesSent = metric.Metadata{
149+
Name: "gossip.messages.sent",
150+
Help: "Number of sent gossip messages",
151+
Measurement: "Messages",
152+
Unit: metric.Unit_COUNT,
153+
}
154+
MetaMessagesReceived = metric.Metadata{
155+
Name: "gossip.messages.received",
156+
Help: "Number of received gossip messages",
157+
Measurement: "Messages",
158+
Unit: metric.Unit_COUNT,
159+
}
144160
MetaInfosSent = metric.Metadata{
145161
Name: "gossip.infos.sent",
146162
Help: "Number of sent gossip Info objects",

pkg/gossip/gossip.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ message MetricSnap {
7676
int64 bytes_sent = 3;
7777
int64 infos_received = 4;
7878
int64 infos_sent = 5;
79+
int64 messages_received = 7;
80+
int64 messages_sent = 8;
7981
int64 conns_refused = 6;
8082
}
8183

pkg/gossip/gossip_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,3 +1058,85 @@ func TestServerSendsHighStampsDiff(t *testing.T) {
10581058
return nil
10591059
})
10601060
}
1061+
1062+
// TestGossipBatching verifies that both server and client gossip updates are
1063+
// batched.
1064+
func TestGossipBatching(t *testing.T) {
1065+
defer leaktest.AfterTest(t)()
1066+
skip.UnderDeadlock(t, "might be flaky since it relies on some upper-bound timing")
1067+
skip.UnderRace(t, "might be flaky since it relies on some upper-bound timing")
1068+
1069+
stopper := stop.NewStopper()
1070+
defer stopper.Stop(context.Background())
1071+
1072+
// Cluster ID shared by all gossipers
1073+
clusterID := uuid.MakeV4()
1074+
1075+
local, localCtx := startGossip(clusterID, 1, stopper, t, metric.NewRegistry())
1076+
remote, remoteCtx := startGossip(clusterID, 2, stopper, t, metric.NewRegistry())
1077+
remote.mu.Lock()
1078+
rAddr := remote.mu.is.NodeAddr
1079+
remote.mu.Unlock()
1080+
local.manage(localCtx)
1081+
remote.manage(remoteCtx)
1082+
1083+
// Start a client connection to the remote node
1084+
local.mu.Lock()
1085+
local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
1086+
local.mu.Unlock()
1087+
1088+
// Wait for connection to be established
1089+
var c *client
1090+
testutils.SucceedsSoon(t, func() error {
1091+
c = local.findClient(func(c *client) bool { return c.addr.String() == rAddr.String() })
1092+
if c == nil {
1093+
return fmt.Errorf("client not found")
1094+
}
1095+
return nil
1096+
})
1097+
1098+
// Prepare 10,000 keys to gossip. This is a large enough number to allow
1099+
// batching to kick in.
1100+
numKeys := 10_000
1101+
localKeys := make([]string, numKeys)
1102+
remoteKeys := make([]string, numKeys)
1103+
for i := 0; i < numKeys; i++ {
1104+
localKeys[i] = fmt.Sprintf("local-key-%d", i)
1105+
remoteKeys[i] = fmt.Sprintf("remote-key-%d", i)
1106+
}
1107+
1108+
// Gossip the keys to both local and remote nodes.
1109+
for i := range numKeys {
1110+
require.NoError(t, local.AddInfo(localKeys[i], []byte("value"), time.Hour))
1111+
require.NoError(t, remote.AddInfo(remoteKeys[i], []byte("value"), time.Hour))
1112+
}
1113+
1114+
// Wait for updates to propagate
1115+
testutils.SucceedsSoon(t, func() error {
1116+
for i := range numKeys {
1117+
if _, err := local.GetInfo(remoteKeys[i]); err != nil {
1118+
return err
1119+
}
1120+
if _, err := remote.GetInfo(localKeys[i]); err != nil {
1121+
return err
1122+
}
1123+
}
1124+
return nil
1125+
})
1126+
1127+
// Record the number of messages both the client and the server sent, and
1128+
// assert that it's within the expected bounds.
1129+
serverMessagesSentCount := remote.serverMetrics.MessagesSent.Count()
1130+
clientMessagesSentCount := local.serverMetrics.MessagesSent.Count()
1131+
1132+
fmt.Printf("client msgs sent: %+v\n", clientMessagesSentCount)
1133+
fmt.Printf("server msgs sent: %+v\n", serverMessagesSentCount)
1134+
1135+
// upperBoundMessages is the maximum number of sent messages we expect to see.
1136+
// Note that in reality with batching, we see 3-10 messages sent in this test.
1137+
// However, in order to avoid flakiness, we set a very high number here. The
1138+
// test would fail even with this high number if we don't have batching.
1139+
upperBoundMessages := int64(500)
1140+
require.LessOrEqual(t, serverMessagesSentCount, upperBoundMessages)
1141+
require.LessOrEqual(t, clientMessagesSentCount, upperBoundMessages)
1142+
}

pkg/gossip/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
125125
infoCount := int64(len(reply.Delta))
126126
s.nodeMetrics.BytesSent.Inc(bytesSent)
127127
s.nodeMetrics.InfosSent.Inc(infoCount)
128+
s.nodeMetrics.MessagesSent.Inc(1)
128129
s.serverMetrics.BytesSent.Inc(bytesSent)
129130
s.serverMetrics.InfosSent.Inc(infoCount)
131+
s.serverMetrics.MessagesSent.Inc(1)
130132

131133
return stream.Send(reply)
132134
}
@@ -199,6 +201,9 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
199201
case err := <-errCh:
200202
return err
201203
case <-ready:
204+
// We just sleep here instead of calling batchAndConsume() because the
205+
// channel is closed, and sleeping won't block the sender of the channel.
206+
time.Sleep(infosBatchDelay)
202207
}
203208
}
204209
}
@@ -307,8 +312,10 @@ func (s *server) gossipReceiver(
307312
infosReceived := int64(len(args.Delta))
308313
s.nodeMetrics.BytesReceived.Inc(bytesReceived)
309314
s.nodeMetrics.InfosReceived.Inc(infosReceived)
315+
s.nodeMetrics.MessagesReceived.Inc(1)
310316
s.serverMetrics.BytesReceived.Inc(bytesReceived)
311317
s.serverMetrics.InfosReceived.Inc(infosReceived)
318+
s.serverMetrics.MessagesReceived.Inc(1)
312319

313320
freshCount, err := s.mu.is.combine(args.Delta, args.NodeID)
314321
if err != nil {

pkg/gossip/status.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type Metrics struct {
2626
BytesSent *metric.Counter
2727
InfosReceived *metric.Counter
2828
InfosSent *metric.Counter
29+
MessagesSent *metric.Counter
30+
MessagesReceived *metric.Counter
2931
CallbacksProcessed *metric.Counter
3032
CallbacksPending *metric.Gauge
3133
CallbacksProcessingDuration metric.IHistogram
@@ -39,6 +41,8 @@ func makeMetrics() Metrics {
3941
BytesSent: metric.NewCounter(MetaBytesSent),
4042
InfosReceived: metric.NewCounter(MetaInfosReceived),
4143
InfosSent: metric.NewCounter(MetaInfosSent),
44+
MessagesReceived: metric.NewCounter(MetaMessagesReceived),
45+
MessagesSent: metric.NewCounter(MetaMessagesSent),
4246
CallbacksProcessed: metric.NewCounter(MetaCallbacksProcessed),
4347
CallbacksPending: metric.NewGauge(MetaCallbacksPending),
4448
CallbacksProcessingDuration: metric.NewHistogram(metric.HistogramOptions{
@@ -63,11 +67,13 @@ func (m Metrics) String() string {
6367
// Snapshot returns a snapshot of the metrics.
6468
func (m Metrics) Snapshot() MetricSnap {
6569
return MetricSnap{
66-
ConnsRefused: m.ConnectionsRefused.Count(),
67-
BytesReceived: m.BytesReceived.Count(),
68-
BytesSent: m.BytesSent.Count(),
69-
InfosReceived: m.InfosReceived.Count(),
70-
InfosSent: m.InfosSent.Count(),
70+
ConnsRefused: m.ConnectionsRefused.Count(),
71+
BytesReceived: m.BytesReceived.Count(),
72+
BytesSent: m.BytesSent.Count(),
73+
InfosReceived: m.InfosReceived.Count(),
74+
InfosSent: m.InfosSent.Count(),
75+
MessagesReceived: m.MessagesReceived.Count(),
76+
MessagesSent: m.MessagesSent.Count(),
7177
}
7278
}
7379

@@ -77,7 +83,8 @@ func (m MetricSnap) String() string {
7783

7884
// SafeFormat implements the redact.SafeFormatter interface.
7985
func (m MetricSnap) SafeFormat(w redact.SafePrinter, _ rune) {
80-
w.Printf("infos %d/%d sent/received, bytes %dB/%dB sent/received",
86+
w.Printf("messages %d/%d sent/received, infos %d/%d sent/received, bytes %dB/%dB sent/received",
87+
m.MessagesSent, m.MessagesReceived,
8188
m.InfosSent, m.InfosReceived,
8289
m.BytesSent, m.BytesReceived)
8390
if m.ConnsRefused > 0 {

pkg/gossip/status_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ func TestGossipStatus(t *testing.T) {
2121
},
2222
MaxConns: 3,
2323
MetricSnap: MetricSnap{
24-
BytesReceived: 1000,
25-
BytesSent: 2000,
26-
InfosReceived: 10,
27-
InfosSent: 20,
28-
ConnsRefused: 17,
24+
BytesReceived: 1000,
25+
BytesSent: 2000,
26+
InfosReceived: 10,
27+
InfosSent: 20,
28+
MessagesSent: 2,
29+
MessagesReceived: 1,
30+
ConnsRefused: 17,
2931
},
3032
}
31-
if exp, act := `gossip server (2/3 cur/max conns, infos 20/10 sent/received, bytes 2000B/1000B sent/received, refused 17 conns)
33+
if exp, act := `gossip server (2/3 cur/max conns, messages 2/1 sent/received, infos 20/10 sent/received, bytes 2000B/1000B sent/received, refused 17 conns)
3234
1: localhost:1234 (17s)
3335
4: localhost:4567 (18s)
3436
`, ss.String(); exp != act {
@@ -39,13 +41,13 @@ func TestGossipStatus(t *testing.T) {
3941
ConnStatus: []OutgoingConnStatus{
4042
{
4143
ConnStatus: ss.ConnStatus[0],
42-
MetricSnap: MetricSnap{BytesReceived: 77, BytesSent: 88, InfosReceived: 11, InfosSent: 22},
44+
MetricSnap: MetricSnap{BytesReceived: 77, BytesSent: 88, InfosReceived: 11, InfosSent: 22, MessagesReceived: 3, MessagesSent: 4},
4345
},
4446
},
4547
MaxConns: 3,
4648
}
4749
if exp, act := `gossip client (1/3 cur/max conns)
48-
1: localhost:1234 (17s: infos 22/11 sent/received, bytes 88B/77B sent/received)
50+
1: localhost:1234 (17s: messages 4/3 sent/received, infos 22/11 sent/received, bytes 88B/77B sent/received)
4951
`, cs.String(); exp != act {
5052
t.Errorf("expected:\n%q\ngot:\n%q", exp, act)
5153
}

pkg/gossip/util.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package gossip
88
import (
99
"bytes"
1010
"sort"
11+
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/config"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1416
)
1517

1618
// SystemConfigDeltaFilter keeps track of SystemConfig values so that unmodified
@@ -92,3 +94,18 @@ func (df *SystemConfigDeltaFilter) ForModified(
9294
}
9395
}
9496
}
97+
98+
// batchAndConsume waits on a channel to allow batching more events. It keeps
99+
// consuming events as they come while it waits, to avoid blocking the channel producer.
100+
func batchAndConsume(ch <-chan struct{}, batchDuration time.Duration) {
101+
var batchTimer timeutil.Timer
102+
defer batchTimer.Stop()
103+
batchTimer.Reset(batchDuration)
104+
for !batchTimer.Read {
105+
select {
106+
case <-ch: // event happened while we are waiting for our batchTimer to end.
107+
case <-batchTimer.C:
108+
batchTimer.Read = true
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)