From 7004243e377480d28736e6180f2d146c576e17cd Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Thu, 17 Jul 2025 13:00:21 +0100 Subject: [PATCH 1/6] rpc proxy flag --- multinode/node.go | 5 ++++- multinode/node_lifecycle.go | 4 ++-- multinode/node_test.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/multinode/node.go b/multinode/node.go index 8e8634a..d14f244 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -105,7 +105,8 @@ type node[ ws *url.URL http *url.URL - rpc RPC + rpc RPC + isRPCProxy bool stateMu sync.RWMutex // protects state* fields state nodeState @@ -136,6 +137,7 @@ func NewNode[ nodeOrder int32, rpc RPC, chainFamily string, + isRPCProxy bool, ) Node[CHAIN_ID, RPC] { n := new(node[CHAIN_ID, HEAD, RPC]) n.name = name @@ -162,6 +164,7 @@ func NewNode[ ) n.lfcLog = logger.Named(lggr, "Lifecycle") n.rpc = rpc + n.isRPCProxy = isRPCProxy n.chainFamily = chainFamily return n } diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index abe1a9f..56744ed 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -164,7 +164,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state @@ -190,7 +190,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber) if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state diff --git a/multinode/node_test.go b/multinode/node_test.go index 26dc9d8..3d87b92 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -109,7 +109,7 @@ func newTestNode(t *testing.T, opts testNodeOpts) testNode { require.NoError(t, err) nodeI := NewNode[ID, Head, RPCClient[ID, Head]](opts.config, opts.chainConfig, opts.lggr, nodeMetrics, - opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily) + opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, false) return testNode{ nodeI.(*node[ID, Head, RPCClient[ID, Head]]), From fbf5ef74cc0fe831055e58b651ffb31ab0214b7b Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Thu, 17 Jul 2025 13:08:53 +0100 Subject: [PATCH 2/6] comments --- 
multinode/node_lifecycle.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 56744ed..2334d6c 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -164,6 +164,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) if n.poolInfoProvider != nil { + // if its the only node and its not a proxy, keep waiting for sync, check LatestChainInfo + // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should @@ -190,6 +192,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber) if n.poolInfoProvider != nil { + // if its the only node and its not a proxy, keep waiting for sync, check LatestChainInfo + // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should From 9522f867c24b8c43aef83000cf15a450b4ecfa57 Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Fri, 18 Jul 2025 16:35:43 +0100 Subject: [PATCH 3/6] adding tests --- multinode/go.mod | 2 +- multinode/go.sum | 4 +- multinode/node_lifecycle.go | 20 +++-- multinode/node_lifecycle_test.go | 149 +++++++++++++++++++++++++++++-- multinode/node_test.go | 3 +- 5 files changed, 160 insertions(+), 18 deletions(-) diff --git a/multinode/go.mod b/multinode/go.mod index eac7c13..aff8a5a 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -9,7 +9,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.1 github.com/smartcontractkit/chainlink-common v0.7.0 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.27.0 ) diff --git a/multinode/go.sum b/multinode/go.sum index ab220c5..f3c40e4 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -74,8 +74,8 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/smartcontractkit/chainlink-common v0.7.0 h1:QThOrHKn+du8CTmzJPCha0Nwnvw0tonIEAQca+dnmE0= github.com/smartcontractkit/chainlink-common v0.7.0/go.mod h1:pptbsF6z90IGCewkCgDMBxNYjfSOyW9X9l2jzYyQgmk= 
-github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc h1:HXG3U4nWgzKtH8rjODkcbn4x2M2ZvD2jQotCWRpDM1Q= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 h1:PKiqnVOTChlH4a4ljJKL3OKGRgYfIpJS4YD1daAIKks= github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298/go.mod h1:Mb7+/LC4edz7HyHxX4QkE42pSuov4AV68+AxBXAap0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 2334d6c..cffa3f7 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -128,7 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -138,7 +138,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - if liveNodes < 2 { + if liveNodes < 2 && !n.isRPCProxy { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -164,9 +164,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) if n.poolInfoProvider != nil { - // if its the only node and its not a proxy, keep waiting for sync, check LatestChainInfo + // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { + lggr.Debug("ENTERING HERE") lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state @@ -192,7 +193,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber) if n.poolInfoProvider != nil { 
- // if its the only node and its not a proxy, keep waiting for sync, check LatestChainInfo + // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) @@ -460,9 +461,14 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)): if n.poolInfoProvider != nil { if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 { - lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) - n.declareInSync() - return + if n.isRPCProxy { + n.declareUnreachable() + return + } else { + lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) + n.declareInSync() + return + } } } case err := <-headsSub.Errors: diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 43469f7..21a6261 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -202,6 +202,33 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) assert.Equal(t, nodeStateAlive, node.State()) }) + t.Run("with threshold poll failures, we are the last node alive, but is a proxy, transitions to unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + const pollFailureThreshold = 3 + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollFailureThreshold: pollFailureThreshold, + pollInterval: tests.TestInterval, + }, + rpc: rpc, + lggr: lggr, + isRPCProxy: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + BlockNumber: 20, + }).Once() + node.SetPoolChainInfoProvider(poolInfo) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) + pollError := errors.New("failed to get ClientVersion") + rpc.On("ClientVersion", mock.Anything).Return("", pollError) + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) + assert.Equal(t, nodeStateUnreachable, node.State()) + }) t.Run("when behind more than SyncThreshold, transitions to out of sync", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) @@ -264,6 +291,42 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)) }) + t.Run("when behind more than SyncThreshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + const 
syncThreshold = 10 + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollInterval: tests.TestInterval, + syncThreshold: syncThreshold, + selectionMode: NodeSelectionModeRoundRobin, + }, + rpc: rpc, + lggr: lggr, + isRPCProxy: true, + }) + defer func() { assert.NoError(t, node.close()) }() + rpc.On("ClientVersion", mock.Anything).Return("", nil) + const mostRecentBlock = 20 + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + BlockNumber: syncThreshold + mostRecentBlock + 1, + TotalDifficulty: big.NewInt(10), + }) + node.SetPoolChainInfoProvider(poolInfo) + // tries to redial in outOfSync + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { + assert.Equal(t, nodeStateOutOfSync, node.State()) + }).Once() + rpc.On("Dial", mock.Anything).Run(func(_ mock.Arguments) { + require.Equal(t, nodeStateOutOfSync, node.State()) + }).Return(errors.New("failed to dial")).Maybe() + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") + assert.Equal(t, nodeStateUnreachable, node.State()) + }) t.Run("when behind but SyncThreshold=0, stay alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) @@ -333,7 +396,36 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)) assert.Equal(t, nodeStateAlive, node.State()) }) - + t.Run("when no new heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{}, + lggr: lggr, + chainConfig: clientMocks.ChainConfig{ + NoNewHeadsThresholdVal: tests.TestInterval, + }, + rpc: rpc, + isRPCProxy: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + BlockNumber: 20, + TotalDifficulty: big.NewInt(10), + }).Once() + node.SetPoolChainInfoProvider(poolInfo) + // tries to redial in outOfSync + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { + assert.Equal(t, nodeStateOutOfSync, node.State()) + }).Once() + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") + assert.Equal(t, nodeStateUnreachable, node.State()) + }) t.Run("rpc closed head channel", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) @@ -528,12 +620,12 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return node.State() == nodeStateUnreachable }) }) - t.Run("when no new finalized heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { + t.Run("when no new finalized heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, 
ChainInfo{}).Once() rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() - lggr, observed := logger.TestObserved(t, zap.DebugLevel) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) noNewFinalizedHeadsThreshold := tests.TestInterval node := newSubscribedNode(t, testNodeOpts{ config: testNodeConfig{}, @@ -541,8 +633,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold, IsFinalityTagEnabled: true, }, - rpc: rpc, - lggr: lggr, + rpc: rpc, + lggr: lggr, + isRPCProxy: true, }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) @@ -551,9 +644,15 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { TotalDifficulty: big.NewInt(10), }).Once() node.SetPoolChainInfoProvider(poolInfo) + // tries to redial in outOfSync + // tries to redial in outOfSync + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { + assert.Equal(t, nodeStateOutOfSync, node.State()) + }).Once() + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)) - assert.Equal(t, nodeStateAlive, node.State()) + tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") + assert.Equal(t, nodeStateUnreachable, node.State()) }) t.Run("If finalized subscription returns an error, transitions to unreachable", func(t *testing.T) { t.Parallel() @@ -937,6 +1036,42 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return node.State() == nodeStateAlive }) }) + t.Run("becomes alive if there is no other nodes, unless proxy", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, _ := logger.TestObserved(t, zap.DebugLevel) + node := newAliveNode(t, testNodeOpts{ + chainConfig: clientMocks.ChainConfig{ + NoNewHeadsThresholdVal: tests.TestInterval, + }, + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + isRPCProxy: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo").Return(0, ChainInfo{ + BlockNumber: 100, + TotalDifficulty: big.NewInt(200), + }) + node.SetPoolChainInfoProvider(poolInfo) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + + rpc.On("Dial", mock.Anything).Return(nil).Once() + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once() + + outOfSyncSubscription := newMockSubscription(t) + outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) + outOfSyncSubscription.On("Unsubscribe").Once() + rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once() + rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe() + node.declareOutOfSync(syncStatusNoNewHead) + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateUnreachable + }) + }) t.Run("Stays out-of-sync if received new head, but lags behind pool", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) diff --git a/multinode/node_test.go b/multinode/node_test.go index 3d87b92..5da1e84 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -82,6 +82,7 @@ type testNodeOpts struct { nodeOrder int32 rpc *mockRPCClient[ID, Head] chainFamily string + isRPCProxy bool } func newTestNode(t 
*testing.T, opts testNodeOpts) testNode { @@ -109,7 +110,7 @@ func newTestNode(t *testing.T, opts testNodeOpts) testNode { require.NoError(t, err) nodeI := NewNode[ID, Head, RPCClient[ID, Head]](opts.config, opts.chainConfig, opts.lggr, nodeMetrics, - opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, false) + opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, opts.isRPCProxy) return testNode{ nodeI.(*node[ID, Head, RPCClient[ID, Head]]), From c7a30194989a56b621169f441f77fbfeabfa06be Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Fri, 18 Jul 2025 16:38:46 +0100 Subject: [PATCH 4/6] lint --- multinode/node_lifecycle.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index cffa3f7..c7d03fb 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -464,11 +464,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { if n.isRPCProxy { n.declareUnreachable() return - } else { - lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) - n.declareInSync() - return } + lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) + n.declareInSync() + return } } case err := <-headsSub.Errors: From a5e03c3ac8bd67530e3d37514d6e4e6708a06c8e Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Fri, 18 Jul 2025 16:58:35 +0100 Subject: [PATCH 5/6] add back test --- multinode/node_lifecycle.go | 1 - multinode/node_lifecycle_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index c7d03fb..0e17119 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -167,7 +167,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { - lggr.Debug("ENTERING HERE") lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 21a6261..8037de3 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -620,6 +620,33 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return node.State() == nodeStateUnreachable }) }) + t.Run("when no new finalized heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() + lggr, observed := logger.TestObserved(t, zap.DebugLevel) + noNewFinalizedHeadsThreshold := tests.TestInterval + node := newSubscribedNode(t, testNodeOpts{ + config: 
testNodeConfig{}, + chainConfig: clientMocks.ChainConfig{ + NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold, + IsFinalityTagEnabled: true, + }, + rpc: rpc, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + BlockNumber: 20, + TotalDifficulty: big.NewInt(10), + }).Once() + node.SetPoolChainInfoProvider(poolInfo) + node.declareAlive() + tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)) + assert.Equal(t, nodeStateAlive, node.State()) + }) t.Run("when no new finalized heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) From fdd50dad0201c173c0f5eb87f4f61bb93763a61a Mon Sep 17 00:00:00 2001 From: yashnevatia Date: Tue, 22 Jul 2025 12:00:36 +0100 Subject: [PATCH 6/6] changing flag name --- multinode/node.go | 8 ++++---- multinode/node_lifecycle.go | 10 +++++----- multinode/node_lifecycle_test.go | 30 +++++++++++++++--------------- multinode/node_test.go | 26 +++++++++++++------------- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/multinode/node.go b/multinode/node.go index d14f244..6729459 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -105,8 +105,8 @@ type node[ ws *url.URL http *url.URL - rpc RPC - isRPCProxy bool + rpc RPC + isLoadBalancedRPC bool stateMu sync.RWMutex // protects state* fields state nodeState @@ -137,7 +137,7 @@ func NewNode[ nodeOrder int32, rpc RPC, chainFamily string, - isRPCProxy bool, + isLoadBalancedRPC bool, ) Node[CHAIN_ID, RPC] { n := new(node[CHAIN_ID, HEAD, RPC]) n.name = name @@ -164,7 +164,7 @@ func NewNode[ ) n.lfcLog = logger.Named(lggr, "Lifecycle") n.rpc = rpc - n.isRPCProxy = isRPCProxy + n.isLoadBalancedRPC = isLoadBalancedRPC n.chainFamily = chainFamily return n } diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 0e17119..512a7db 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -128,7 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -138,7 +138,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - if liveNodes < 2 && !n.isRPCProxy { + if liveNodes < 2 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -166,7 +166,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if n.poolInfoProvider != nil { // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { + if l, 
_ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state @@ -194,7 +194,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if n.poolInfoProvider != nil { // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isRPCProxy { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state @@ -460,7 +460,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)): if n.poolInfoProvider != nil { if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 { - if n.isRPCProxy { + if n.isLoadBalancedRPC { n.declareUnreachable() return } diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 8037de3..ba1ade7 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -212,9 +212,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { pollFailureThreshold: pollFailureThreshold, pollInterval: tests.TestInterval, }, - rpc: rpc, - lggr: lggr, - isRPCProxy: true, + rpc: rpc, + lggr: lggr, + isLoadBalancedRPC: true, }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) @@ -302,9 +302,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { syncThreshold: syncThreshold, selectionMode: NodeSelectionModeRoundRobin, }, - rpc: rpc, - lggr: lggr, - isRPCProxy: true, + rpc: rpc, + lggr: lggr, + isLoadBalancedRPC: true, }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) @@ -407,8 +407,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { chainConfig: clientMocks.ChainConfig{ NoNewHeadsThresholdVal: tests.TestInterval, }, - rpc: rpc, - isRPCProxy: true, + rpc: rpc, + isLoadBalancedRPC: true, }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) @@ -660,9 +660,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold, IsFinalityTagEnabled: true, }, - rpc: rpc, - lggr: lggr, - isRPCProxy: true, + rpc: rpc, + lggr: lggr, + isLoadBalancedRPC: true, }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) @@ -1072,10 +1072,10 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { chainConfig: clientMocks.ChainConfig{ NoNewHeadsThresholdVal: tests.TestInterval, }, - rpc: rpc, - chainID: nodeChainID, - lggr: lggr, - isRPCProxy: true, + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + isLoadBalancedRPC: true, }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) diff --git a/multinode/node_test.go b/multinode/node_test.go index 5da1e84..e3c8d71 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -71,18 +71,18 @@ type testNode struct { } type testNodeOpts struct { - config 
testNodeConfig - chainConfig mocks.ChainConfig - lggr logger.Logger - wsuri *url.URL - httpuri *url.URL - name string - id int - chainID ID - nodeOrder int32 - rpc *mockRPCClient[ID, Head] - chainFamily string - isRPCProxy bool + config testNodeConfig + chainConfig mocks.ChainConfig + lggr logger.Logger + wsuri *url.URL + httpuri *url.URL + name string + id int + chainID ID + nodeOrder int32 + rpc *mockRPCClient[ID, Head] + chainFamily string + isLoadBalancedRPC bool } func newTestNode(t *testing.T, opts testNodeOpts) testNode { @@ -110,7 +110,7 @@ func newTestNode(t *testing.T, opts testNodeOpts) testNode { require.NoError(t, err) nodeI := NewNode[ID, Head, RPCClient[ID, Head]](opts.config, opts.chainConfig, opts.lggr, nodeMetrics, - opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, opts.isRPCProxy) + opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, opts.isLoadBalancedRPC) return testNode{ nodeI.(*node[ID, Head, RPCClient[ID, Head]]),
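
Note (illustrative only, not part of the patches above): the decision this series adds around the new isLoadBalancedRPC flag can be summarized as a sketch. The function and state names below (zombieCheck, "declareInSync", etc.) are simplified stand-ins, not the package's actual nodeState values or API; liveNodes stands for the first value returned by poolInfoProvider.LatestChainInfo() as used in the surrounding code. The sketch restates the new branch in outOfSyncLoop's zombie-node check: a lone, plain RPC is still forced back into the live pool in a degraded state, while a load-balanced/proxied RPC is declared unreachable so the subsequent redial may land on a healthier upstream.

package main

import "fmt"

// zombieCheck mirrors (in simplified form) the branch this series adds to
// outOfSyncLoop's zombie-node check; the behaviour only diverges when no
// other node in the pool is live.
func zombieCheck(liveNodes int, isLoadBalancedRPC bool) string {
	if liveNodes >= 1 {
		// Another node is serving traffic, so keep waiting for this one to re-sync.
		return "stay outOfSync"
	}
	if isLoadBalancedRPC {
		// A load-balanced endpoint may hand out a healthier upstream RPC on redial.
		return "declareUnreachable"
	}
	// A lone static RPC has nothing to fail over to: forcibly rejoin the live pool, degraded.
	return "declareInSync"
}

func main() {
	fmt.Println(zombieCheck(0, false)) // declareInSync (pre-existing behaviour)
	fmt.Println(zombieCheck(0, true))  // declareUnreachable (new behaviour for proxied RPCs)
	fmt.Println(zombieCheck(2, true))  // stay outOfSync
}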