@@ -2,171 +2,228 @@ import { ApiPromise, WsProvider } from "@polkadot/api";
22import EventEmitter from "eventemitter3" ;
33
44import logger from "../logger" ;
5- import { POLKADOT_API_TIMEOUT } from "../constants " ;
6- import { sleep } from "../utils " ;
5+ import { sleep } from "../utils/util " ;
6+ import { API_PROVIDER_TIMEOUT , POLKADOT_API_TIMEOUT } from "../constants " ;
77
88export const apiLabel = { label : "ApiHandler" } ;
99
10+ /**
11+ * A higher level handler for the Polkadot-Js API that can handle reconnecting
12+ * to a different provider if one proves troublesome.
13+ */
1014class ApiHandler extends EventEmitter {
1115 private _wsProvider ?: WsProvider ;
1216 private _api : ApiPromise | null = null ;
13- private readonly _endpoints : string [ ] ;
14- private _currentEndpointIndex = 0 ;
15- private _maxRetries = 25 ;
16- private _connectionAttemptInProgress = false ;
17+ private readonly _endpoints : string [ ] = [ ] ;
18+ static isConnected = false ;
1719 private healthCheckInProgress = false ;
20+ private _currentEndpoint ?: string ;
1821 public upSince : number = Date . now ( ) ;
19-
2022 constructor ( endpoints : string [ ] ) {
2123 super ( ) ;
22- this . _endpoints = endpoints ;
23- this . initiateConnection ( ) . catch ( this . handleError ) ;
24+ this . _endpoints = endpoints . sort ( ( ) => Math . random ( ) - 0.5 ) ;
25+ this . upSince = Date . now ( ) ;
2426 }
2527
26- public async initiateConnection ( retryCount = 0 ) : Promise < void > {
27- logger . info ( `Initiating connection...` , apiLabel ) ;
28- if ( this . _connectionAttemptInProgress ) {
29- logger . info (
30- "Connection attempt already in progress, skipping new attempt." ,
31- apiLabel ,
32- ) ;
33- return ;
28+ async healthCheck ( retries = 0 ) : Promise < boolean > {
29+ if ( retries < 10 ) {
30+ try {
31+ this . healthCheckInProgress = true ;
32+ let chain ;
33+
34+ const isConnected = this . _wsProvider ?. isConnected ;
35+ if ( isConnected && ! this . _api ?. isConnected ) {
36+ try {
37+ chain = await this . _api ?. rpc . system . chain ( ) ;
38+ } catch ( e ) {
39+ await sleep ( API_PROVIDER_TIMEOUT ) ;
40+ }
41+ }
42+ chain = await this . _api ?. rpc . system . chain ( ) ;
43+
44+ if ( isConnected && chain ) {
45+ this . healthCheckInProgress = false ;
46+ return true ;
47+ } else {
48+ await sleep ( API_PROVIDER_TIMEOUT ) ;
49+ logger . info ( `api still disconnected, disconnecting.` , apiLabel ) ;
50+ await this . _wsProvider ?. disconnect ( ) ;
51+ await this . getProvider ( this . _endpoints ) ;
52+ await this . getAPI ( ) ;
53+ return false ;
54+ }
55+ } catch ( e : unknown ) {
56+ const errorMessage =
57+ e instanceof Error ? e . message : "An unknown error occurred" ;
58+ logger . error (
59+ `Error in health check for WS Provider for rpc. ${ errorMessage } ` ,
60+ apiLabel ,
61+ ) ;
62+ this . healthCheckInProgress = false ;
63+ return false ;
64+ }
3465 }
66+ return false ;
67+ }
3568
36- logger . info (
37- `Setting connection attempt in progress. Endpoints: ${ this . _endpoints } ` ,
38- apiLabel ,
39- ) ;
40- this . _connectionAttemptInProgress = true ;
41- const endpoint = this . currentEndpoint ( ) ;
42- logger . info ( `Attempting to connect to endpoint: ${ endpoint } ` , apiLabel ) ;
43-
44- this . _wsProvider = new WsProvider ( endpoint , POLKADOT_API_TIMEOUT ) ;
69+ public currentEndpoint ( ) {
70+ return this . _currentEndpoint ;
71+ }
4572
46- this . _wsProvider . on ( "error" , async ( error ) => {
47- logger . error (
48- `WS provider error at ${ endpoint } : ${ error . message } ` ,
49- apiLabel ,
73+ async getProvider ( endpoints : string [ ] ) : Promise < WsProvider > {
74+ return await new Promise < WsProvider > ( ( resolve , reject ) => {
75+ const wsProvider = new WsProvider (
76+ endpoints ,
77+ 5000 ,
78+ undefined ,
79+ POLKADOT_API_TIMEOUT ,
5080 ) ;
51- await this . retryConnection ( ) ;
52- } ) ;
5381
54- this . _wsProvider . on ( "disconnected" , async ( ) => {
55- logger . info ( `WS provider disconnected from ${ endpoint } ` , apiLabel ) ;
56- await this . retryConnection ( ) ;
82+ wsProvider . on ( "disconnected" , async ( ) => {
83+ try {
84+ const isHealthy = await this . healthCheck ( ) ;
85+ logger . info (
86+ `[Disconnection] ${ this . _currentEndpoint } } Health check result: ${ isHealthy } ` ,
87+ apiLabel ,
88+ ) ;
89+ resolve ( wsProvider ) ;
90+ } catch ( error : any ) {
91+ logger . warn (
92+ `WS provider for rpc ${ endpoints [ 0 ] } disconnected!` ,
93+ apiLabel ,
94+ ) ;
95+ reject ( error ) ;
96+ }
97+ } ) ;
98+ wsProvider . on ( "connected" , ( ) => {
99+ logger . info ( `WS provider for rpc ${ endpoints [ 0 ] } connected` , apiLabel ) ;
100+ this . _currentEndpoint = endpoints [ 0 ] ;
101+ resolve ( wsProvider ) ;
102+ } ) ;
103+ wsProvider . on ( "error" , async ( ) => {
104+ try {
105+ const isHealthy = await this . healthCheck ( ) ;
106+ logger . info (
107+ `[Error] ${ this . _currentEndpoint } Health check result: ${ isHealthy } ` ,
108+ apiLabel ,
109+ ) ;
110+ resolve ( wsProvider ) ;
111+ } catch ( error : any ) {
112+ logger . error ( `Error thrown for rpc ${ this . _endpoints [ 0 ] } ` , apiLabel ) ;
113+ reject ( error ) ;
114+ }
115+ } ) ;
57116 } ) ;
58-
59- try {
60- const api = await ApiPromise . create ( { provider : this . _wsProvider } ) ;
61- await api . isReadyOrError ;
62- this . _api = api ;
63- this . _registerEventHandlers ( api ) ;
64- logger . info ( `Successfully connected to ${ endpoint } ` , apiLabel ) ;
65- this . emit ( "connected" , { endpoint : endpoint } ) ;
66- this . upSince = Date . now ( ) ;
67- } catch ( error ) {
68- logger . error (
69- `Connection failed to endpoint ${ endpoint } : ${ error } ` ,
70- apiLabel ,
71- ) ;
72- await this . retryConnection ( ) ;
73- } finally {
74- this . _connectionAttemptInProgress = false ;
75- }
76- }
77-
78- private async retryConnection ( ) : Promise < void > {
79- if ( ! this . isConnected ( ) && this . _maxRetries > 0 ) {
80- this . _maxRetries -- ;
81- await this . cleanupConnection ( ) ;
82- this . moveToNextEndpoint ( ) ;
83- await this . initiateConnection ( ) ;
84- }
85- }
86-
87- private moveToNextEndpoint ( ) : void {
88- this . _currentEndpointIndex =
89- ( this . _currentEndpointIndex + 1 ) % this . _endpoints . length ;
90117 }
91118
92- private async cleanupConnection ( ) : Promise < void > {
93- try {
94- if ( this . _wsProvider ) {
95- this . _wsProvider ?. disconnect ( ) ;
96- this . _wsProvider = undefined ;
97- }
98- await this . _api ?. disconnect ( ) ;
99- this . _api = null ;
100- this . _connectionAttemptInProgress = false ;
101- logger . info ( `Connection cleaned up` , apiLabel ) ;
102- await sleep ( 3000 ) ;
103- } catch ( error ) {
104- logger . error ( `Error cleaning up connection: ${ error } ` , apiLabel ) ;
119+ async getAPI ( retries = 0 ) : Promise < ApiPromise > {
120+ if ( this . _wsProvider && this . _api && this . _api ?. isConnected ) {
121+ return this . _api ;
105122 }
106- }
107-
108- async healthCheck ( ) : Promise < boolean > {
109- if ( this . healthCheckInProgress ) return false ;
110- this . healthCheckInProgress = true ;
123+ const endpoints = this . _endpoints . sort ( ( ) => Math . random ( ) - 0.5 ) ;
111124
112125 try {
113126 logger . info (
114- `Performing health check... endpoint: ${ this . currentEndpoint ( ) } ` ,
127+ `[getAPI]: try ${ retries } creating provider with endpoint ${ endpoints [ 0 ] } ` ,
115128 apiLabel ,
116129 ) ;
117- const wsConnected = this . _wsProvider ?. isConnected || false ;
118- const apiConnected = this . _api ?. isConnected || false ;
119- const chain = await this . _api ?. rpc . system . chain ( ) ;
120-
121- this . healthCheckInProgress = false ;
122- const healthy = wsConnected && apiConnected && ! ! chain ;
123- logger . info ( `Health: ${ healthy } ` , apiLabel ) ;
124- if ( ! healthy ) {
125- logger . info (
126- "Cleaning up connection and trying a different endpoint" ,
127- apiLabel ,
128- ) ;
129- this . cleanupConnection ( ) ;
130- this . moveToNextEndpoint ( ) ;
131- this . initiateConnection ( ) ;
130+ const provider = await this . getProvider ( endpoints ) ;
131+ this . _wsProvider = provider ;
132+ logger . info (
133+ `[getAPI]: provider created with endpoint: ${ endpoints [ 0 ] } ` ,
134+ apiLabel ,
135+ ) ;
136+ const api = await ApiPromise . create ( {
137+ provider : provider ,
138+ noInitWarn : true ,
139+ } ) ;
140+ await api . isReadyOrError ;
141+ logger . info ( `[getApi] Api is ready` , apiLabel ) ;
142+ return api ;
143+ } catch ( e ) {
144+ if ( retries < 15 ) {
145+ return await this . getAPI ( retries + 1 ) ;
146+ } else {
147+ const provider = await this . getProvider ( endpoints ) ;
148+ return await ApiPromise . create ( {
149+ provider : provider ,
150+ noInitWarn : true ,
151+ } ) ;
132152 }
133- return healthy ;
134- } catch ( error ) {
135- logger . error ( `Health check failed: ${ error } ` , apiLabel ) ;
136- this . healthCheckInProgress = false ;
137- this . cleanupConnection ( ) ;
138- this . moveToNextEndpoint ( ) ;
139- this . initiateConnection ( ) ;
140- return false ;
141153 }
142154 }
143155
144- currentEndpoint ( ) : string {
145- return this . _endpoints [ this . _currentEndpointIndex ] ;
156+ async setAPI ( ) {
157+ const api = await this . getAPI ( 0 ) ;
158+ this . _api = api ;
159+ this . _registerEventHandlers ( this . _api ) ;
160+ return api ;
146161 }
147162
148- getApi ( ) : ApiPromise | null {
149- return this . _api ;
163+ isConnected ( ) : boolean {
164+ return this . _wsProvider ?. isConnected || false ;
150165 }
151166
152- isConnected ( ) : boolean {
153- return ! ! this . _wsProvider ?. isConnected && ! ! this . _api ?. isConnected ;
167+ getApi ( ) : ApiPromise | null {
168+ if ( ! this . _api ) {
169+ return null ;
170+ } else {
171+ return this . _api ;
172+ }
154173 }
155174
156175 _registerEventHandlers ( api : ApiPromise ) : void {
176+ if ( ! api ) {
177+ logger . warn ( `API is null, cannot register event handlers.` , apiLabel ) ;
178+ return ;
179+ }
180+
181+ logger . info ( `Registering event handlers...` , apiLabel ) ;
182+
157183 api . query . system . events ( ( events ) => {
158184 events . forEach ( ( record ) => {
159185 const { event } = record ;
186+
160187 if ( event . section === "session" && event . method === "NewSession" ) {
161188 const sessionIndex = Number ( event ?. data [ 0 ] ?. toString ( ) ) || 0 ;
162- this . emit ( "newSession" , sessionIndex ) ;
189+ this . handleNewSessionEvent ( sessionIndex ) ;
190+ }
191+
192+ if (
193+ event . section === "staking" &&
194+ ( event . method === "Reward" || event . method === "Rewarded" )
195+ ) {
196+ const [ stash , amount ] = event . data ;
197+ this . handleRewardEvent ( stash . toString ( ) , amount . toString ( ) ) ;
198+ }
199+
200+ if ( event . section === "imOnline" && event . method === "SomeOffline" ) {
201+ const data = event . data . toJSON ( ) ;
202+ const offlineVals =
203+ Array . isArray ( data ) && Array . isArray ( data [ 0 ] )
204+ ? data [ 0 ] . reduce ( ( acc : string [ ] , val ) => {
205+ if ( Array . isArray ( val ) && typeof val [ 0 ] === "string" ) {
206+ acc . push ( val [ 0 ] ) ;
207+ }
208+ return acc ;
209+ } , [ ] )
210+ : [ ] ;
211+ this . handleSomeOfflineEvent ( offlineVals as string [ ] ) ;
163212 }
164213 } ) ;
165214 } ) ;
166215 }
167216
168- private handleError ( error ) : void {
169- logger . error ( `Unhandled exception in ApiHandler: ${ error } ` , apiLabel ) ;
217+ handleNewSessionEvent ( sessionIndex : number ) : void {
218+ this . emit ( "newSession" , { sessionIndex } ) ;
219+ }
220+
221+ handleRewardEvent ( stash : string , amount : string ) : void {
222+ this . emit ( "reward" , { stash, amount } ) ;
223+ }
224+
225+ handleSomeOfflineEvent ( offlineVals : string [ ] ) : void {
226+ this . emit ( "someOffline" , { offlineVals } ) ;
170227 }
171228}
172229
0 commit comments