
Commit d49ca2e

Add IsLoadBalancedRPC flag (#64)
* rpc proxy flag
* comments
* adding tests
* lint
* add back test
* changing flag name
1 parent 2350c82 commit d49ca2e

File tree: 6 files changed, +195 -21 lines


multinode/go.mod

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ require (
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_model v0.6.1
 	github.com/smartcontractkit/chainlink-common v0.7.0
-	github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc
+	github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2
 	github.com/stretchr/testify v1.10.0
 	go.uber.org/zap v1.27.0
 )

multinode/go.sum

Lines changed: 2 additions & 2 deletions
@@ -74,8 +74,8 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR
 github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
 github.com/smartcontractkit/chainlink-common v0.7.0 h1:QThOrHKn+du8CTmzJPCha0Nwnvw0tonIEAQca+dnmE0=
 github.com/smartcontractkit/chainlink-common v0.7.0/go.mod h1:pptbsF6z90IGCewkCgDMBxNYjfSOyW9X9l2jzYyQgmk=
-github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc h1:HXG3U4nWgzKtH8rjODkcbn4x2M2ZvD2jQotCWRpDM1Q=
-github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250502183956-f93abf9d14bc/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc=
+github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8=
+github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc=
 github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 h1:PKiqnVOTChlH4a4ljJKL3OKGRgYfIpJS4YD1daAIKks=
 github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298/go.mod h1:Mb7+/LC4edz7HyHxX4QkE42pSuov4AV68+AxBXAap0o=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

multinode/node.go

Lines changed: 4 additions & 1 deletion
@@ -105,7 +105,8 @@ type node[
 	ws   *url.URL
 	http *url.URL

-	rpc RPC
+	rpc               RPC
+	isLoadBalancedRPC bool

 	stateMu sync.RWMutex // protects state* fields
 	state   nodeState
@@ -136,6 +137,7 @@ func NewNode[
 	nodeOrder int32,
 	rpc RPC,
 	chainFamily string,
+	isLoadBalancedRPC bool,
 ) Node[CHAIN_ID, RPC] {
 	n := new(node[CHAIN_ID, HEAD, RPC])
 	n.name = name
@@ -162,6 +164,7 @@
 	)
 	n.lfcLog = logger.Named(lggr, "Lifecycle")
 	n.rpc = rpc
+	n.isLoadBalancedRPC = isLoadBalancedRPC
 	n.chainFamily = chainFamily
 	return n
 }
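For orientation, here is a minimal sketch of how a caller would invoke the extended constructor after this change. The argument order follows the updated NewNode signature (and the call site updated in multinode/node_test.go below); the variable names are placeholders for illustration, not code from this commit.

// Hypothetical call site (placeholder variables); the only new argument is the
// trailing isLoadBalancedRPC flag, set to true when the endpoint is a
// load-balanced proxy rather than a single RPC node.
n := NewNode[ID, Head, RPCClient[ID, Head]](
	cfg,       // node config
	chainCfg,  // chain config
	lggr,      // logger
	metrics,   // node metrics
	wsURL,     // websocket URL
	httpURL,   // HTTP URL
	"primary", // node name
	0,         // node ID
	chainID,   // chain ID
	1,         // node order
	rpcClient, // RPC client implementation
	"evm",     // chain family
	true,      // isLoadBalancedRPC: endpoint is a load-balanced proxy
)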

multinode/node_lifecycle.go

Lines changed: 12 additions & 4 deletions
@@ -128,7 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
 			if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
 				lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
 				if n.poolInfoProvider != nil {
-					if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
+					if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
 						lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
 						continue
 					}
@@ -138,7 +138,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
 			}
 			if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
 				// note: there must be another live node for us to be out of sync
-				if liveNodes < 2 {
+				if liveNodes < 2 && !n.isLoadBalancedRPC {
 					lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
 					continue
 				}
@@ -164,7 +164,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
 			// threshold amount of time, mark it broken
 			lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
 			if n.poolInfoProvider != nil {
-				if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
+				// if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo)
+				// if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc
+				if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
 					lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
 					// We don't necessarily want to wait the full timeout to check again, we should
 					// check regularly and log noisily in this state
@@ -190,7 +192,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
 			// threshold amount of time, mark it broken
 			lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber)
 			if n.poolInfoProvider != nil {
-				if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 {
+				// if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo)
+				// if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc
+				if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
 					lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)
 					// We don't necessarily want to wait the full timeout to check again, we should
 					// check regularly and log noisily in this state
@@ -456,6 +460,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
 		case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)):
 			if n.poolInfoProvider != nil {
 				if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 {
+					if n.isLoadBalancedRPC {
+						n.declareUnreachable()
+						return
+					}
 					lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues)
 					n.declareInSync()
 					return
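To summarize the behavioral change, here is a simplified sketch of the guard that now appears at each of the health-check sites above; the surrounding loop and helpers are elided, so this is an illustration of the condition rather than the literal file contents.

// Simplified sketch of the repeated guard in aliveLoop after this change.
if n.poolInfoProvider != nil {
	if liveNodes, _ := n.poolInfoProvider.LatestChainInfo(); liveNodes < 2 && !n.isLoadBalancedRPC {
		// Last live node and not a load-balanced proxy: disabling it would leave
		// the pool empty, so log critically and keep serving in a degraded state.
		lggr.Criticalf("RPC endpoint failed health check; %s %s", msgCannotDisable, msgDegradedState)
		continue
	}
	// Otherwise: either other live nodes exist, or this endpoint is a load-balanced
	// proxy that may hand back a healthier backend on reconnect, so fall through and
	// let the node be declared out of sync / unreachable as usual.
}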

multinode/node_lifecycle_test.go

Lines changed: 163 additions & 1 deletion
@@ -202,6 +202,33 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
 		tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold))
 		assert.Equal(t, nodeStateAlive, node.State())
 	})
+	t.Run("with threshold poll failures, we are the last node alive, but is a proxy, transitions to unreachable", func(t *testing.T) {
+		t.Parallel()
+		rpc := newMockRPCClient[ID, Head](t)
+		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		const pollFailureThreshold = 3
+		node := newSubscribedNode(t, testNodeOpts{
+			config: testNodeConfig{
+				pollFailureThreshold: pollFailureThreshold,
+				pollInterval:         tests.TestInterval,
+			},
+			rpc:               rpc,
+			lggr:              lggr,
+			isLoadBalancedRPC: true,
+		})
+		defer func() { assert.NoError(t, node.close()) }()
+		poolInfo := newMockPoolChainInfoProvider(t)
+		poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
+			BlockNumber: 20,
+		}).Once()
+		node.SetPoolChainInfoProvider(poolInfo)
+		rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20})
+		pollError := errors.New("failed to get ClientVersion")
+		rpc.On("ClientVersion", mock.Anything).Return("", pollError)
+		node.declareAlive()
+		tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold))
+		assert.Equal(t, nodeStateUnreachable, node.State())
+	})
 	t.Run("when behind more than SyncThreshold, transitions to out of sync", func(t *testing.T) {
 		t.Parallel()
 		rpc := newMockRPCClient[ID, Head](t)
@@ -264,6 +291,42 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
 		node.declareAlive()
 		tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState))
 	})
+	t.Run("when behind more than SyncThreshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) {
+		t.Parallel()
+		rpc := newMockRPCClient[ID, Head](t)
+		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		const syncThreshold = 10
+		node := newSubscribedNode(t, testNodeOpts{
+			config: testNodeConfig{
+				pollInterval:  tests.TestInterval,
+				syncThreshold: syncThreshold,
+				selectionMode: NodeSelectionModeRoundRobin,
+			},
+			rpc:               rpc,
+			lggr:              lggr,
+			isLoadBalancedRPC: true,
+		})
+		defer func() { assert.NoError(t, node.close()) }()
+		rpc.On("ClientVersion", mock.Anything).Return("", nil)
+		const mostRecentBlock = 20
+		rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice()
+		poolInfo := newMockPoolChainInfoProvider(t)
+		poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
+			BlockNumber:     syncThreshold + mostRecentBlock + 1,
+			TotalDifficulty: big.NewInt(10),
+		})
+		node.SetPoolChainInfoProvider(poolInfo)
+		// tries to redial in outOfSync
+		rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
+			assert.Equal(t, nodeStateOutOfSync, node.State())
+		}).Once()
+		rpc.On("Dial", mock.Anything).Run(func(_ mock.Arguments) {
+			require.Equal(t, nodeStateOutOfSync, node.State())
+		}).Return(errors.New("failed to dial")).Maybe()
+		node.declareAlive()
+		tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable")
+		assert.Equal(t, nodeStateUnreachable, node.State())
+	})
 	t.Run("when behind but SyncThreshold=0, stay alive", func(t *testing.T) {
 		t.Parallel()
 		rpc := newMockRPCClient[ID, Head](t)
@@ -333,7 +396,36 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
 		tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState))
 		assert.Equal(t, nodeStateAlive, node.State())
 	})
-
+	t.Run("when no new heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) {
+		t.Parallel()
+		rpc := newMockRPCClient[ID, Head](t)
+		rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
+		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		node := newSubscribedNode(t, testNodeOpts{
+			config: testNodeConfig{},
+			lggr:   lggr,
+			chainConfig: clientMocks.ChainConfig{
+				NoNewHeadsThresholdVal: tests.TestInterval,
+			},
+			rpc:               rpc,
+			isLoadBalancedRPC: true,
+		})
+		defer func() { assert.NoError(t, node.close()) }()
+		poolInfo := newMockPoolChainInfoProvider(t)
+		poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
+			BlockNumber:     20,
+			TotalDifficulty: big.NewInt(10),
+		}).Once()
+		node.SetPoolChainInfoProvider(poolInfo)
+		// tries to redial in outOfSync
+		rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
+			assert.Equal(t, nodeStateOutOfSync, node.State())
+		}).Once()
+		rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
+		node.declareAlive()
+		tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable")
+		assert.Equal(t, nodeStateUnreachable, node.State())
+	})
 	t.Run("rpc closed head channel", func(t *testing.T) {
 		t.Parallel()
 		rpc := newMockRPCClient[ID, Head](t)
@@ -555,6 +647,40 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
 		tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState))
 		assert.Equal(t, nodeStateAlive, node.State())
 	})
+	t.Run("when no new finalized heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) {
+		t.Parallel()
+		rpc := newMockRPCClient[ID, Head](t)
+		rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
+		rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
+		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		noNewFinalizedHeadsThreshold := tests.TestInterval
+		node := newSubscribedNode(t, testNodeOpts{
+			config: testNodeConfig{},
+			chainConfig: clientMocks.ChainConfig{
+				NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold,
+				IsFinalityTagEnabled:            true,
+			},
+			rpc:               rpc,
+			lggr:              lggr,
+			isLoadBalancedRPC: true,
+		})
+		defer func() { assert.NoError(t, node.close()) }()
+		poolInfo := newMockPoolChainInfoProvider(t)
+		poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
+			BlockNumber:     20,
+			TotalDifficulty: big.NewInt(10),
+		}).Once()
+		node.SetPoolChainInfoProvider(poolInfo)
+		// tries to redial in outOfSync
+		// tries to redial in outOfSync
+		rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
+			assert.Equal(t, nodeStateOutOfSync, node.State())
+		}).Once()
+		rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
+		node.declareAlive()
+		tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable")
+		assert.Equal(t, nodeStateUnreachable, node.State())
+	})
 	t.Run("If finalized subscription returns an error, transitions to unreachable", func(t *testing.T) {
 		t.Parallel()
 		rpc := newMockRPCClient[ID, Head](t)
@@ -937,6 +1063,42 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
 			return node.State() == nodeStateAlive
 		})
 	})
+	t.Run("becomes alive if there is no other nodes, unless proxy", func(t *testing.T) {
+		t.Parallel()
+		rpc := newMockRPCClient[ID, Head](t)
+		nodeChainID := RandomID()
+		lggr, _ := logger.TestObserved(t, zap.DebugLevel)
+		node := newAliveNode(t, testNodeOpts{
+			chainConfig: clientMocks.ChainConfig{
+				NoNewHeadsThresholdVal: tests.TestInterval,
+			},
+			rpc:               rpc,
+			chainID:           nodeChainID,
+			lggr:              lggr,
+			isLoadBalancedRPC: true,
+		})
+		defer func() { assert.NoError(t, node.close()) }()
+		poolInfo := newMockPoolChainInfoProvider(t)
+		poolInfo.On("LatestChainInfo").Return(0, ChainInfo{
+			BlockNumber:     100,
+			TotalDifficulty: big.NewInt(200),
+		})
+		node.SetPoolChainInfoProvider(poolInfo)
+		rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
+
+		rpc.On("Dial", mock.Anything).Return(nil).Once()
+		rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
+
+		outOfSyncSubscription := newMockSubscription(t)
+		outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
+		outOfSyncSubscription.On("Unsubscribe").Once()
+		rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once()
+		rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
+		node.declareOutOfSync(syncStatusNoNewHead)
+		tests.AssertEventually(t, func() bool {
+			return node.State() == nodeStateUnreachable
+		})
+	})
 	t.Run("Stays out-of-sync if received new head, but lags behind pool", func(t *testing.T) {
 		t.Parallel()
 		rpc := newMockRPCClient[ID, Head](t)

multinode/node_test.go

Lines changed: 13 additions & 12 deletions
@@ -71,17 +71,18 @@ type testNode struct {
 }

 type testNodeOpts struct {
-	config      testNodeConfig
-	chainConfig mocks.ChainConfig
-	lggr        logger.Logger
-	wsuri       *url.URL
-	httpuri     *url.URL
-	name        string
-	id          int
-	chainID     ID
-	nodeOrder   int32
-	rpc         *mockRPCClient[ID, Head]
-	chainFamily string
+	config            testNodeConfig
+	chainConfig       mocks.ChainConfig
+	lggr              logger.Logger
+	wsuri             *url.URL
+	httpuri           *url.URL
+	name              string
+	id                int
+	chainID           ID
+	nodeOrder         int32
+	rpc               *mockRPCClient[ID, Head]
+	chainFamily       string
+	isLoadBalancedRPC bool
 }

 func newTestNode(t *testing.T, opts testNodeOpts) testNode {
@@ -109,7 +110,7 @@ func newTestNode(t *testing.T, opts testNodeOpts) testNode {
 	require.NoError(t, err)

 	nodeI := NewNode[ID, Head, RPCClient[ID, Head]](opts.config, opts.chainConfig, opts.lggr, nodeMetrics,
-		opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily)
+		opts.wsuri, opts.httpuri, opts.name, opts.id, opts.chainID, opts.nodeOrder, opts.rpc, opts.chainFamily, opts.isLoadBalancedRPC)

 	return testNode{
 		nodeI.(*node[ID, Head, RPCClient[ID, Head]]),
