Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 5ad0ab4

Browse files
ApiHandler / ChainData refactoring
Removed a lot of retry/reconnect logic from ApiHandler and ChainData, as it was spawning a lot of PJS instances, which also handling reconnects on its own. This change also properly adpots the idea of talking both to relay chain and to parachains, depending on queried data. With new logic, each ApiHandler instance governs an interface to a single chain, where it can switch endpoints and do reconnects on its own. `ApiHandler.getApi()` returns a PJS instance that should work, and will block in case we're reconnecting. PeopleApi endpoints now live in configs, instead of being hardcoded. Instead of passing around ApiHandlers, passing a ChainData instance now, as it contains both ApiHandlers instance now, and as it is an intended abstraction for it
1 parent d6a764f commit 5ad0ab4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+581
-736
lines changed

docs/docs/backend/config.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ An example config may look something like:
4242

4343
- `dryRun`: Boolean (true/false). If set to true, the Nominator accounts that are added will calculate what a nomination would look like (how many and which validators), but not craft or submit any transactions. No nominations will be done when this flag is set. In the `nominate` function of the `Nominator` class, the `dryRun` flag is checked, and if it is set to true, the function will return after logging the validators it would nominate without doing anything. This flag is optional and set to `false` by default.
4444
- `networkPrefix`: Integer. Defines the network prefix. For Kusama, this is `2`, and for Polkadot, this is `0`. It can be set to `3` for running a local test network, although this isn't used much anymore. **This flag is required for `core` and `worker` services.**
45-
- `apiEndpoints`: Array of strings. Lists the RPC endpoints for the chain. When given a list of multiple, it will pick one at random to create a websocket connection to - this single connection is used throughout the entire service for any queries or submitting transactions. **This is required for `core` and `worker` services.
45+
- `apiEndpoints`: Array of strings. Lists the RPC endpoints for the chain. When given a list of multiple, it will pick one at random to create a websocket connection to - this single connection is used throughout the entire service for any queries or submitting transactions. **This is required for `core` and `worker` services.**
46+
- `apiPeopleEndpoints`: Optional array of strings. Lists the RPC endpoints for People parachain, if it's enabled on the network.
4647
- `bootstrap`: Boolean. An **optional** flag that can be set to `true` to enable the bootstrap process. This can be used when running a instance of the backend and would query the main Kusama or Polkadot instances at the api endpoints specified below to populate the db with non-deterministic values like `rank` or `discoveredAt`. _This isn't currently used anywhere yet_
4748
- `kusamaBootstrapEndpoint`: String. URL for the Kusama bootstrap endpoint. **optional**. _This isn't currently used anywhere yet_.
4849
- `polkadotBootstrapEndpoint`: String. URL for the Polkadot bootstrap endpoint. **optional**. _This isn't currently used anywhere yet_.
@@ -645,4 +646,4 @@ An example Worker config run as microservices may look something like:
645646
}
646647
}
647648

648-
```
649+
```

