Skip to content
This repository was archived by the owner on Mar 14, 2025. It is now read-only.

Commit 656fb32

Browse files
committed
Mark finalized head subscription as part of alive loop.
1 parent a1636a9 commit 656fb32

File tree

8 files changed

+139
-4
lines changed

8 files changed

+139
-4
lines changed

common/client/mock_node_client_test.go

Lines changed: 33 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client/mock_rpc_test.go

Lines changed: 33 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client/node_lifecycle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
119119
}
120120

121121
defer finalizedHeadsSub.Unsubscribe()
122+
n.rpc.SetAliveLoopFinalizedHeadSub(finalizedHeadsSub.sub)
122123
}
123124

124125
var pollCh <-chan time.Time

common/client/node_lifecycle_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
446446
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
447447
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
448448
rpc.On("SetAliveLoopSub", mock.Anything).Once()
449+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
449450
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
450451
node := newDialedNode(t, testNodeOpts{
451452
config: testNodeConfig{},
@@ -467,6 +468,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
467468
ch := make(chan Head)
468469
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
469470
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
471+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
470472
name := "node-" + rand.Str(5)
471473
node := newSubscribedNode(t, testNodeOpts{
472474
config: testNodeConfig{},
@@ -501,6 +503,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
501503
ch := make(chan Head)
502504
close(ch)
503505
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
506+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
504507
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
505508
node := newSubscribedNode(t, testNodeOpts{
506509
chainConfig: clientMocks.ChainConfig{
@@ -527,6 +530,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
527530
ch := make(chan Head, 1)
528531
ch <- head{BlockNumber: 10}.ToMockHead(t)
529532
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
533+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
530534
lggr, observed := logger.TestObserved(t, zap.DebugLevel)
531535
noNewFinalizedHeadsThreshold := tests.TestInterval
532536
node := newSubscribedNode(t, testNodeOpts{
@@ -560,6 +564,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
560564
rpc := newMockNodeClient[types.ID, Head](t)
561565
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
562566
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
567+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
563568
lggr, observed := logger.TestObserved(t, zap.DebugLevel)
564569
noNewFinalizedHeadsThreshold := tests.TestInterval
565570
node := newSubscribedNode(t, testNodeOpts{
@@ -593,6 +598,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
593598
sub.On("Err").Return((<-chan error)(errCh))
594599
sub.On("Unsubscribe").Once()
595600
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(nil), sub, nil).Once()
601+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
596602
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
597603
node := newSubscribedNode(t, testNodeOpts{
598604
chainConfig: clientMocks.ChainConfig{
@@ -1116,6 +1122,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
11161122
outOfSyncSubscription.On("Unsubscribe").Once()
11171123
ch := make(chan Head)
11181124
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), outOfSyncSubscription, nil).Once()
1125+
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
11191126

11201127
setupRPCForAliveLoop(t, rpc)
11211128

common/client/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type NodeClient[
6666
ClientVersion(context.Context) (string, error)
6767
SubscribersCount() int32
6868
SetAliveLoopSub(types.Subscription)
69+
SetAliveLoopFinalizedHeadSub(types.Subscription)
6970
UnsubscribeAllExceptAliveLoop()
7071
IsSyncing(ctx context.Context) (bool, error)
7172
SubscribeToFinalizedHeads(_ context.Context) (<-chan HEAD, types.Subscription, error)

core/chains/evm/client/mocks/rpc_client.go

Lines changed: 33 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/chains/evm/client/rpc_client.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ type rpcClient struct {
137137
subs []ethereum.Subscription
138138

139139
// Need to track the aliveLoop subscription, so we do not cancel it when checking lease on the MultiNode
140-
aliveLoopSub ethereum.Subscription
140+
aliveLoopHeadsSub ethereum.Subscription
141+
aliveLoopFinalizedHeadsSub ethereum.Subscription
141142

142143
// chStopInFlight can be closed to immediately cancel all in-flight requests on
143144
// this rpcClient. Closing and replacing should be serialized through
@@ -368,11 +369,18 @@ func (r *rpcClient) unsubscribeAll() {
368369
}
369370
r.subs = nil
370371
}
371-
func (r *rpcClient) SetAliveLoopSub(sub commontypes.Subscription) {
372+
func (r *rpcClient) SetAliveLoopSub(headsSub commontypes.Subscription) {
372373
r.stateMu.Lock()
373374
defer r.stateMu.Unlock()
374375

375-
r.aliveLoopSub = sub
376+
r.aliveLoopHeadsSub = headsSub
377+
}
378+
379+
func (r *rpcClient) SetAliveLoopFinalizedHeadSub(finalizedHeads commontypes.Subscription) {
380+
r.stateMu.Lock()
381+
defer r.stateMu.Unlock()
382+
383+
r.aliveLoopFinalizedHeadsSub = finalizedHeads
376384
}
377385

378386
// SubscribersCount returns the number of client subscribed to the node
@@ -389,7 +397,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() {
389397
defer r.stateMu.Unlock()
390398

391399
for _, s := range r.subs {
392-
if s != r.aliveLoopSub {
400+
if s != r.aliveLoopHeadsSub && s != r.aliveLoopFinalizedHeadsSub {
393401
s.Unsubscribe()
394402
}
395403
}

core/chains/evm/client/rpc_client_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,25 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
318318
require.NoError(t, err)
319319
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
320320
})
321+
t.Run("UnsubscribeAllExceptAliveLoop should keep finalized heads subscription open", func(t *testing.T) {
322+
server := testutils.NewWSServer(t, chainId, serverCallBack)
323+
wsURL := server.WSURL()
324+
325+
rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
326+
defer rpc.Close()
327+
require.NoError(t, rpc.Dial(ctx))
328+
329+
_, sub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t))
330+
require.NoError(t, err)
331+
rpc.SetAliveLoopFinalizedHeadSub(sub)
332+
rpc.UnsubscribeAllExceptAliveLoop()
333+
select {
334+
case <-sub.Err():
335+
t.Fatal("Expected subscription to remain open")
336+
default:
337+
}
338+
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
339+
})
321340
t.Run("Subscription error is properly wrapper", func(t *testing.T) {
322341
server := testutils.NewWSServer(t, chainId, serverCallBack)
323342
wsURL := server.WSURL()

0 commit comments

Comments
 (0)