Skip to content

Commit d6b18ba

Browse files
committed
storeliveness: rename SendAsync to EnqueueMessage
Previously, the `MessageSender` interface exposed a method called `SendAsync` that queued Store Liveness messages for asynchronous delivery to remote stores. The name `SendAsync` was misleading because it implied messages were sent directly, when in reality they were enqueued to per-node queues and processed in batches before being sent over the network. The rename to `EnqueueMessage` better reflects that the method places messages onto an outgoing queue rather than performing the actual send operation. This patch renames `SendAsync` to `EnqueueMessage` throughout the storeliveness package. This change is also set up for the next commit which enables smearing of the heartbeats. References: #148210 Release note: None
1 parent 353e4b9 commit d6b18ba

File tree

4 files changed

+25
-25
lines changed

4 files changed

+25
-25
lines changed

pkg/kv/kvserver/storeliveness/support_manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var Enabled = settings.RegisterBoolSetting(
3434
// MessageSender is the interface that defines how Store Liveness messages are
3535
// sent. Transport is the production implementation of MessageSender.
3636
type MessageSender interface {
37-
SendAsync(ctx context.Context, msg slpb.Message) (sent bool)
37+
EnqueueMessage(ctx context.Context, msg slpb.Message) (sent bool)
3838
}
3939

4040
// SupportManager orchestrates requesting and providing Store Liveness support.
@@ -323,7 +323,7 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) {
323323
// Send heartbeats to each remote store.
324324
successes := 0
325325
for _, msg := range heartbeats {
326-
if sent := sm.sender.SendAsync(ctx, msg); sent {
326+
if sent := sm.sender.EnqueueMessage(ctx, msg); sent {
327327
successes++
328328
} else {
329329
log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To)
@@ -424,7 +424,7 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
424424
sm.metrics.SupportForStores.Update(int64(sm.supporterStateHandler.getNumSupportFor()))
425425

426426
for _, response := range responses {
427-
_ = sm.sender.SendAsync(ctx, response)
427+
_ = sm.sender.EnqueueMessage(ctx, response)
428428
}
429429
log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses))
430430
}

pkg/kv/kvserver/storeliveness/testutils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ type testMessageSender struct {
9494
messages []slpb.Message
9595
}
9696

