Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit db066cf

Browse files
Merge pull request #2951 from w3f/yuri/websocket-disconnect
ApiHandler / ChainData refactoring
2 parents 5663431 + 8c2ae30 commit db066cf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+592
-738
lines changed

apps/1kv-backend-staging/templates/kusama-otv-backend.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ spec:
5151
"wss://rpc.dotters.network/kusama",
5252
"wss://ksm-rpc.stakeworld.io"
5353
],
54+
"apiPeopleEndpoints": ["wss://kusama-people-rpc.polkadot.io"],
5455
"candidatesUrl": "https://raw.githubusercontent.com/w3f/1k-validators-be/master/candidates/kusama.json"
5556
},
5657
"constraints": {

apps/1kv-backend/templates/kusama-otv-backend.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ spec:
4848
"wss://rpc.dotters.network/kusama",
4949
"wss://ksm-rpc.stakeworld.io"
5050
],
51+
"apiPeopleEndpoints": ["wss://kusama-people-rpc.polkadot.io"],
5152
"candidatesUrl": "https://raw.githubusercontent.com/w3f/1k-validators-be/master/candidates/kusama.json"
5253
},
5354
"constraints": {

docs/docs/backend/config.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ An example config may look something like:
4242

4343
- `dryRun`: Boolean (true/false). If set to true, the Nominator accounts that are added will calculate what a nomination would look like (how many and which validators), but not craft or submit any transactions. No nominations will be done when this flag is set. In the `nominate` function of the `Nominator` class, the `dryRun` flag is checked, and if it is set to true, the function will return after logging the validators it would nominate without doing anything. This flag is optional and set to `false` by default.
4444
- `networkPrefix`: Integer. Defines the network prefix. For Kusama, this is `2`, and for Polkadot, this is `0`. It can be set to `3` for running a local test network, although this isn't used much anymore. **This flag is required for `core` and `worker` services.**
45-
- `apiEndpoints`: Array of strings. Lists the RPC endpoints for the chain. When given a list of multiple, it will pick one at random to create a websocket connection to - this single connection is used throughout the entire service for any queries or submitting transactions. **This is required for `core` and `worker` services.
45+
- `apiEndpoints`: Array of strings. Lists the RPC endpoints for the chain. When given a list of multiple, it will pick one at random to create a websocket connection to - this single connection is used throughout the entire service for any queries or submitting transactions. **This is required for `core` and `worker` services.**
46+
- `apiPeopleEndpoints`: Optional array of strings. Lists the RPC endpoints for the People parachain, if it's enabled on the network.
4647
- `bootstrap`: Boolean. An **optional** flag that can be set to `true` to enable the bootstrap process. This can be used when running an instance of the backend and would query the main Kusama or Polkadot instances at the api endpoints specified below to populate the db with non-deterministic values like `rank` or `discoveredAt`. _This isn't currently used anywhere yet_.
4748
- `kusamaBootstrapEndpoint`: String. URL for the Kusama bootstrap endpoint. **optional**. _This isn't currently used anywhere yet_.
4849
- `polkadotBootstrapEndpoint`: String. URL for the Polkadot bootstrap endpoint. **optional**. _This isn't currently used anywhere yet_.
@@ -647,4 +648,4 @@ An example Worker config run as microservices may look something like:
647648
}
648649
}
649650

650-
```
651+
```

