2 changes: 1 addition & 1 deletion multinode/go.mod
@@ -9,7 +9,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_model v0.6.1
github.com/smartcontractkit/chainlink-common v0.7.0
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2
github.com/stretchr/testify v1.10.0
go.uber.org/zap v1.27.0
)
4 changes: 2 additions & 2 deletions multinode/go.sum
@@ -74,8 +74,8 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/smartcontractkit/chainlink-common v0.7.0 h1:QThOrHKn+du8CTmzJPCha0Nwnvw0tonIEAQca+dnmE0=
github.com/smartcontractkit/chainlink-common v0.7.0/go.mod h1:pptbsF6z90IGCewkCgDMBxNYjfSOyW9X9l2jzYyQgmk=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc h1:HXG3U4nWgzKtH8rjODkcbn4x2M2ZvD2jQotCWRpDM1Q=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc=
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 h1:PKiqnVOTChlH4a4ljJKL3OKGRgYfIpJS4YD1daAIKks=
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298/go.mod h1:Mb7+/LC4edz7HyHxX4QkE42pSuov4AV68+AxBXAap0o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5 changes: 4 additions & 1 deletion multinode/node.go
@@ -105,7 +105,8 @@ type node[
ws *url.URL
http *url.URL

rpc RPC
rpc RPC
isLoadBalancedRPC bool

stateMu sync.RWMutex // protects state* fields
state nodeState
@@ -136,6 +137,7 @@ func NewNode[
nodeOrder int32,
rpc RPC,
chainFamily string,
isLoadBalancedRPC bool,
) Node[CHAIN_ID, RPC] {
n := new(node[CHAIN_ID, HEAD, RPC])
n.name = name
@@ -162,6 +164,7 @@
)
n.lfcLog = logger.Named(lggr, "Lifecycle")
n.rpc = rpc
n.isLoadBalancedRPC = isLoadBalancedRPC
n.chainFamily = chainFamily
return n
}
16 changes: 12 additions & 4 deletions multinode/node_lifecycle.go
@@ -128,7 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
continue
}
@@ -138,7 +138,7 @@ }
}
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
// note: there must be another live node for us to be out of sync
if liveNodes < 2 {
if liveNodes < 2 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
}
@@ -164,7 +164,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
// threshold amount of time, mark it broken
lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
// if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo)
// if it's a proxy, declare out of sync and try reconnecting, because the proxy might return a healthier RPC
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
@@ -190,7 +192,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
// threshold amount of time, mark it broken
lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber)
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
// if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo)
// if it's a proxy, declare out of sync and try reconnecting, because the proxy might return a healthier RPC
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
@@ -456,6 +460,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)):
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 {
if n.isLoadBalancedRPC {
n.declareUnreachable()

Contributor:

We should declareOutOfSync here to avoid cases when an RPC that does not produce new blocks gets stuck in a loop of alive->outOfSync->Unreachable->alive. declareOutOfSync guarantees reconnection and allows us to keep track of previous issues like being out of sync with the pool or not generating new heads.
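
For reference, a minimal sketch of what this suggestion could look like at the zombie-check site above (illustrative only, not part of this diff; it assumes the outOfSync state can be re-entered, which is discussed below):

if n.isLoadBalancedRPC {
	// hedged suggestion: stay in outOfSync to force a reconnect, while still
	// waiting for a new head before the node is considered alive again
	n.declareOutOfSync(syncIssues)
	return
}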

Contributor Author:

func (n *node[CHAIN_ID, HEAD, RPC]) transitionToOutOfSync(fn func()) {
	ctx, cancel := n.stopCh.NewCtx()
	defer cancel()
	n.metrics.IncrementNodeTransitionsToOutOfSync(ctx, n.name)
	n.stateMu.Lock()
	defer n.stateMu.Unlock()
	if n.state == nodeStateClosed {
		return
	}
	switch n.state {
	case nodeStateAlive:
		n.rpc.Close()
		n.state = nodeStateOutOfSync
	default:
		panic(transitionFail(n.state, nodeStateOutOfSync))
	}
	fn()
}

I cannot transitionToOutOfSync from an outOfSync state.

Contributor Author:

declareUnreachable forces reconnection as well. I would like to understand more about how issue tracking will be made easier with declareOutOfSync.

Contributor:

  1. We can modify transitionToOutOfSync to allow self-transition just to force reconnect.

> would like to understand more about how issue tracking will be made easier with declareOutOfSync

Let's say we have one load-balanced RPC. All RPCs fail to produce new blocks.

In the current implementation, the node will transition through the following states: alive (no new heads timeout) -> outOfSync (zombie check timeout) -> unreachable (successful dial) -> alive. The problem here is that the RPC failed to overcome the initial health issue but was still declared alive.

If we replace declareUnreachable with declareOutOfSync in this case, we'll still force the RPC to reconnect, but will wait for a new head before declaring the RPC alive. We'll still have a state transition loop, but in this case, we won't falsely transition to the alive state.
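
A hedged sketch of the suggested modification, based on the transitionToOutOfSync implementation quoted above (illustrative only, not code in this PR; widening the case is the only change):

func (n *node[CHAIN_ID, HEAD, RPC]) transitionToOutOfSync(fn func()) {
	ctx, cancel := n.stopCh.NewCtx()
	defer cancel()
	n.metrics.IncrementNodeTransitionsToOutOfSync(ctx, n.name)
	n.stateMu.Lock()
	defer n.stateMu.Unlock()
	if n.state == nodeStateClosed {
		return
	}
	switch n.state {
	case nodeStateAlive, nodeStateOutOfSync:
		// allowing the outOfSync -> outOfSync self-transition closes the RPC
		// client and forces a reconnect, but the node is still only declared
		// alive once a new head actually arrives
		n.rpc.Close()
		n.state = nodeStateOutOfSync
	default:
		panic(transitionFail(n.state, nodeStateOutOfSync))
	}
	fn()
}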

return
}
lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues)
n.declareInSync()
return
164 changes: 163 additions & 1 deletion multinode/node_lifecycle_test.go
@@ -202,6 +202,33 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold))
assert.Equal(t, nodeStateAlive, node.State())
})
t.Run("with threshold poll failures, we are the last node alive, but is a proxy, transitions to unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
const pollFailureThreshold = 3
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{
pollFailureThreshold: pollFailureThreshold,
pollInterval: tests.TestInterval,
},
rpc: rpc,
lggr: lggr,
isLoadBalancedRPC: true,
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: 20,
}).Once()
node.SetPoolChainInfoProvider(poolInfo)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20})
pollError := errors.New("failed to get ClientVersion")
rpc.On("ClientVersion", mock.Anything).Return("", pollError)
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold))

Contributor:

"RPC endpoint failed to respond to" is logged before the RPC has transitioned to unreachable. To avoid flakiness, we should wait for the "RPC Node is unreachable" msg.

Contributor Author:

Fair, will assertEventually for the nodeState as well, like done in other tests.

assert.Equal(t, nodeStateUnreachable, node.State())
})
t.Run("when behind more than SyncThreshold, transitions to out of sync", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
@@ -264,6 +291,42 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState))
})
t.Run("when behind more than SyncThreshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
const syncThreshold = 10
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{
pollInterval: tests.TestInterval,
syncThreshold: syncThreshold,
selectionMode: NodeSelectionModeRoundRobin,
},
rpc: rpc,
lggr: lggr,
isLoadBalancedRPC: true,
})
defer func() { assert.NoError(t, node.close()) }()
rpc.On("ClientVersion", mock.Anything).Return("", nil)
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
})
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
assert.Equal(t, nodeStateOutOfSync, node.State())
}).Once()
rpc.On("Dial", mock.Anything).Run(func(_ mock.Arguments) {
require.Equal(t, nodeStateOutOfSync, node.State())
}).Return(errors.New("failed to dial")).Maybe()
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable")
assert.Equal(t, nodeStateUnreachable, node.State())
})
t.Run("when behind but SyncThreshold=0, stay alive", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
Expand Down Expand Up @@ -333,7 +396,36 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState))
assert.Equal(t, nodeStateAlive, node.State())
})

t.Run("when no new heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
lggr: lggr,
chainConfig: clientMocks.ChainConfig{
NoNewHeadsThresholdVal: tests.TestInterval,
},
rpc: rpc,
isLoadBalancedRPC: true,
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: 20,
TotalDifficulty: big.NewInt(10),
}).Once()
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
assert.Equal(t, nodeStateOutOfSync, node.State())
}).Once()
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable")
assert.Equal(t, nodeStateUnreachable, node.State())
})
t.Run("rpc closed head channel", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
Expand Down Expand Up @@ -555,6 +647,40 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState))
assert.Equal(t, nodeStateAlive, node.State())
})
t.Run("when no new finalized heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
noNewFinalizedHeadsThreshold := tests.TestInterval
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
chainConfig: clientMocks.ChainConfig{
NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold,
IsFinalityTagEnabled: true,
},
rpc: rpc,
lggr: lggr,
isLoadBalancedRPC: true,
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: 20,
TotalDifficulty: big.NewInt(10),
}).Once()
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
assert.Equal(t, nodeStateOutOfSync, node.State())
}).Once()
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable")
assert.Equal(t, nodeStateUnreachable, node.State())
})
t.Run("If finalized subscription returns an error, transitions to unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
Expand Down Expand Up @@ -937,6 +1063,42 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
return node.State() == nodeStateAlive
})
})
t.Run("becomes alive if there is no other nodes, unless proxy", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
nodeChainID := RandomID()
lggr, _ := logger.TestObserved(t, zap.DebugLevel)
node := newAliveNode(t, testNodeOpts{
chainConfig: clientMocks.ChainConfig{
NoNewHeadsThresholdVal: tests.TestInterval,
},
rpc: rpc,
chainID: nodeChainID,
lggr: lggr,
isLoadBalancedRPC: true,
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(0, ChainInfo{
BlockNumber: 100,
TotalDifficulty: big.NewInt(200),
})
node.SetPoolChainInfoProvider(poolInfo)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

rpc.On("Dial", mock.Anything).Return(nil).Once()
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()

outOfSyncSubscription := newMockSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
outOfSyncSubscription.On("Unsubscribe").Once()
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once()
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
node.declareOutOfSync(syncStatusNoNewHead)
tests.AssertEventually(t, func() bool {
return node.State() == nodeStateUnreachable
})
})
t.Run("Stays out-of-sync if received new head, but lags behind pool", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
25 changes: 13 additions & 12 deletions multinode/node_test.go
@@ -71,17 +71,18 @@ type testNode struct {
}

type testNodeOpts struct {
config testNodeConfig
chainConfig mocks.ChainConfig
lggr logger.Logger
wsuri *url.URL
httpuri *url.URL
name string
id int
chainID ID
nodeOrder int32
rpc *mockRPCClient[ID, Head]
chainFamily string
config testNodeConfig
chainConfig mocks.ChainConfig
lggr logger.Logger
wsuri *url.URL
httpuri *url.URL
name string
id int
chainID ID
nodeOrder int32
rpc *mockRPCClient[ID, Head]
chainFamily string
isLoadBalancedRPC bool
}

func newTestNode(t *testing.T, opts testNodeOpts) testNode {
@@ -109,7 +110,7 @@ func newTestNode(t *testing.T, opts testNodeOpts) testNode {
require.NoError(t, err)

nodeI := NewNode[ID, Head, RPCClient[ID, Head]](opts.config, opts.chainConfig, opts.lggr, nodeMetrics,
opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily)
opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, opts.isLoadBalancedRPC)

return testNode{
nodeI.(*node[ID, Head, RPCClient[ID, Head]]),