97-
func (tms *testMessageSender) SendAsync(_ context.Context, msg slpb.Message) (sent bool) {
97+
func (tms *testMessageSender) EnqueueMessage(_ context.Context, msg slpb.Message) (sent bool) {
9898
tms.mu.Lock()
9999
defer tms.mu.Unlock()
100100
tms.messages = append(tms.messages, msg)

pkg/kv/kvserver/storeliveness/transport.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,14 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
213213
t.metrics.MessagesReceived.Inc(1)
214214
}
215215

216-
// SendAsync implements the MessageSender interface. It sends a message to the
216+
// EnqueueMessage implements the MessageSender interface. It sends a message to the
217217
// recipient specified in the request, and returns false if the outgoing queue
218218
// is full or the node dialer's circuit breaker has tripped.
219219
//
220220
// The returned bool may be a false positive but will never be a false negative;
221221
// if sent is true the message may or may not actually be sent but if it's false
222222
// the message definitely was not sent.
223-
func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued bool) {
223+
func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enqueued bool) {
224224
toNodeID := msg.To.NodeID
225225
fromNodeID := msg.From.NodeID
226226
// If this is a message from one local store to another local store, do not
@@ -288,7 +288,7 @@ func (t *Transport) startProcessNewQueue(
288288
cleanup := func() {
289289
q, ok := t.getQueue(toNodeID)
290290
t.queues.Delete(toNodeID)
291-
// Account for all remaining messages in the queue. SendAsync may be
291+
// Account for all remaining messages in the queue. EnqueueMessage may be
292292
// writing to the queue concurrently, so it's possible that we won't
293293
// account for a few messages below.
294294
if ok {

pkg/kv/kvserver/storeliveness/transport_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type clockWithManualSource struct {
6666

6767
// transportTester contains objects needed to test the Store Liveness Transport.
6868
// Typical usage will add multiple nodes with AddNode, add multiple stores with
69-
// AddStore, and send messages with SendAsync.
69+
// AddStore, and send messages with EnqueueMessage.
7070
type transportTester struct {
7171
t testing.TB
7272
st *cluster.Settings
@@ -199,7 +199,7 @@ func TestTransportSendAndReceive(t *testing.T) {
199199
// Send messages between each pair of stores.
200200
for _, from := range stores {
201201
for _, to := range stores {
202-
tt.transports[from.NodeID].SendAsync(ctx, makeMsg(from, to))
202+
tt.transports[from.NodeID].EnqueueMessage(ctx, makeMsg(from, to))
203203
}
204204
}
205205

@@ -261,7 +261,7 @@ func TestTransportRestartedNode(t *testing.T) {
261261
checkEnqueued := func(expectedEnqueued bool) {
262262
testutils.SucceedsSoon(
263263
t, func() error {
264-
enqueued := tt.transports[sender.NodeID].SendAsync(ctx, msg)
264+
enqueued := tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)
265265
if enqueued != expectedEnqueued {
266266
return errors.Newf("enqueue success is still %v", enqueued)
267267
}
@@ -274,7 +274,7 @@ func TestTransportRestartedNode(t *testing.T) {
274274
initialSent := tt.transports[sender.NodeID].metrics.MessagesSent.Count()
275275
testutils.SucceedsSoon(
276276
t, func() error {
277-
tt.transports[sender.NodeID].SendAsync(ctx, msg)
277+
tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)
278278
sent := tt.transports[sender.NodeID].metrics.MessagesSent.Count()
279279
if initialSent >= sent {
280280
return errors.Newf("message not sent yet; initial %d, current %d", initialSent, sent)
@@ -288,7 +288,7 @@ func TestTransportRestartedNode(t *testing.T) {
288288
initialDropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count()
289289
testutils.SucceedsSoon(
290290
t, func() error {
291-
tt.transports[sender.NodeID].SendAsync(ctx, msg)
291+
tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)
292292
dropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count()
293293
if initialDropped >= dropped {
294294
return errors.Newf(
@@ -309,9 +309,9 @@ func TestTransportRestartedNode(t *testing.T) {
309309
return nil
310310
default:
311311
// To ensure messages start getting delivered, keep sending messages
312-
// out. Even after SendAsync returns true, messages may still not be
312+
// out. Even after EnqueueMessage returns true, messages may still not be
313313
// delivered (e.g. if the receiver node is not up yet).
314-
tt.transports[sender.NodeID].SendAsync(ctx, msg)
314+
tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)
315315
}
316316
return errors.New("still waiting to receive message")
317317
},
@@ -322,7 +322,7 @@ func TestTransportRestartedNode(t *testing.T) {
322322
// The message is sent out successfully.
323323
checkEnqueued(true /* expectedEnqueued */)
324324
// The message sent as part of checkSend above will likely be dropped it's
325-
// also possible that the SendAsync races with the deletion of the send queue
325+
// also possible that the EnqueueMessage races with the deletion of the send queue
326326
// (due to the failed node dial), in which case a dropped message will not be
327327
// recorded.
328328
checkDropped()
@@ -338,7 +338,7 @@ func TestTransportRestartedNode(t *testing.T) {
338338
// fails after the circuit breaker kicks in.
339339
receiverStopper.Stop(context.Background())
340340
checkEnqueued(false /* expectedEnqueued */)
341-
// Subsequent calls to SendAsync are expected to result in messages being
341+
// Subsequent calls to EnqueueMessage are expected to result in messages being
342342
// dropped due to the tripped circuit breaker.
343343
checkDropped()
344344

@@ -380,8 +380,8 @@ func TestTransportSendToMissingStore(t *testing.T) {
380380

381381
// Send the message to the missing store first to ensure it doesn't affect the
382382
// receipt of the message to the existing store.
383-
require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, missingMsg))
384-
require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, existingMsg))
383+
require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, missingMsg))
384+
require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, existingMsg))
385385

386386
// Wait for the message to the existing store to be received.
387387
testutils.SucceedsSoon(
@@ -438,7 +438,7 @@ func TestTransportClockPropagation(t *testing.T) {
438438

439439
// Send a message from the sender to the receiver.
440440
msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver}
441-
require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg))
441+
require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg))
442442

443443
// Wait for the message to be received.
444444
testutils.SucceedsSoon(
@@ -480,12 +480,12 @@ func TestTransportShortCircuit(t *testing.T) {
480480
handler := tt.AddStore(store2)
481481
tt.AddStore(store3)
482482

483-
// Reach in and set node 1's dialer to nil. If SendAsync attempts to dial a
483+
// Reach in and set node 1's dialer to nil. If EnqueueMessage attempts to dial a
484484
// node, it will panic.
485485
tt.transports[node1].dialer = nil
486486

487487
// Send messages between two stores on the same node.
488-
tt.transports[store1.NodeID].SendAsync(
488+
tt.transports[store1.NodeID].EnqueueMessage(
489489
ctx, slpb.Message{Type: slpb.MsgHeartbeat, From: store1, To: store2},
490490
)
491491
// The message is received.
@@ -506,7 +506,7 @@ func TestTransportShortCircuit(t *testing.T) {
506506
// we expect a panic.
507507
require.Panics(
508508
t, func() {
509-
tt.transports[store1.NodeID].SendAsync(
509+
tt.transports[store1.NodeID].EnqueueMessage(
510510
ctx, slpb.Message{Type: slpb.MsgHeartbeat, From: store1, To: store3},
511511
)
512512
}, "sending message to a remote store with a nil dialer",
@@ -536,7 +536,7 @@ func TestTransportIdleSendQueue(t *testing.T) {
536536
}
537537

538538
// Send and receive a message.
539-
require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg))
539+
require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg))
540540
testutils.SucceedsSoon(
541541
t, func() error {
542542
select {
@@ -585,7 +585,7 @@ func TestTransportFullReceiveQueue(t *testing.T) {
585585
testutils.SucceedsSoon(
586586
t, func() error {
587587
// The message enqueue can fail temporarily if the sender queue fills up.
588-
if !tt.transports[sender.NodeID].SendAsync(ctx, msg) {
588+
if !tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) {
589589
sendDropped++
590590
return errors.New("still waiting to enqueue message")
591591
}
@@ -614,7 +614,7 @@ func TestTransportFullReceiveQueue(t *testing.T) {
614614
},
615615
)
616616
// The receiver queue is full but the enqueue to the sender queue succeeds.
617-
require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg))
617+
require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg))
618618
testutils.SucceedsSoon(
619619
t, func() error {
620620
if tt.transports[receiver.NodeID].metrics.MessagesReceiveDropped.Count() != int64(1) {

0 commit comments

Comments
 (0)