
Commit 79a172a
gossip: batch both client and server gossip messages
This commit adds an optimization to gossip: before sending a gossip info update message, we wait a short amount of time so that pending updates can be batched into a single message, reducing the number of messages in the system.

Fixes: #119420

Release note: None
1 parent: 646d15c
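
To make the idea concrete, here is a minimal standalone sketch of the batching pattern this commit introduces (the names waitAndDrain and the demo main are illustrative only, not part of the commit; the actual implementation is batchAndConsume in pkg/gossip/util.go below). After the first pending-update notification arrives, the sender keeps draining the notification channel for a short window so that many updates are coalesced into one outgoing message:

package main

import (
	"fmt"
	"time"
)

// waitAndDrain blocks for one batching window, absorbing any further
// notifications that arrive on ch so the producer is never blocked.
func waitAndDrain(ch <-chan struct{}, window time.Duration) {
	timer := time.NewTimer(window)
	defer timer.Stop()
	for {
		select {
		case <-ch: // another update arrived; fold it into the batch
		case <-timer.C:
			return // window elapsed; the caller now sends one batched message
		}
	}
}

func main() {
	updates := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 100; i++ {
			select {
			case updates <- struct{}{}:
			default: // a notification is already pending; nothing to do
			}
		}
	}()
	<-updates                                  // wait for the first update...
	waitAndDrain(updates, 10*time.Millisecond) // ...then batch for 10ms
	fmt.Println("sending one message covering the whole batch")
}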

6 files changed: +112 −0 lines changed

docs/generated/metrics/metrics.html

Lines changed: 2 additions & 0 deletions
@@ -170,6 +170,8 @@
 <tr><td>STORAGE</td><td>gossip.connections.refused</td><td>Number of refused incoming gossip connections</td><td>Connections</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>gossip.infos.received</td><td>Number of received gossip Info objects</td><td>Infos</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>gossip.infos.sent</td><td>Number of sent gossip Info objects</td><td>Infos</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>STORAGE</td><td>gossip.messages.received</td><td>Number of received gossip messages</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>STORAGE</td><td>gossip.messages.sent</td><td>Number of sent gossip messages</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>intentage</td><td>Cumulative age of locks</td><td>Age</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>intentbytes</td><td>Number of bytes in intent KV pairs</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>intentcount</td><td>Count of intent keys</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>

pkg/gossip/client.go

Lines changed: 4 additions & 0 deletions
@@ -330,6 +330,7 @@ func (c *client) gossip(

 	errCh := make(chan error, 1)
 	initCh := make(chan struct{}, 1)
+
 	// This wait group is used to allow the caller to wait until gossip
 	// processing is terminated.
 	wg.Add(1)
@@ -394,6 +395,9 @@ func (c *client) gossip(
 		case <-initTimer.C:
 			maybeRegister()
 		case <-sendGossipChan:
+			// We need to send the gossip delta to the remote server. Wait a bit to
+			// batch the updates in one message.
+			batchAndConsume(sendGossipChan, infosBatchDelay)
 			if err := c.sendGossip(g, stream, count == 0); err != nil {
 				return err
 			}

pkg/gossip/gossip.go

Lines changed: 4 additions & 0 deletions
@@ -118,6 +118,10 @@ const (
 	// we didn't need to tighten the last time we checked.
 	gossipTightenInterval = time.Second

+	// infosBatchDelay controls how long we wait to batch infos before
+	// sending them.
+	infosBatchDelay = 10 * time.Millisecond
+
 	unknownNodeID roachpb.NodeID = 0
 )

pkg/gossip/gossip_test.go

Lines changed: 82 additions & 0 deletions
@@ -1058,3 +1058,85 @@ func TestServerSendsHighStampsDiff(t *testing.T) {
 		return nil
 	})
 }
+
+// TestGossipBatching verifies that both server and client gossip updates are
+// batched.
+func TestGossipBatching(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	skip.UnderDeadlock(t, "might be flaky since it relies on some upper-bound timing")
+	skip.UnderRace(t, "might be flaky since it relies on some upper-bound timing")
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.Background())
+
+	// Cluster ID shared by all gossipers.
+	clusterID := uuid.MakeV4()
+
+	local, localCtx := startGossip(clusterID, 1, stopper, t, metric.NewRegistry())
+	remote, remoteCtx := startGossip(clusterID, 2, stopper, t, metric.NewRegistry())
+	remote.mu.Lock()
+	rAddr := remote.mu.is.NodeAddr
+	remote.mu.Unlock()
+	local.manage(localCtx)
+	remote.manage(remoteCtx)
+
+	// Start a client connection to the remote node.
+	local.mu.Lock()
+	local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
+	local.mu.Unlock()
+
+	// Wait for the connection to be established.
+	var c *client
+	testutils.SucceedsSoon(t, func() error {
+		c = local.findClient(func(c *client) bool { return c.addr.String() == rAddr.String() })
+		if c == nil {
+			return fmt.Errorf("client not found")
+		}
+		return nil
+	})
+
+	// Prepare 10,000 keys to gossip. This is a large enough number to allow
+	// batching to kick in.
+	numKeys := 10_000
+	localKeys := make([]string, numKeys)
+	remoteKeys := make([]string, numKeys)
+	for i := 0; i < numKeys; i++ {
+		localKeys[i] = fmt.Sprintf("local-key-%d", i)
+		remoteKeys[i] = fmt.Sprintf("remote-key-%d", i)
+	}
+
+	// Gossip the keys to both the local and remote nodes.
+	for i := range numKeys {
+		require.NoError(t, local.AddInfo(localKeys[i], []byte("value"), time.Hour))
+		require.NoError(t, remote.AddInfo(remoteKeys[i], []byte("value"), time.Hour))
+	}
+
+	// Wait for the updates to propagate.
+	testutils.SucceedsSoon(t, func() error {
+		for i := range numKeys {
+			if _, err := local.GetInfo(remoteKeys[i]); err != nil {
+				return err
+			}
+			if _, err := remote.GetInfo(localKeys[i]); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	// Record the number of messages both the client and the server sent, and
+	// assert that it's within the expected bounds.
+	serverMessagesSentCount := remote.serverMetrics.MessagesSent.Count()
+	clientMessagesSentCount := local.serverMetrics.MessagesSent.Count()

+	fmt.Printf("client msgs sent: %+v\n", clientMessagesSentCount)
+	fmt.Printf("server msgs sent: %+v\n", serverMessagesSentCount)
+
+	// upperBoundMessages is the maximum number of sent messages we expect to see.
+	// Note that in reality, with batching, we see 3-10 messages sent in this test.
+	// However, in order to avoid flakiness, we set a very high number here. The
+	// test would fail even with this high number if we didn't have batching.
+	upperBoundMessages := int64(500)
+	require.LessOrEqual(t, serverMessagesSentCount, upperBoundMessages)
+	require.LessOrEqual(t, clientMessagesSentCount, upperBoundMessages)
+}

pkg/gossip/server.go

Lines changed: 3 additions & 0 deletions
@@ -201,6 +201,9 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
 		case err := <-errCh:
 			return err
 		case <-ready:
+			// We just sleep here instead of calling batchAndConsume() because the
+			// channel is closed, and sleeping won't block the sender of the channel.
+			time.Sleep(infosBatchDelay)
 		}
 	}
 }
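
An aside on the comment above (not part of the commit): in Go, a receive from a closed channel completes immediately, so calling batchAndConsume here would spin through its select for the whole delay rather than wait. A plain sleep gives the same batching pause, and since the channel is already closed there is no sender left to block:

done := make(chan struct{})
close(done)
<-done // completes immediately: receives on a closed channel never block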

pkg/gossip/util.go

Lines changed: 17 additions & 0 deletions
@@ -8,9 +8,11 @@ package gossip
 import (
 	"bytes"
 	"sort"
+	"time"

 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )

 // SystemConfigDeltaFilter keeps track of SystemConfig values so that unmodified
@@ -92,3 +94,18 @@ func (df *SystemConfigDeltaFilter) ForModified(
 		}
 	}
 }
+
+// batchAndConsume waits on a channel to allow batching more events. It keeps
+// consuming events as they come to avoid blocking the channel producer.
+func batchAndConsume(ch <-chan struct{}, batchDuration time.Duration) {
+	var batchTimer timeutil.Timer
+	defer batchTimer.Stop()
+	batchTimer.Reset(batchDuration)
+	for !batchTimer.Read {
+		select {
+		case <-ch: // event happened while we are waiting for our batchTimer to end
+		case <-batchTimer.C:
+			batchTimer.Read = true
+		}
+	}
+}
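
An aside on the timeutil.Timer idiom in batchAndConsume (hedged, based on the documented contract of CockroachDB's util/timeutil package; periodically is an illustrative helper, and imports of time and timeutil are assumed): the pooled timer requires its Read field to be set to true after its channel has been received from and before any later Reset, which is why the loop above both sets Read and uses it as its exit condition. The canonical shape looks roughly like:

// periodically runs work every wait interval until stopCh is closed,
// following timeutil.Timer's Read contract.
func periodically(wait time.Duration, stopCh <-chan struct{}, work func()) {
	var timer timeutil.Timer
	defer timer.Stop()
	for {
		timer.Reset(wait)
		select {
		case <-timer.C:
			timer.Read = true // mark the channel as read before the next Reset
			work()
		case <-stopCh:
			return
		}
	}
}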
