Skip to content

Commit 52acfed

Browse files
craig[bot]iskettanehyuzefovich
committed
144187: gossip: batch both client and server gossip messages r=iskettaneh a=iskettaneh This commit adds an optimization in gossip where we wait for a small amount of time before sending a gossip info update message. This gives some time for updates to be batched, reducing the number of messages in the system. Fixes: #119420 Release note: None 145555: sql: disable buffered writes when changing txn isolation r=yuzefovich a=yuzefovich We currently only support write buffering under serializable isolation, and we explicitly disable that on txn creation. However, we forgot to do so in one spot where we might be changing the isolation of an already created txn, and this commit fixes that oversight. Epic: None Release note: None Co-authored-by: Ibrahim Kettaneh <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
3 parents ceb0303 + d1bc0b5 + 17e364f commit 52acfed

File tree

10 files changed

+176
-14
lines changed

10 files changed

+176
-14
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10965,6 +10965,22 @@ layers:
1096510965
unit: COUNT
1096610966
aggregation: AVG
1096710967
derivative: NON_NEGATIVE_DERIVATIVE
10968+
- name: gossip.messages.received
10969+
exported_name: gossip_messages_received
10970+
description: Number of received gossip messages
10971+
y_axis_label: Messages
10972+
type: COUNTER
10973+
unit: COUNT
10974+
aggregation: AVG
10975+
derivative: NON_NEGATIVE_DERIVATIVE
10976+
- name: gossip.messages.sent
10977+
exported_name: gossip_messages_sent
10978+
description: Number of sent gossip messages
10979+
y_axis_label: Messages
10980+
type: COUNTER
10981+
unit: COUNT
10982+
aggregation: AVG
10983+
derivative: NON_NEGATIVE_DERIVATIVE
1096810984
- name: intentage
1096910985
exported_name: intentage
1097010986
description: Cumulative age of locks

