Skip to content

Commit dd94e09

Browse files
authored
Node: Channel writes without blocking (wormhole-foundation#4276)
* Node: Channel writes without blocking * Fix a few more things
1 parent 95e2018 commit dd94e09

37 files changed

+136
-81
lines changed

node/cmd/spy/spy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
126126
return err
127127
}
128128
}
129-
sub.ch <- message{vaaBytes: vaaBytes}
129+
sub.ch <- message{vaaBytes: vaaBytes} //nolint:channelcheck // Don't want to drop incoming VAAs
130130
continue
131131
}
132132

@@ -146,7 +146,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
146146
return err
147147
}
148148
}
149-
sub.ch <- message{vaaBytes: vaaBytes}
149+
sub.ch <- message{vaaBytes: vaaBytes} //nolint:channelcheck // Don't want to drop incoming VAAs
150150
}
151151
}
152152

@@ -252,7 +252,7 @@ func newSpyServer(logger *zap.Logger) *spyServer {
252252
func DoWithTimeout(f func() error, d time.Duration) error {
253253
errChan := make(chan error, 1)
254254
go func() {
255-
errChan <- f()
255+
errChan <- f() //nolint:channelcheck // Has timeout below
256256
close(errChan)
257257
}()
258258
t := time.NewTimer(d)

node/hack/evm_test/wstest.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func main() {
7777
case <-ctx.Done():
7878
return
7979
case err := <-headerSubscription.Err():
80-
errC <- fmt.Errorf("block subscription failed: %w", err)
80+
errC <- fmt.Errorf("block subscription failed: %w", err) //nolint:channelcheck // The watcher will exit anyway
8181
return
8282
case block := <-headSink:
8383
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
@@ -114,7 +114,7 @@ func main() {
114114
case <-ctx.Done():
115115
return
116116
case err := <-messageSub.Err():
117-
errC <- fmt.Errorf("message subscription failed: %w", err)
117+
errC <- fmt.Errorf("message subscription failed: %w", err) //nolint:channelcheck // The watcher will exit anyway
118118
return
119119
case ev := <-messageC:
120120
logger.Info("Received a log event from the contract", zap.Any("ev", ev))

node/pkg/adminrpc/adminserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
840840

841841
vaaInjectionsTotal.Inc()
842842

843-
s.injectC <- &common.MessagePublication{
843+
s.injectC <- &common.MessagePublication{ //nolint:channelcheck // Only blocks this command
844844
TxID: ethcommon.Hash{}.Bytes(),
845845
Timestamp: v.Timestamp,
846846
Nonce: v.Nonce,
@@ -940,7 +940,7 @@ func (s *nodePrivilegedService) fetchMissing(
940940
// Inject into the gossip signed VAA receive path.
941941
// This has the same effect as if the VAA was received from the network
942942
// (verifying signature, storing in local DB...).
943-
s.signedInC <- &gossipv1.SignedVAAWithQuorum{
943+
s.signedInC <- &gossipv1.SignedVAAWithQuorum{ //nolint:channelcheck // Only blocks this command
944944
Vaa: vaaBytes,
945945
}
946946

node/pkg/common/channel_utils.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,29 @@ package common
22

33
import (
44
"context"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
)
9+
10+
var (
11+
// channelWriteDrops counts the events that WriteToChannelWithoutBlocking discarded, partitioned by the `channel_id` label.
channelWriteDrops = promauto.NewCounterVec(
12+
prometheus.CounterOpts{
13+
Name: "wormhole_channel_write_drops",
14+
Help: "Total number of channel writes that were dropped due to channel overflow",
15+
}, []string{"channel_id"})
516
)
617

18+
// WriteToChannelWithoutBlocking attempts to write the specified event to the specified channel without blocking.
19+
// If the write would block, the event is dropped and the `channelWriteDrops` metric is incremented, using `label` as the channel ID.
20+
func WriteToChannelWithoutBlocking[T any](channel chan<- T, evt T, label string) {
21+
select {
22+
case channel <- evt:
23+
default:
24+
// The send could not proceed immediately: discard evt and record the drop under the given label.
channelWriteDrops.WithLabelValues(label).Inc()
25+
}
26+
}
27+
728
// ReadFromChannelWithTimeout reads events from the channel until a timeout occurs or maxCount is reached.
829
func ReadFromChannelWithTimeout[T any](ctx context.Context, ch <-chan T, maxCount int) ([]T, error) {
930
out := make([]T, 0, maxCount)

node/pkg/common/channel_utils_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,17 @@ func TestReadFromChannelWithTimeout_TooMuchData(t *testing.T) {
7878
require.Equal(t, 1, len(observations))
7979
assert.Equal(t, 3, observations[0])
8080
}
81+
82+
// TestWriteToChannelWithoutBlocking fills a one-slot channel that is never drained and checks that each
// subsequent write is counted in channelWriteDrops under the label passed to that write.
func TestWriteToChannelWithoutBlocking(t *testing.T) {
83+
myChan := make(chan int, 1)
84+
assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
85+
// The first write fits in the buffer, so no drop is recorded.
WriteToChannelWithoutBlocking(myChan, 42, "numbers")
86+
assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
87+
// The buffer is now full; every further write is dropped and counted.
WriteToChannelWithoutBlocking(myChan, 43, "numbers")
88+
assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "numbers"))
89+
WriteToChannelWithoutBlocking(myChan, 44, "numbers")
90+
assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
91+
// A drop under another label is counted separately and leaves the "numbers" count unchanged.
WriteToChannelWithoutBlocking(myChan, 44, "different_label")
92+
assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "different_label"))
93+
assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
94+
}

node/pkg/common/guardianset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (st *GuardianSetState) SetHeartbeat(addr common.Address, peerId peer.ID, hb
177177

178178
v[peerId] = hb
179179
if st.updateC != nil {
180-
st.updateC <- hb
180+
WriteToChannelWithoutBlocking(st.updateC, hb, "heartbeat")
181181
}
182182
return nil
183183
}

node/pkg/governor/governor_monitoring.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ import (
8181
"sort"
8282
"time"
8383

84+
"github.com/certusone/wormhole/node/pkg/common"
8485
"github.com/certusone/wormhole/node/pkg/guardiansigner"
8586
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
8687
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
@@ -628,7 +629,7 @@ func (gov *ChainGovernor) publishConfig(ctx context.Context, hb *gossipv1.Heartb
628629
panic(err)
629630
}
630631

631-
sendC <- b
632+
common.WriteToChannelWithoutBlocking(sendC, b, "gov_config_gossip_out")
632633
}
633634

634635
func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, guardianSigner guardiansigner.GuardianSigner, ourAddr ethCommon.Address) {
@@ -713,5 +714,5 @@ func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartb
713714
panic(err)
714715
}
715716

716-
sendC <- b
717+
common.WriteToChannelWithoutBlocking(sendC, b, "gov_status_gossip_out")
717718
}

node/pkg/governor/governor_prices.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (gov *ChainGovernor) queryCoinGecko(ctx context.Context) error {
158158
for {
159159
select {
160160
case <-ticker.C:
161-
throttle <- 1
161+
throttle <- 1 //nolint:channelcheck // We want this to block for throttling
162162
case <-ctx.Done():
163163
return
164164
}

node/pkg/node/node.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ const (
3636
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
3737
inboundBatchObservationBufferSize = 1000
3838

39+
// inboundMessageBufferSize configures the size of the msgC channel used to publish new observations from the watcher to the processor.
40+
// This channel is shared across all the watchers so we don't want to hang up other watchers while the processor is handling an observation from one.
41+
inboundMessageBufferSize = 1000
42+
3943
// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
4044
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
4145
// So in the worst case the entire queue can be processed in 2s.
@@ -143,7 +147,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
143147
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
144148
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
145149
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
146-
g.msgC = makeChannelPair[*common.MessagePublication](0)
150+
g.msgC = makeChannelPair[*common.MessagePublication](inboundMessageBufferSize)
147151
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
148152
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
149153
g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)

node/pkg/node/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
398398
zap.String("txID", msg.TxIDString()),
399399
zap.Time("timestamp", msg.Timestamp))
400400
} else {
401-
g.msgC.writeC <- msg
401+
g.msgC.writeC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
402402
}
403403
}
404404
}
@@ -424,7 +424,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
424424
zap.Stringer("watcherChainId", chainId),
425425
)
426426
}
427-
g.queryResponseC.writeC <- response
427+
g.queryResponseC.writeC <- response //nolint:channelcheck // This channel is buffered, if it backs up we'll stop processing queries until it clears
428428
}
429429
}
430430
}(chainQueryResponseC[chainId], chainId)

0 commit comments

Comments
 (0)