helmfile.d/config/kusama/otv-backend-ci.yaml.gotmpl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ config: |
2727
"wss://rpc.dotters.network/kusama",
2828
"wss://ksm-rpc.stakeworld.io"
2929
],
30+
"apiPeopleEndpoints": ["wss://kusama-people-rpc.polkadot.io"],
3031
"candidatesUrl": "https://raw.githubusercontent.com/w3f/1k-validators-be/master/candidates/kusama.json"
3132
},
3233
"constraints": {

packages/common/src/ApiHandler/ApiHandler.ts

Lines changed: 115 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { ApiPromise, WsProvider } from "@polkadot/api";
22
import EventEmitter from "eventemitter3";
33

44
import logger from "../logger";
5-
import { sleep } from "../utils/util";
65
import { API_PROVIDER_TIMEOUT, POLKADOT_API_TIMEOUT } from "../constants";
76

87
export const apiLabel = { label: "ApiHandler" };
@@ -12,167 +11,147 @@ export const apiLabel = { label: "ApiHandler" };
1211
* to a different provider if one proves troublesome.
1312
*/
1413
class ApiHandler extends EventEmitter {
15-
private _wsProvider?: WsProvider;
16-
private _api: ApiPromise | null = null;
17-
private readonly _endpoints: string[] = [];
18-
static isConnected = false;
19-
private healthCheckInProgress = false;
20-
private _currentEndpoint?: string;
21-
public upSince: number = Date.now();
14+
private wsProvider?: WsProvider;
15+
private api: ApiPromise | null = null;
16+
private readonly endpoints: string[] = [];
17+
18+
// If we're reconnecting right now, awaiting on this promise will block until connection succeeds
19+
private connectionAttempt: Promise<void> | null = null;
20+
21+
public upSince = -1;
22+
public isConnected = false;
23+
2224
constructor(endpoints: string[]) {
2325
super();
24-
this._endpoints = endpoints.sort(() => Math.random() - 0.5);
25-
this.upSince = Date.now();
26+
this.endpoints = endpoints.sort(() => Math.random() - 0.5);
2627
}
2728

28-
async healthCheck(retries = 0): Promise<boolean> {
29-
if (retries < 10) {
29+
/**
30+
* This copies connectWithRetry() logic from WsProvider
31+
* The issue with original logic is that `autoConnectMs` is set to 0 when disconnect() is called, but we
32+
* want to call it from nextEndpoint()
33+
*
34+
* This function can be called multiple times, and it'll wait on the same promise, without spamming reconnects.
35+
* @see https://github.com/polkadot-js/api/blob/2ef84c5dcdbbff8aec9ba01e4f13a50130d1a6f3/packages/rpc-provider/src/ws/index.ts#L239-L271
36+
*/
37+
private async connectWithRetry(): Promise<void> {
38+
if (!this.wsProvider) {
39+
throw new Error(
40+
"connectWithRetry() is called before initializing WsProvider",
41+
);
42+
}
43+
44+
if (this.connectionAttempt instanceof Promise) {
45+
await this.connectionAttempt;
46+
return;
47+
}
48+
49+
this.isConnected = false;
50+
this.connectionAttempt = new Promise(async (resolve) => {
3051
try {
31-
this.healthCheckInProgress = true;
32-
let chain;
33-
34-
const isConnected = this._wsProvider?.isConnected;
35-
if (isConnected && !this._api?.isConnected) {
36-
try {
37-
chain = await this._api?.rpc.system.chain();
38-
} catch (e) {
39-
await sleep(API_PROVIDER_TIMEOUT);
40-
}
41-
}
42-
chain = await this._api?.rpc.system.chain();
43-
44-
if (isConnected && chain) {
45-
this.healthCheckInProgress = false;
46-
return true;
47-
} else {
48-
await sleep(API_PROVIDER_TIMEOUT);
49-
logger.info(`api still disconnected, disconnecting.`, apiLabel);
50-
await this._wsProvider?.disconnect();
51-
await this.getProvider(this._endpoints);
52-
await this.getAPI();
53-
return false;
54-
}
55-
} catch (e: unknown) {
56-
const errorMessage =
57-
e instanceof Error ? e.message : "An unknown error occurred";
58-
logger.error(
59-
`Error in health check for WS Provider for rpc. ${errorMessage}`,
52+
await this.wsProvider.connect();
53+
54+
await new Promise<void>((resolve, reject) => {
55+
const unsubConnect = this.wsProvider.on("connected", resolve);
56+
const unsubDisconnect = this.wsProvider.on("disconnected", reject);
57+
this.connectionAttempt.finally(() => {
58+
unsubConnect();
59+
unsubDisconnect();
60+
});
61+
});
62+
63+
this.connectionAttempt = null;
64+
this.upSince = Date.now();
65+
this.isConnected = true;
66+
logger.info(`Connected to ${this.currentEndpoint()}`, apiLabel);
67+
resolve();
68+
} catch (err) {
69+
logger.warn(
70+
`Connection attempt to ${this.currentEndpoint()} failed: ${JSON.stringify(err)}, trying next endpoint`,
6071
apiLabel,
6172
);
62-
this.healthCheckInProgress = false;
63-
return false;
73+
setTimeout(() => {
74+
this.connectionAttempt = null;
75+
this.connectWithRetry().then(resolve);
76+
}, API_PROVIDER_TIMEOUT);
6477
}
65-
}
66-
return false;
67-
}
78+
});
6879

69-
public currentEndpoint() {
70-
return this._currentEndpoint;
80+
await this.connectionAttempt;
7181
}
7282

73-
async getProvider(endpoints: string[]): Promise<WsProvider> {
74-
return await new Promise<WsProvider>((resolve, reject) => {
75-
const wsProvider = new WsProvider(
76-
endpoints,
77-
5000,
78-
undefined,
79-
POLKADOT_API_TIMEOUT,
80-
);
83+
/**
84+
* In case of errors like RPC rate limit, we might want to force endpoint change
85+
* PJS handles endpoint rotation internally, changing the endpoint on every next connection attempt.
86+
* We only disconnect here; reconnect happens inside `"disconnected"` event handler
87+
*/
88+
async nextEndpoint() {
89+
logger.info("Rotating API endpoint", apiLabel);
90+
await this.wsProvider.disconnect();
91+
await this.connectWithRetry();
92+
}
8193

82-
wsProvider.on("disconnected", async () => {
83-
try {
84-
const isHealthy = await this.healthCheck();
85-
logger.info(
86-
`[Disconnection] ${this._currentEndpoint}} Health check result: ${isHealthy}`,
87-
apiLabel,
88-
);
89-
resolve(wsProvider);
90-
} catch (error: any) {
91-
logger.warn(
92-
`WS provider for rpc ${endpoints[0]} disconnected!`,
93-
apiLabel,
94-
);
95-
reject(error);
96-
}
97-
});
98-
wsProvider.on("connected", () => {
99-
logger.info(`WS provider for rpc ${endpoints[0]} connected`, apiLabel);
100-
this._currentEndpoint = endpoints[0];
101-
resolve(wsProvider);
102-
});
103-
wsProvider.on("error", async () => {
104-
try {
105-
const isHealthy = await this.healthCheck();
106-
logger.info(
107-
`[Error] ${this._currentEndpoint} Health check result: ${isHealthy}`,
108-
apiLabel,
109-
);
110-
resolve(wsProvider);
111-
} catch (error: any) {
112-
logger.error(`Error thrown for rpc ${this._endpoints[0]}`, apiLabel);
113-
reject(error);
114-
}
115-
});
116-
});
94+
currentEndpoint(): string | undefined {
95+
return this.wsProvider?.endpoint;
11796
}
11897

119-
async getAPI(retries = 0): Promise<ApiPromise> {
120-
if (this._wsProvider && this._api && this._api?.isConnected) {
121-
return this._api;
98+
private async healthCheck(): Promise<void> {
99+
if (this.connectionAttempt instanceof Promise) {
100+
return;
122101
}
123-
const endpoints = this._endpoints.sort(() => Math.random() - 0.5);
124-
125102
try {
126-
logger.info(
127-
`[getAPI]: try ${retries} creating provider with endpoint ${endpoints[0]}`,
103+
const api = await this.getApi();
104+
await api.rpc.system.chain();
105+
} catch (err) {
106+
logger.warn(
107+
`Healthcheck on ${this.currentEndpoint()} failed: ${JSON.stringify(err)}, trying next endpoint`,
128108
apiLabel,
129109
);
130-
const provider = await this.getProvider(endpoints);
131-
this._wsProvider = provider;
132-
logger.info(
133-
`[getAPI]: provider created with endpoint: ${endpoints[0]}`,
134-
apiLabel,
135-
);
136-
const api = await ApiPromise.create({
137-
provider: provider,
138-
noInitWarn: true,
139-
});
140-
await api.isReadyOrError;
141-
logger.info(`[getApi] Api is ready`, apiLabel);
142-
return api;
143-
} catch (e) {
144-
if (retries < 15) {
145-
return await this.getAPI(retries + 1);
146-
} else {
147-
const provider = await this.getProvider(endpoints);
148-
return await ApiPromise.create({
149-
provider: provider,
150-
noInitWarn: true,
151-
});
152-
}
110+
await this.nextEndpoint();
153111
}
154112
}
155113

156-
async setAPI() {
157-
const api = await this.getAPI(0);
158-
this._api = api;
159-
this._registerEventHandlers(this._api);
160-
return api;
161-
}
114+
/**
115+
* This function provides access to PJS api. While the ApiPromise instance never changes,
116+
* the function will block if we're reconnecting.
117+
* It's intended to be called every time instead of saving ApiPromise instance long-term.
118+
*/
119+
async getApi(): Promise<ApiPromise> {
120+
if (!this.wsProvider) {
121+
this.wsProvider = new WsProvider(
122+
this.endpoints,
123+
false, // Do not autoconnect
124+
undefined,
125+
POLKADOT_API_TIMEOUT,
126+
);
127+
await this.connectWithRetry();
128+
this.wsProvider.on("disconnected", () => {
129+
logger.warn(`WsProvider disconnected`, apiLabel);
130+
this.connectWithRetry();
131+
});
132+
}
133+
if (!this.api) {
134+
this.api = await ApiPromise.create({
135+
provider: this.wsProvider,
136+
noInitWarn: true,
137+
});
138+
await this.api.isReady;
139+
this.registerEventHandlers(this.api);
162140

163-
isConnected(): boolean {
164-
return this._wsProvider?.isConnected || false;
165-
}
141+
// healthcheck queries RPC, thus its interval can't be shorter than RPC timeout
142+
setInterval(() => {
143+
void this.healthCheck();
144+
}, POLKADOT_API_TIMEOUT);
145+
}
166146

167-
getApi(): ApiPromise | null {
168-
if (!this._api) {
169-
return null;
170-
} else {
171-
return this._api;
147+
if (this.connectionAttempt instanceof Promise) {
148+
await this.connectionAttempt;
172149
}
150+
151+
return this.api;
173152
}
174153

175-
_registerEventHandlers(api: ApiPromise): void {
154+
private registerEventHandlers(api: ApiPromise): void {
176155
if (!api) {
177156
logger.warn(`API is null, cannot register event handlers.`, apiLabel);
178157
return;

0 commit comments

Comments
 (0)