Skip to content

Commit 5c4a70a

Browse files
authored
Refactor connection TTL logic in WS server (#1678)
* feat: change behaviour of auto disconnect Signed-off-by: Ivo Yankov <[email protected]> * chore: fix failing after hook Signed-off-by: Ivo Yankov <[email protected]> * fix: subscriptioController unit tests Signed-off-by: Ivo Yankov <[email protected]> * refactor: update var names to be more descriptive Signed-off-by: Ivo Yankov <[email protected]> * docs: update env var description Signed-off-by: Ivo Yankov <[email protected]> --------- Signed-off-by: Ivo Yankov <[email protected]>
1 parent f654cbd commit 5c4a70a

File tree

8 files changed

+123
-49
lines changed

8 files changed

+123
-49
lines changed

charts/hedera-json-rpc-relay-websocket/templates/configmap.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ data:
1818
WEB_SOCKET_PORT: {{ .Values.config.WEB_SOCKET_PORT | quote }}
1919
WS_CONNECTION_LIMIT_PER_IP: {{ .Values.config.WS_CONNECTION_LIMIT_PER_IP | quote }}
2020
WS_CONNECTION_LIMIT: {{ .Values.config.WS_CONNECTION_LIMIT | quote }}
21-
WS_MAX_CONNECTION_TTL: {{ .Values.config.WS_MAX_CONNECTION_TTL | quote }}
21+
WS_MAX_INACTIVITY_TTL: {{ .Values.config.WS_MAX_INACTIVITY_TTL | quote }}
2222
WS_MULTIPLE_ADDRESSES_ENABLED: {{ .Values.config.WS_MULTIPLE_ADDRESSES_ENABLED | quote }}
2323
WS_SUBSCRIPTION_LIMIT: {{ .Values.config.WS_SUBSCRIPTION_LIMIT | quote }}
2424
WS_PING_INTERVAL: {{ .Values.config.WS_PING_INTERVAL | quote }}

charts/hedera-json-rpc-relay-websocket/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ config:
2525
WEB_SOCKET_HTTP_PORT: 8547
2626
WS_CONNECTION_LIMIT_PER_IP: 10
2727
WS_CONNECTION_LIMIT: 100
28-
WS_MAX_CONNECTION_TTL: 300000
28+
WS_MAX_INACTIVITY_TTL: 300000
2929
WS_MULTIPLE_ADDRESSES_ENABLED: false
3030
WS_SUBSCRIPTION_LIMIT: 100
3131
WS_PING_INTERVAL: 1000

docs/configuration.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,19 +95,19 @@ Unless you need to set a non-default value, it is recommended to only populate o
9595
The following table lists the available properties along with their default values for the [Ws-server package](/packages/ws-server/).
9696
Unless you need to set a non-default value, it is recommended to only populate overridden properties in the custom `.env`.
9797

98-
| Name | Default | Description |
99-
| ------------------------------- | -------- | --------------------------------------------------------------------------------------- |
100-
| `SUBSCRIPTIONS_ENABLED` | "false" | If enabled eth_subscribe will be enabled using WebSockets. |
101-
| `WS_MAX_CONNECTION_TTL` | "300000" | Time in ms that the web socket connection is allowed to stay open, currently 5 minutes. |
102-
| `WS_CONNECTION_LIMIT` | "10" | Maximum amount of concurrent web socket connections allowed. |
103-
| `WS_POLLING_INTERVAL` | "500" | Time in ms in between each poll to mirror node while there are subscriptions. |
104-
| `WEB_SOCKET_PORT` | "8546" | Port for the web socket connections |
105-
| `WEB_SOCKET_HTTP_PORT` | "8547" | Port for standard http server, used for metrics and health status endpoints |
106-
| `WS_SUBSCRIPTION_LIMIT` | "10" | Maximum amount of subscriptions per single connection |
107-
| `WS_CONNECTION_LIMIT_PER_IP` | "10" | Maximum amount of connections from a single IP address |
108-
| `WS_MULTIPLE_ADDRESSES_ENABLED` | "false" | If enabled eth_subscribe will allow subscription to multiple contract address. |
109-
| `WS_CACHE_TTL` | "20000" | The time to live for cached entries. |
110-
| `WS_PING_INTERVAL` | "1000" | Interval between ping messages. Set to `0` to disable pinger. |
98+
| Name | Default | Description |
99+
| ------------------------------- |----------|-------------------------------------------------------------------------------------------------------------------------------|
100+
| `SUBSCRIPTIONS_ENABLED` | "false" | If enabled eth_subscribe will be enabled using WebSockets. |
101+
| `WS_MAX_INACTIVITY_TTL` | "300000" | Time in ms that the web socket connection is allowed to stay open without any messages sent or received, currently 5 minutes. |
102+
| `WS_CONNECTION_LIMIT` | "10" | Maximum amount of concurrent web socket connections allowed. |
103+
| `WS_POLLING_INTERVAL` | "500" | Time in ms in between each poll to mirror node while there are subscriptions. |
104+
| `WEB_SOCKET_PORT` | "8546" | Port for the web socket connections |
105+
| `WEB_SOCKET_HTTP_PORT` | "8547" | Port for standard http server, used for metrics and health status endpoints |
106+
| `WS_SUBSCRIPTION_LIMIT` | "10" | Maximum amount of subscriptions per single connection |
107+
| `WS_CONNECTION_LIMIT_PER_IP` | "10" | Maximum amount of connections from a single IP address |
108+
| `WS_MULTIPLE_ADDRESSES_ENABLED` | "false" | If enabled eth_subscribe will allow subscription to multiple contract address. |
109+
| `WS_CACHE_TTL` | "20000" | The time to live for cached entries. |
110+
| `WS_PING_INTERVAL` | "1000" | Interval between ping messages. Set to `0` to disable pinger. |
111111

112112
## Sample for connecting to Hedera Environments
113113

packages/relay/src/lib/subscriptionController.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ export class SubscriptionController {
173173
method: 'eth_subscription',
174174
params: subscriptionData
175175
}));
176+
sub.connection.limiter.resetInactivityTTLTimer(sub.connection)
176177
}
177178
});
178179
}