helmfile.d/config/kusama/otv-backend-ci.yaml.gotmpl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ config: |
2727
"wss://rpc.dotters.network/kusama",
2828
"wss://ksm-rpc.stakeworld.io"
2929
],
30+
"apiPeopleEndpoints": ["wss://kusama-people-rpc.polkadot.io"],
3031
"candidatesUrl": "https://raw.githubusercontent.com/w3f/1k-validators-be/master/candidates/kusama.json"
3132
},
3233
"constraints": {

packages/common/src/ApiHandler/ApiHandler.ts

Lines changed: 115 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { ApiPromise, WsProvider } from "@polkadot/api";
22
import EventEmitter from "eventemitter3";
33

44
import logger from "../logger";
5-
import { sleep } from "../utils/util";
65
import { API_PROVIDER_TIMEOUT, POLKADOT_API_TIMEOUT } from "../constants";
76

87
export const apiLabel = { label: "ApiHandler" };
@@ -12,167 +11,147 @@ export const apiLabel = { label: "ApiHandler" };
1211
* to a different provider if one proves troublesome.
1312
*/
1413
class ApiHandler extends EventEmitter {
15-
private _wsProvider?: WsProvider;
16-
private _api: ApiPromise | null = null;
17-
private readonly _endpoints: string[] = [];
18-
static isConnected = false;
19-
private healthCheckInProgress = false;
20-
private _currentEndpoint?: string;
21-
public upSince: number = Date.now();
14+
private wsProvider?: WsProvider;
15+
private api: ApiPromise | null = null;
16+
private readonly endpoints: string[] = [];
17+
18+
// If we're reconnecting right now, awaiting on this promise will block until connection succedes
19+
private connectionAttempt: Promise<void> | null = null;
20+
21+
public upSince = -1;
22+
public isConnected = false;
23+
2224
constructor(endpoints: string[]) {
2325
super();
24-
this._endpoints = endpoints.sort(() => Math.random() - 0.5);
25-
this.upSince = Date.now();
26+
this.endpoints = endpoints.sort(() => Math.random() - 0.5);
2627
}
2728

28-
async healthCheck(retries = 0): Promise<boolean> {
29-
if (retries < 10) {
29+
/**
30+
* This copies connectWithRetry() logic from WsProvider
31+
* The issue with original logic is that `autoConnectMs` is set to 0 when disconnect() is called, but we
32+
* want to call it from nextEndpoint()
33+
*
34+
* This function can be called multiple times, and it'll wait on the same promise, without spamming reconnects.
35+
* @see https://github.com/polkadot-js/api/blob/2ef84c5dcdbbff8aec9ba01e4f13a50130d1a6f3/packages/rpc-provider/src/ws/index.ts#L239-L271
36+
*/
37+
private async connectWithRetry(): Promise<void> {
38+
if (!this.wsProvider) {
39+
throw new Error(
40+
"connectWithRetry() is called before initializing WsProvider",
41+
);
42+
}
43+
44+
if (this.connectionAttempt instanceof Promise) {
45+
await this.connectionAttempt;
46+
return;
47+
}
48+
49+
this.isConnected = false;
50+
this.connectionAttempt = new Promise(async (resolve) => {
3051
try {
31-
this.healthCheckInProgress = true;
32-
let chain;
33-
34-
const isConnected = this._wsProvider?.isConnected;
35-
if (isConnected && !this._api?.isConnected) {
36-
try {
37-
chain = await this._api?.rpc.system.chain();
38-
} catch (e) {
39-
await sleep(API_PROVIDER_TIMEOUT);
40-
}
41-
}
42-
chain = await this._api?.rpc.system.chain();
43-
44-
if (isConnected && chain) {
45-
this.healthCheckInProgress = false;
46-
return true;
47-
} else {
48-
await sleep(API_PROVIDER_TIMEOUT);
49-
logger.info(`api still disconnected, disconnecting.`, apiLabel);
50-
await this._wsProvider?.disconnect();
51-
await this.getProvider(this._endpoints);
52-
await this.getAPI();
53-
return false;
54-
}
55-
} catch (e: unknown) {
56-
const errorMessage =
57-
e instanceof Error ? e.message : "An unknown error occurred";
58-
logger.error(
59-
`Error in health check for WS Provider for rpc. ${errorMessage}`,
52+
await this.wsProvider.connect();
53+
54+
await new Promise<void>((resolve, reject) => {
55+
const unsubConnect = this.wsProvider.on("connected", resolve);
56+
const unsubDisconnect = this.wsProvider.on("disconnected", reject);
57+
this.connectionAttempt.finally(() => {
58+
unsubConnect();
59+
unsubDisconnect();
60+
});
61+
});
62+
63+
this.connectionAttempt = null;
64+
this.upSince = Date.now();
65+
this.isConnected = true;
66+
logger.info(`Connected to ${this.currentEndpoint()}`, apiLabel);
67+
resolve();
68+
} catch (err) {
69+
logger.warn(
70+
`Connection attempt to ${this.currentEndpoint()} failed: ${JSON.stringify(err)}, trying next endpoint`,
6071
apiLabel,
6172
);
62-
this.healthCheckInProgress = false;
63-
return false;
73+
setTimeout(() => {
74+
this.connectionAttempt = null;
75+
this.connectWithRetry().then(resolve);
76+
}, API_PROVIDER_TIMEOUT);
6477
}
65-
}
66-
return false;
67-
}
78+
});
6879

69-
public currentEndpoint() {
70-
return this._currentEndpoint;
80+
await this.connectionAttempt;
7181
}
7282

73-
async getProvider(endpoints: string[]): Promise<WsProvider> {
74-
return await new Promise<WsProvider>((resolve, reject) => {
75-
const wsProvider = new WsProvider(
76-
endpoints,
77-
5000,
78-
undefined,
79-
POLKADOT_API_TIMEOUT,
80-
);
83+
/**
84+
* In case of errors like RPC rate limit, we might want to force endpoint change
85+
* PJS handles endpoint rotation internally, changing the endpoint on every next connection attempt.
86+
* We only disconnect here; reconnect happens inside `"disconnected"` event handler
87+
*/
88+
async nextEndpoint() {
89+
logger.info("Rotating API endpoint", apiLabel);
90+
await this.wsProvider.disconnect();
91+
await this.connectWithRetry();
92+
}
8193

82-
wsProvider.on("disconnected", async () => {
83-
try {
84-
const isHealthy = await this.healthCheck();
85-
logger.info(
86-
`[Disconnection] ${this._currentEndpoint}} Health check result: ${isHealthy}`,
87-
apiLabel,
88-
);
89-
resolve(wsProvider);
90-
} catch (error: any) {
91-
logger.warn(
92-
`WS provider for rpc ${endpoints[0]} disconnected!`,
93-
apiLabel,
94-
);
95-
reject(error);
96-
}
97-
});
98-
wsProvider.on("connected", () => {
99-
logger.info(`WS provider for rpc ${endpoints[0]} connected`, apiLabel);
100-
this._currentEndpoint = endpoints[0];
101-
resolve(wsProvider);
102-
});
103-
wsProvider.on("error", async () => {
104-
try {
105-
const isHealthy = await this.healthCheck();
106-
logger.info(
107-
`[Error] ${this._currentEndpoint} Health check result: ${isHealthy}`,
108-
apiLabel,
109-
);
110-
resolve(wsProvider);
111-
} catch (error: any) {
112-
logger.error(`Error thrown for rpc ${this._endpoints[0]}`, apiLabel);
113-
reject(error);
114-
}
115-
});
116-
});
94+
currentEndpoint(): string | undefined {
95+
return this.wsProvider?.endpoint;
11796
}
11897

119-
async getAPI(retries = 0): Promise<ApiPromise> {
120-
if (this._wsProvider && this._api && this._api?.isConnected) {
121-
return this._api;
98+
private async healthCheck(): Promise<void> {
99+
if (this.connectionAttempt instanceof Promise) {
100+
return;
122101
}
123-
const endpoints = this._endpoints.sort(() => Math.random() - 0.5);
124-
125102
try {
126-
logger.info(
127-
`[getAPI]: try ${retries} creating provider with endpoint ${endpoints[0]}`,
103+
const api = await this.getApi();
104+
await api.rpc.system.chain();
105+
} catch (err) {
106+
logger.warn(
107+
`Healthcheck on ${this.currentEndpoint()} failed: ${JSON.stringify(err)}, trying next endpoint`,
128108
apiLabel,
129109
);
130-
const provider = await this.getProvider(endpoints);
131-
this._wsProvider = provider;
132-
logger.info(
133-
`[getAPI]: provider created with endpoint: ${endpoints[0]}`,
134-
apiLabel,
135-
);
136-
const api = await ApiPromise.create({
137-
provider: provider,
138-
noInitWarn: true,
139-
});
140-
await api.isReadyOrError;
141-
logger.info(`[getApi] Api is ready`, apiLabel);
142-
return api;
143-
} catch (e) {
144-
if (retries < 15) {
145-
return await this.getAPI(retries + 1);
146-
} else {
147-
const provider = await this.getProvider(endpoints);
148-
return await ApiPromise.create({
149-
provider: provider,
150-
noInitWarn: true,
151-
});
152-
}
110+
await this.nextEndpoint();
153111
}
154112
}
155113

156-
async setAPI() {
157-
const api = await this.getAPI(0);
158-
this._api = api;
159-
this._registerEventHandlers(this._api);
160-
return api;
161-
}
114+
/**
115+
* This function provides access to PJS api. While the ApiPromise instance never changes,
116+
* the function will block if we're reconnecting.
117+
* It's intended to be called every time instead of saving ApiPromise instance long-term.
118+
*/
119+
async getApi(): Promise<ApiPromise> {
120+
if (!this.wsProvider) {
121+
this.wsProvider = new WsProvider(
122+
this.endpoints,
123+
false, // Do not autoconnect
124+
undefined,
125+
POLKADOT_API_TIMEOUT,
126+
);
127+
await this.connectWithRetry();
128+
this.wsProvider.on("disconnected", () => {
129+
logger.warn(`WsProvider disconnected`, apiLabel);
130+
this.connectWithRetry();
131+
});
132+
}
133+
if (!this.api) {
134+
this.api = await ApiPromise.create({
135+
provider: this.wsProvider,
136+
noInitWarn: true,
137+
});
138+
await this.api.isReady;
139+
this.registerEventHandlers(this.api);
162140

163-
isConnected(): boolean {
164-
return this._wsProvider?.isConnected || false;
165-
}
141+
// healthcheck queries RPC, thus its interval can't be shorter than RPC timout
142+
setInterval(() => {
143+
void this.healthCheck();
144+
}, POLKADOT_API_TIMEOUT);
145+
}
166146

167-
getApi(): ApiPromise | null {
168-
if (!this._api) {
169-
return null;
170-
} else {
171-
return this._api;
147+
if (this.connectionAttempt instanceof Promise) {
148+
await this.connectionAttempt;
172149
}
150+
151+
return this.api;
173152
}
174153

175-
_registerEventHandlers(api: ApiPromise): void {
154+
private registerEventHandlers(api: ApiPromise): void {
176155
if (!api) {
177156
logger.warn(`API is null, cannot register event handlers.`, apiLabel);
178157
return;

packages/common/src/ApiHandler/__mocks__/ApiHandler.ts

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,30 @@
11
import EventEmitter from "eventemitter3";
22
import { ApiPromise } from "@polkadot/api";
3+
import { vi } from "vitest";
34

45
const createMockApiPromise = (): any => ({
5-
isConnected: jest.fn().mockReturnValue(true),
6+
isConnected: vi.fn().mockReturnValue(true),
67
query: {
78
system: {
8-
events: jest.fn().mockImplementation((callback) => callback([])), // Simplified; adjust as needed
9+
events: vi.fn().mockImplementation((callback) => callback([])), // Simplified; adjust as needed
910
},
1011
},
1112
isReadyOrError: Promise.resolve(),
1213
// Use the function itself to provide a new instance for the 'create' method
13-
create: jest.fn().mockImplementation(() => createMockApiPromise()),
14+
create: vi.fn().mockImplementation(() => createMockApiPromise()),
1415
// Add more mocked methods and properties as needed
1516
});
1617

1718
// A mock class for ApiHandler
1819
class ApiHandlerMock extends EventEmitter {
19-
private _endpoints: string[];
20-
private _api: ApiPromise = createMockApiPromise as unknown as ApiPromise;
21-
private healthCheckInProgress = false;
20+
private endpoints: string[];
21+
private api: ApiPromise = createMockApiPromise as unknown as ApiPromise;
2222

2323
constructor(endpoints: string[]) {
2424
super();
2525
// Initialize with mock data or behavior as needed
26-
this._endpoints = endpoints.sort(() => Math.random() - 0.5);
27-
}
28-
29-
async healthCheck(): Promise<boolean> {
30-
this.healthCheckInProgress = true;
31-
// Simulate the health check logic; adjust the logic as needed for your tests
32-
const isConnected = this._api.isConnected;
33-
this.healthCheckInProgress = false;
34-
return isConnected;
26+
this.endpoints = endpoints.sort(() => Math.random() - 0.5);
27+
this.api = createMockApiPromise();
3528
}
3629

3730
async getProvider(endpoints: string[]): Promise<ApiPromise | undefined> {
@@ -42,25 +35,15 @@ class ApiHandlerMock extends EventEmitter {
4235
return undefined;
4336
}
4437

45-
async getAPI(retries = 0): Promise<ApiPromise> {
46-
// Use the mockApiPromise directly for simplicity
47-
return this._api;
48-
}
49-
50-
async setAPI(): Promise<void> {
51-
// Directly set the mock _api; in a real scenario, you might want to simulate more complex logic
52-
this._api = await this.getAPI();
53-
}
54-
55-
isConnected(): boolean {
56-
return this._api.isConnected;
38+
get isConnected(): boolean {
39+
return this.api.isConnected;
5740
}
5841

59-
getApi(): ApiPromise {
60-
return this._api;
42+
async getApi(): Promise<ApiPromise> {
43+
return this.api;
6144
}
6245

63-
_registerEventHandlers(api: ApiPromise): void {
46+
private registerEventHandlers(api: ApiPromise): void {
6447
// Simplify the event handler registration for testing purposes
6548
// In a real scenario, you might want to simulate more complex event handling
6649
api.query.system.events((events) => {

0 commit comments

Comments
 (0)