pkg/gossip/client.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,10 @@ func (c *client) sendGossip(g *Gossip, stream Gossip_GossipClient, firstReq bool
219219
infosSent := int64(len(delta))
220220
c.clientMetrics.BytesSent.Inc(bytesSent)
221221
c.clientMetrics.InfosSent.Inc(infosSent)
222+
c.clientMetrics.MessagesSent.Inc(1)
222223
c.nodeMetrics.BytesSent.Inc(bytesSent)
223224
c.nodeMetrics.InfosSent.Inc(infosSent)
225+
c.nodeMetrics.MessagesSent.Inc(1)
224226

225227
if log.V(1) {
226228
ctx := c.AnnotateCtx(stream.Context())
@@ -247,8 +249,10 @@ func (c *client) handleResponse(ctx context.Context, g *Gossip, reply *Response)
247249
infosReceived := int64(len(reply.Delta))
248250
c.clientMetrics.BytesReceived.Inc(bytesReceived)
249251
c.clientMetrics.InfosReceived.Inc(infosReceived)
252+
c.clientMetrics.MessagesReceived.Inc(1)
250253
c.nodeMetrics.BytesReceived.Inc(bytesReceived)
251254
c.nodeMetrics.InfosReceived.Inc(infosReceived)
255+
c.nodeMetrics.MessagesReceived.Inc(1)
252256

253257
// Combine remote node's infostore delta with ours.
254258
if reply.Delta != nil {
@@ -326,6 +330,7 @@ func (c *client) gossip(
326330

327331
errCh := make(chan error, 1)
328332
initCh := make(chan struct{}, 1)
333+
329334
// This wait group is used to allow the caller to wait until gossip
330335
// processing is terminated.
331336
wg.Add(1)
@@ -390,6 +395,9 @@ func (c *client) gossip(
390395
case <-initTimer.C:
391396
maybeRegister()
392397
case <-sendGossipChan:
398+
// We need to send the gossip delta to the remote server. Wait a bit to
399+
// batch the updates in one message.
400+
batchAndConsume(sendGossipChan, infosBatchDelay)
393401
if err := c.sendGossip(g, stream, count == 0); err != nil {
394402
return err
395403
}

pkg/gossip/gossip.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ const (
118118
// we didn't need to tighten the last time we checked.
119119
gossipTightenInterval = time.Second
120120

121+
// infosBatchDelay controls how long we wait to batch infos before
122+
// sending them.
123+
infosBatchDelay = 10 * time.Millisecond
124+
121125
unknownNodeID roachpb.NodeID = 0
122126
)
123127

@@ -141,6 +145,18 @@ var (
141145
Measurement: "Connections",
142146
Unit: metric.Unit_COUNT,
143147
}
148+
MetaMessagesSent = metric.Metadata{
149+
Name: "gossip.messages.sent",
150+
Help: "Number of sent gossip messages",
151+
Measurement: "Messages",
152+
Unit: metric.Unit_COUNT,
153+
}
154+
MetaMessagesReceived = metric.Metadata{
155+
Name: "gossip.messages.received",
156+
Help: "Number of received gossip messages",
157+
Measurement: "Messages",
158+
Unit: metric.Unit_COUNT,
159+
}
144160
MetaInfosSent = metric.Metadata{
145161
Name: "gossip.infos.sent",
146162
Help: "Number of sent gossip Info objects",

pkg/gossip/gossip.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ message MetricSnap {
7676
int64 bytes_sent = 3;
7777
int64 infos_received = 4;
7878
int64 infos_sent = 5;
79+
int64 messages_received = 7;
80+
int64 messages_sent = 8;
7981
int64 conns_refused = 6;
8082
}
8183

pkg/gossip/gossip_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,3 +1058,85 @@ func TestServerSendsHighStampsDiff(t *testing.T) {
10581058
return nil
10591059
})
10601060
}
1061+
1062+
// TestGossipBatching verifies that both server and client gossip updates are
1063+
// batched.
1064+
func TestGossipBatching(t *testing.T) {
1065+
defer leaktest.AfterTest(t)()
1066+
skip.UnderDeadlock(t, "might be flaky since it relies on some upper-bound timing")
1067+
skip.UnderRace(t, "might be flaky since it relies on some upper-bound timing")
1068+
1069+
stopper := stop.NewStopper()
1070+
defer stopper.Stop(context.Background())
1071+
1072+
// Cluster ID shared by all gossipers.
1073+
clusterID := uuid.MakeV4()
1074+
1075+
local, localCtx := startGossip(clusterID, 1, stopper, t, metric.NewRegistry())
1076+
remote, remoteCtx := startGossip(clusterID, 2, stopper, t, metric.NewRegistry())
1077+
remote.mu.Lock()
1078+
rAddr := remote.mu.is.NodeAddr
1079+
remote.mu.Unlock()
1080+
local.manage(localCtx)
1081+
remote.manage(remoteCtx)
1082+
1083+
// Start a client connection to the remote node
1084+
local.mu.Lock()
1085+
local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
1086+
local.mu.Unlock()
1087+
1088+
// Wait for connection to be established
1089+
var c *client
1090+
testutils.SucceedsSoon(t, func() error {
1091+
c = local.findClient(func(c *client) bool { return c.addr.String() == rAddr.String() })
1092+
if c == nil {
1093+
return fmt.Errorf("client not found")
1094+
}
1095+
return nil
1096+
})
1097+
1098+
// Prepare 10,000 keys to gossip. This is a large enough number to allow
1099+
// batching to kick in.
1100+
numKeys := 10_000
1101+
localKeys := make([]string, numKeys)
1102+
remoteKeys := make([]string, numKeys)
1103+
for i := 0; i < numKeys; i++ {
1104+
localKeys[i] = fmt.Sprintf("local-key-%d", i)
1105+
remoteKeys[i] = fmt.Sprintf("remote-key-%d", i)
1106+
}
1107+
1108+
// Gossip the keys to both local and remote nodes.
1109+
for i := range numKeys {
1110+
require.NoError(t, local.AddInfo(localKeys[i], []byte("value"), time.Hour))
1111+
require.NoError(t, remote.AddInfo(remoteKeys[i], []byte("value"), time.Hour))
1112+
}
1113+
1114+
// Wait for updates to propagate
1115+
testutils.SucceedsSoon(t, func() error {
1116+
for i := range numKeys {
1117+
if _, err := local.GetInfo(remoteKeys[i]); err != nil {
1118+
return err
1119+
}
1120+
if _, err := remote.GetInfo(localKeys[i]); err != nil {
1121+
return err
1122+
}
1123+
}
1124+
return nil
1125+
})
1126+
1127+
// Record the number of messages both the client and the server sent, and
1128+
// assert that it's within the expected bounds.
1129+
serverMessagesSentCount := remote.serverMetrics.MessagesSent.Count()
1130+
clientMessagesSentCount := local.serverMetrics.MessagesSent.Count()
1131+
1132+
fmt.Printf("client msgs sent: %+v\n", clientMessagesSentCount)
1133+
fmt.Printf("server msgs sent: %+v\n", serverMessagesSentCount)
1134+
1135+
// upperBoundMessages is the maximum number of sent messages we expect to see.
1136+
// Note that in reality with batching, we see 3-10 messages sent in this test.
1137+
// However, in order to avoid flakiness, we set a very high number here. The
1138+
// test would fail even with this high number if we don't have batching.
1139+
upperBoundMessages := int64(500)
1140+
require.LessOrEqual(t, serverMessagesSentCount, upperBoundMessages)
1141+
require.LessOrEqual(t, clientMessagesSentCount, upperBoundMessages)
1142+
}

pkg/gossip/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
125125
infoCount := int64(len(reply.Delta))
126126
s.nodeMetrics.BytesSent.Inc(bytesSent)
127127
s.nodeMetrics.InfosSent.Inc(infoCount)
128+
s.nodeMetrics.MessagesSent.Inc(1)
128129
s.serverMetrics.BytesSent.Inc(bytesSent)
129130
s.serverMetrics.InfosSent.Inc(infoCount)
131+
s.serverMetrics.MessagesSent.Inc(1)
130132

131133
return stream.Send(reply)
132134
}
@@ -199,6 +201,9 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
199201
case err := <-errCh:
200202
return err
201203
case <-ready:
204+
// We just sleep here instead of calling batchAndConsume() because the
205+
// channel is closed, and sleeping won't block the sender of the channel.
206+
time.Sleep(infosBatchDelay)
202207
}
203208
}
204209
}
@@ -307,8 +312,10 @@ func (s *server) gossipReceiver(
307312
infosReceived := int64(len(args.Delta))
308313
s.nodeMetrics.BytesReceived.Inc(bytesReceived)
309314
s.nodeMetrics.InfosReceived.Inc(infosReceived)
315+
s.nodeMetrics.MessagesReceived.Inc(1)
310316
s.serverMetrics.BytesReceived.Inc(bytesReceived)
311317
s.serverMetrics.InfosReceived.Inc(infosReceived)
318+
s.serverMetrics.MessagesReceived.Inc(1)
312319

313320
freshCount, err := s.mu.is.combine(args.Delta, args.NodeID)
314321
if err != nil {

pkg/gossip/status.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type Metrics struct {
2626
BytesSent *metric.Counter
2727
InfosReceived *metric.Counter
2828
InfosSent *metric.Counter
29+
MessagesSent *metric.Counter
30+
MessagesReceived *metric.Counter
2931
CallbacksProcessed *metric.Counter
3032
CallbacksPending *metric.Gauge
3133
CallbacksProcessingDuration metric.IHistogram
@@ -39,6 +41,8 @@ func makeMetrics() Metrics {
3941
BytesSent: metric.NewCounter(MetaBytesSent),
4042
InfosReceived: metric.NewCounter(MetaInfosReceived),
4143
InfosSent: metric.NewCounter(MetaInfosSent),
44+
MessagesReceived: metric.NewCounter(MetaMessagesReceived),
45+
MessagesSent: metric.NewCounter(MetaMessagesSent),
4246
CallbacksProcessed: metric.NewCounter(MetaCallbacksProcessed),
4347
CallbacksPending: metric.NewGauge(MetaCallbacksPending),
4448
CallbacksProcessingDuration: metric.NewHistogram(metric.HistogramOptions{
@@ -63,11 +67,13 @@ func (m Metrics) String() string {
6367
// Snapshot returns a snapshot of the metrics.
6468
func (m Metrics) Snapshot() MetricSnap {
6569
return MetricSnap{
66-
ConnsRefused: m.ConnectionsRefused.Count(),
67-
BytesReceived: m.BytesReceived.Count(),
68-
BytesSent: m.BytesSent.Count(),
69-
InfosReceived: m.InfosReceived.Count(),
70-
InfosSent: m.InfosSent.Count(),
70+
ConnsRefused: m.ConnectionsRefused.Count(),
71+
BytesReceived: m.BytesReceived.Count(),
72+
BytesSent: m.BytesSent.Count(),
73+
InfosReceived: m.InfosReceived.Count(),
74+
InfosSent: m.InfosSent.Count(),
75+
MessagesReceived: m.MessagesReceived.Count(),
76+
MessagesSent: m.MessagesSent.Count(),
7177
}
7278
}
7379

@@ -77,7 +83,8 @@ func (m MetricSnap) String() string {
7783

7884
// SafeFormat implements the redact.SafeFormatter interface.
7985
func (m MetricSnap) SafeFormat(w redact.SafePrinter, _ rune) {
80-
w.Printf("infos %d/%d sent/received, bytes %dB/%dB sent/received",
86+
w.Printf("messages %d/%d sent/received, infos %d/%d sent/received, bytes %dB/%dB sent/received",
87+
m.MessagesSent, m.MessagesReceived,
8188
m.InfosSent, m.InfosReceived,
8289
m.BytesSent, m.BytesReceived)
8390
if m.ConnsRefused > 0 {

pkg/gossip/status_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ func TestGossipStatus(t *testing.T) {
2121
},
2222
MaxConns: 3,
2323
MetricSnap: MetricSnap{
24-
BytesReceived: 1000,
25-
BytesSent: 2000,
26-
InfosReceived: 10,
27-
InfosSent: 20,
28-
ConnsRefused: 17,
24+
BytesReceived: 1000,
25+
BytesSent: 2000,
26+
InfosReceived: 10,
27+
InfosSent: 20,
28+
MessagesSent: 2,
29+
MessagesReceived: 1,
30+
ConnsRefused: 17,
2931
},
3032
}
31-
if exp, act := `gossip server (2/3 cur/max conns, infos 20/10 sent/received, bytes 2000B/1000B sent/received, refused 17 conns)
33+
if exp, act := `gossip server (2/3 cur/max conns, messages 2/1 sent/received, infos 20/10 sent/received, bytes 2000B/1000B sent/received, refused 17 conns)
3234
1: localhost:1234 (17s)
3335
4: localhost:4567 (18s)
3436
`, ss.String(); exp != act {
@@ -39,13 +41,13 @@ func TestGossipStatus(t *testing.T) {
3941
ConnStatus: []OutgoingConnStatus{
4042
{
4143
ConnStatus: ss.ConnStatus[0],
42-
MetricSnap: MetricSnap{BytesReceived: 77, BytesSent: 88, InfosReceived: 11, InfosSent: 22},
44+
MetricSnap: MetricSnap{BytesReceived: 77, BytesSent: 88, InfosReceived: 11, InfosSent: 22, MessagesReceived: 3, MessagesSent: 4},
4345
},
4446
},
4547
MaxConns: 3,
4648
}
4749
if exp, act := `gossip client (1/3 cur/max conns)
48-
1: localhost:1234 (17s: infos 22/11 sent/received, bytes 88B/77B sent/received)
50+
1: localhost:1234 (17s: messages 4/3 sent/received, infos 22/11 sent/received, bytes 88B/77B sent/received)
4951
`, cs.String(); exp != act {
5052
t.Errorf("expected:\n%q\ngot:\n%q", exp, act)
5153
}

pkg/gossip/util.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package gossip
88
import (
99
"bytes"
1010
"sort"
11+
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/config"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1416
)
1517

1618
// SystemConfigDeltaFilter keeps track of SystemConfig values so that unmodified
@@ -92,3 +94,18 @@ func (df *SystemConfigDeltaFilter) ForModified(
9294
}
9395
}
9496
}
97+
98+
// batchAndConsume waits on a channel to allow batching more events. It keeps
99+
// consuming events as they come to avoid blocking the channel producer.
100+
func batchAndConsume(ch <-chan struct{}, batchDuration time.Duration) {
101+
var batchTimer timeutil.Timer
102+
defer batchTimer.Stop()
103+
batchTimer.Reset(batchDuration)
104+
for !batchTimer.Read {
105+
select {
106+
case <-ch: // event happened while we are waiting for our batchTimer to end.
107+
case <-batchTimer.C:
108+
batchTimer.Read = true
109+
}
110+
}
111+
}

pkg/sql/conn_executor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3536,6 +3536,11 @@ func (ex *connExecutor) setTransactionModes(
35363536
if err := ex.state.setIsolationLevel(level); err != nil {
35373537
return pgerror.WithCandidateCode(err, pgcode.ActiveSQLTransaction)
35383538
}
3539+
if level != isolation.Serializable {
3540+
// TODO(#143497): we currently only support buffered writes under
3541+
// serializable isolation.
3542+
ex.state.mu.txn.SetBufferedWritesEnabled(false)
3543+
}
35393544
}
35403545
rwMode := modes.ReadWriteMode
35413546
if modes.AsOf.Expr != nil && asOfTs.IsEmpty() {

0 commit comments

Comments
 (0)