packages/relay/tests/lib/subscriptionController.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,24 @@ import sinon from 'sinon';
2929
import dotenv from "dotenv";
3030
import path from 'path';
3131
import {Registry} from "prom-client";
32+
import ConnectionLimiter from "@hashgraph/json-rpc-ws-server/dist/ConnectionLimiter";
3233

3334
dotenv.config({ path: path.resolve(__dirname, '../test.env') });
3435

3536
const logger = pino();
37+
const register = new Registry();
38+
const limiter = new ConnectionLimiter(logger, register);
3639
let ethImpl: EthImpl;
3740
let poller: Poller;
3841

3942
class MockWsConnection {
4043

4144
id: string;
45+
limiter: ConnectionLimiter;
4246

4347
constructor(id: string) {
4448
this.id = id;
49+
this.limiter = limiter;
4550
}
4651

4752
send(msg) {

packages/server/tests/acceptance/ws/subscribe.spec.ts

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe('@web-socket Acceptance Tests', async function() {
8383
const accounts: AliasAccount[] = [];
8484
let logContractSigner;
8585
// Cached original ENV variables
86-
let originalWsMaxConnectionTtl;
86+
let originalWsMaxInactivityTtl;
8787
let originalWsMultipleAddressesEnabledValue;
8888

8989
const topics = [
@@ -93,7 +93,7 @@ describe('@web-socket Acceptance Tests', async function() {
9393
"0x0000000000000000000000000000000000000000000000000000000000000007"
9494
]
9595

96-
this.beforeAll(async () => {
96+
before(async () => {
9797
const { socketServer } = global;
9898
server = socketServer;
9999

@@ -109,24 +109,24 @@ describe('@web-socket Acceptance Tests', async function() {
109109
await new Promise(r => setTimeout(r, 5000));
110110
});
111111

112-
this.beforeEach(async () => {
112+
beforeEach(async () => {
113113
// restore original ENV value
114114
process.env.WS_MULTIPLE_ADDRESSES_ENABLED = originalWsMultipleAddressesEnabledValue;
115115

116-
117116
wsProvider = await new ethers.WebSocketProvider(WS_RELAY_URL);
118-
119117
requestId = Utils.generateRequestId();
120118
// Stabilizes the initial connection test.
121119
await new Promise(resolve => setTimeout(resolve, 1000));
122120
if (server)
123121
expect(server._connections).to.equal(1);
124122
});
125123

126-
this.afterEach(async () => {
127-
await wsProvider.destroy();
128-
await new Promise(resolve => setTimeout(resolve, 1000));
129-
if (server)
124+
afterEach(async () => {
125+
if (wsProvider) {
126+
await wsProvider.destroy();
127+
await new Promise(resolve => setTimeout(resolve, 1000));
128+
}
129+
if (server)
130130
expect(server._connections).to.equal(0);
131131
});
132132

@@ -509,17 +509,18 @@ describe('@web-socket Acceptance Tests', async function() {
509509
});
510510

511511
describe('Connection TTL', async function () {
512+
let TEST_TTL = 5000;
513+
512514
this.beforeAll(async () => {
513515
// cache original ENV values
514-
originalWsMaxConnectionTtl = process.env.WS_MAX_CONNECTION_TTL || '300000';
515-
process.env.WS_MAX_CONNECTION_TTL = '10000';
516+
originalWsMaxInactivityTtl = process.env.WS_MAX_INACTIVITY_TTL || '300000';
517+
process.env.WS_MAX_INACTIVITY_TTL = TEST_TTL.toString();
516518
});
517519
this.afterAll(async () => {
518520
// Return ENV variables to their original value
519-
process.env.WS_MAX_CONNECTION_TTL = originalWsMaxConnectionTtl;
521+
process.env.WS_MAX_INACTIVITY_TTL = originalWsMaxInactivityTtl;
520522
});
521523

522-
523524
it('Connection TTL is enforced, should close all connections', async function () {
524525
const wsConn2 = await new ethers.WebSocketProvider(WS_RELAY_URL);
525526
const wsConn3 = await new ethers.WebSocketProvider(WS_RELAY_URL);
@@ -542,12 +543,63 @@ describe('@web-socket Acceptance Tests', async function() {
542543
expect(message.toString('utf8')).to.equal(WebSocketError.TTL_EXPIRED.message);
543544
})
544545

545-
await new Promise(resolve => setTimeout(resolve, parseInt(process.env.WS_MAX_CONNECTION_TTL) + 1000));
546+
await new Promise(resolve => setTimeout(resolve, parseInt(process.env.WS_MAX_INACTIVITY_TTL) + 1000));
546547

547548
expect(closeEventHandled2).to.eq(true);
548549
expect(closeEventHandled3).to.eq(true);
549550
expect(server._connections).to.equal(0);
550551
});
552+
553+
describe('Connection TTL is reset', async function() {
554+
const initialWaitTime = 2000;
555+
let timeAtStart, closeEventHandled;
556+
557+
beforeEach(async () => {
558+
timeAtStart = Date.now();
559+
560+
closeEventHandled = false;
561+
wsProvider.websocket.on('close', (code, message) => {
562+
expect(code).to.equal(WebSocketError.TTL_EXPIRED.code);
563+
expect(message.toString('utf8')).to.equal(WebSocketError.TTL_EXPIRED.message);
564+
565+
closeEventHandled = true;
566+
const timeAtDisconnect = Date.now();
567+
expect(timeAtDisconnect - timeAtStart).to.be.gte(TEST_TTL + initialWaitTime);
568+
});
569+
});
570+
571+
afterEach(async () => {
572+
// wait for TTL to trigger + buffer time
573+
await new Promise(resolve => setTimeout(resolve, TEST_TTL + 1000));
574+
expect(closeEventHandled).to.eq(true);
575+
// @ts-ignore
576+
wsProvider = false;
577+
});
578+
579+
it('when the client sends a message', async function() {
580+
await new Promise(resolve => setTimeout(resolve, initialWaitTime));
581+
582+
const response = await wsProvider.send('eth_chainId', []);
583+
expect(response).to.eq(CHAIN_ID);
584+
});
585+
586+
it('when the server sends a message', async function() {
587+
let eventCaptured = false;
588+
wsProvider.on({address: logContractSigner.target}, function(data) {
589+
eventCaptured = true;
590+
});
591+
592+
await new Promise(resolve => setTimeout(resolve, initialWaitTime));
593+
const gasOptions = await Utils.gasOptions(requestId);
594+
const tx = await logContractSigner.log1(5, gasOptions);
595+
await tx.wait();
596+
597+
// buffer time
598+
await new Promise(resolve => setTimeout(resolve, 1000));
599+
600+
expect(eventCaptured).to.eq(true);
601+
});
602+
});
551603
});
552604

553605
describe('Subscribes to log events', async function () {

packages/ws-server/src/ConnectionLimiter.ts

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import { Logger } from "pino";
2222
import {WebSocketError} from "@hashgraph/json-rpc-relay";
23-
import {Histogram, Gauge, Registry, Counter} from "prom-client";
23+
import {Gauge, Registry, Counter} from "prom-client";
2424

2525
type IpCounter = {
2626
[key: string]: number;
@@ -36,7 +36,7 @@ export default class ConnectionLimiter {
3636
private ipConnectionsGauge: Gauge;
3737
private ipConnectionLimitCounter: Counter;
3838
private connectionLimitCounter: Counter;
39-
private connectionTTLCounter: Counter;
39+
private inactivityTTLCounter: Counter;
4040
private register: Registry;
4141

4242
constructor(logger: Logger, register: Registry) {
@@ -80,10 +80,10 @@ export default class ConnectionLimiter {
8080
registers: [register]
8181
});
8282

83-
const connectionTTLLimitMetric = 'rpc_websocket_total_connection_limit_by_ttl_enforced';
84-
this.register.removeSingleMetric(connectionTTLLimitMetric);
85-
this.connectionTTLCounter = new Counter({
86-
name: connectionTTLLimitMetric,
83+
const inactivityTTLLimitMetric = 'rpc_websocket_total_connection_limit_by_ttl_enforced';
84+
this.register.removeSingleMetric(inactivityTTLLimitMetric);
85+
this.inactivityTTLCounter = new Counter({
86+
name: inactivityTTLLimitMetric,
8787
help: 'Relay websocket total connection ttl limits enforced',
8888
registers: [register]
8989
});
@@ -139,19 +139,8 @@ export default class ConnectionLimiter {
139139
return;
140140
}
141141

142-
// Limit connection TTL and close connection if its reached
143-
const maxConnectionTTL = parseInt(process.env.WS_MAX_CONNECTION_TTL || '300000');
144-
setTimeout(() => {
145-
if (ctx.websocket.readyState !== 3) { // 3 = CLOSED, Avoid closing already closed connections
146-
this.logger.debug(`Closing connection ${ctx.websocket.id} due to reaching TTL of ${maxConnectionTTL}ms`);
147-
try {
148-
this.connectionTTLCounter.inc();
149-
ctx.websocket.close(TTL_EXPIRED.code, TTL_EXPIRED.message);
150-
} catch (e) {
151-
this.logger.error(`${ctx.websocket.id}: ${e}`);
152-
}
153-
}
154-
}, maxConnectionTTL);
142+
// Limit connection TTL and close connection when it is reached
143+
this.startInactivityTTLTimer(ctx.websocket);
155144
}
156145

157146
public incrementSubs(ctx) {
@@ -165,4 +154,29 @@ export default class ConnectionLimiter {
165154
public validateSubscriptionLimit(ctx) {
166155
return ctx.websocket.subscriptions < parseInt(process.env.WS_SUBSCRIPTION_LIMIT || '10');
167156
}
157+
158+
// Starts a timeout timer that closes the connection
159+
public startInactivityTTLTimer(websocket) {
160+
const maxInactivityTTL = parseInt(process.env.WS_MAX_INACTIVITY_TTL || '300000');
161+
websocket.inactivityTTL = setTimeout(() => {
162+
if (websocket.readyState !== 3) { // 3 = CLOSED, Avoid closing already closed connections
163+
this.logger.debug(`Closing connection ${websocket.id} due to reaching TTL of ${maxInactivityTTL}ms`);
164+
try {
165+
this.inactivityTTLCounter.inc();
166+
websocket.close(TTL_EXPIRED.code, TTL_EXPIRED.message);
167+
} catch (e) {
168+
this.logger.error(`${websocket.id}: ${e}`);
169+
}
170+
}
171+
}, maxInactivityTTL);
172+
}
173+
174+
// Resets the inactivity TTL timer
175+
public resetInactivityTTLTimer(websocket) {
176+
if (websocket?.inactivityTTL) {
177+
clearTimeout(websocket.inactivityTTL);
178+
}
179+
180+
this.startInactivityTTLTimer(websocket);
181+
}
168182
}

packages/ws-server/src/webSocketServer.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ async function validateSubscribeEthLogsParams(filters: any, requestId: string) {
119119

120120
app.ws.use(async (ctx) => {
121121
ctx.websocket.id = relay.subs()?.generateId();
122+
ctx.websocket.limiter = limiter;
122123
const connectionIdPrefix = formatConnectionIdMessage(ctx.websocket.id);
123124
const connectionRequestIdPrefix = formatRequestIdMessage(uuid());
124125
logger.info(`${connectionIdPrefix} ${connectionRequestIdPrefix} New connection established. Current active connections: ${ctx.app.server._connections}`);
@@ -136,7 +137,8 @@ app.ws.use(async (ctx) => {
136137
limiter.applyLimits(ctx);
137138

138139
ctx.websocket.on('message', async (msg) => {
139-
ctx.websocket.id = relay.subs()?.generateId();
140+
// Receiving a message from the client resets the TTL timer
141+
limiter.resetInactivityTTLTimer(ctx.websocket);
140142
const requestIdPrefix = formatRequestIdMessage(uuid());
141143
let request;
142144
try {

0 commit comments

Comments
 (0)