Skip to content

Commit 3ffc120

Browse files
authored
chore: cherry picked 2487 (#2518)
feat: improved metrics and added rate limit for the batch request feature in ws server (#2474) (#2487) * feat: added shouldRateLimitOnMethod() to connectionLimiter * feat: applied shouldRateLimitOnMethod() on batch requests * feat: applied shouldRateLimitOnMethod() on single requests * feat: added methodsCounter metric for batch_requests * fix: skipped rateLimit for eth_subscribe and eth_unsubscribe * test: added acceptancetest for rate limiter in the WS * test: enabled test_ws_server in ratelimiter CI --------- Signed-off-by: Logan Nguyen <[email protected]>
1 parent 6fb88d6 commit 3ffc120

File tree

9 files changed

+169
-19
lines changed

9 files changed

+169
-19
lines changed

.github/workflows/acceptance.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ jobs:
4747
uses: ./.github/workflows/acceptance-workflow.yml
4848
with:
4949
testfilter: ratelimiter
50+
test_ws_server: true
5051

5152
tokencreate:
5253
name: Token Create

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"acceptancetest:api_batch2": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-batch-2' --exit",
3434
"acceptancetest:api_batch3": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-batch-3' --exit",
3535
"acceptancetest:erc20": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@erc20' --exit",
36-
"acceptancetest:ratelimiter": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@ratelimiter' --exit",
36+
"acceptancetest:ratelimiter": "ts-mocha packages/ws-server/tests/acceptance/index.spec.ts -g '@web-socket-ratelimiter' --exit && ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@ratelimiter' --exit",
3737
"acceptancetest:tokencreate": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@tokencreate' --exit",
3838
"acceptancetest:tokenmanagement": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@tokenmanagement' --exit",
3939
"acceptancetest:htsprecompilev1": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@htsprecompilev1' --exit",

packages/server/tests/localAcceptance.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ BATCH_REQUESTS_ENABLED=true
2222
TEST_GAS_PRICE_DEVIATION=0.2
2323
WS_NEW_HEADS_ENABLED=false
2424
INITIAL_BALANCE='5000000000'
25+
LIMIT_DURATION=90000

packages/ws-server/src/controllers/index.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ import { handleEthSubsribe, handleEthUnsubscribe } from './eth_subscribe';
2727
import { MirrorNodeClient } from '@hashgraph/json-rpc-relay/dist/lib/clients';
2828
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
2929
import { resolveParams, validateJsonRpcRequest, verifySupportedMethod } from '../utils/utils';
30-
import { InvalidRequest, MethodNotFound } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
30+
import {
31+
InvalidRequest,
32+
MethodNotFound,
33+
IPRateLimitExceeded,
34+
} from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
3135

3236
/**
3337
* Handles sending requests to a Relay by calling a specified method with given parameters.
@@ -122,6 +126,11 @@ export const getRequestResult = async (
122126
return jsonResp(request.id || null, new MethodNotFound(request.method), undefined);
123127
}
124128

129+
// verify rate limit for method method based on IP
130+
if (limiter.shouldRateLimitOnMethod(ctx.ip, request.method, ctx.websocket.requestId)) {
131+
return jsonResp(null, new IPRateLimitExceeded(request.method), undefined);
132+
}
133+
125134
// Validate request's params
126135
try {
127136
const methodValidations = Validator.METHODS[method];

packages/ws-server/src/metrics/connectionLimiter.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import { Logger } from 'pino';
2222
import { WS_CONSTANTS } from '../utils/constants';
2323
import { Gauge, Registry, Counter } from 'prom-client';
2424
import { WebSocketError } from '@hashgraph/json-rpc-relay';
25+
import RateLimit from '@hashgraph/json-rpc-server/dist/rateLimit';
26+
import constants from '@hashgraph/json-rpc-relay/dist/lib/constants';
27+
import { methodConfiguration } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/methodConfiguration';
2528

2629
type IpCounter = {
2730
[key: string]: number;
@@ -39,6 +42,7 @@ export default class ConnectionLimiter {
3942
private connectionLimitCounter: Counter;
4043
private inactivityTTLCounter: Counter;
4144
private register: Registry;
45+
private rateLimit: RateLimit;
4246

4347
constructor(logger: Logger, register: Registry) {
4448
this.logger = logger;
@@ -82,6 +86,11 @@ export default class ConnectionLimiter {
8286
help: WS_CONSTANTS.connLimiter.inactivityTTLLimitMetric.help,
8387
registers: [register],
8488
});
89+
90+
const rateLimitDuration = process.env.LIMIT_DURATION
91+
? parseInt(process.env.LIMIT_DURATION)
92+
: constants.DEFAULT_RATE_LIMIT.DURATION;
93+
this.rateLimit = new RateLimit(logger.child({ name: 'ip-rate-limit' }), register, rateLimitDuration);
8594
}
8695

8796
public incrementCounters(ctx) {
@@ -199,4 +208,13 @@ export default class ConnectionLimiter {
199208

200209
this.startInactivityTTLTimer(websocket);
201210
}
211+
212+
public shouldRateLimitOnMethod(ip, methodName, requestId) {
213+
// subcription limits are already covered in this.validateSubscriptionLimit()
214+
if (methodName === WS_CONSTANTS.METHODS.ETH_SUBSCRIBE || methodName === WS_CONSTANTS.METHODS.ETH_UNSUBSCRIBE)
215+
return false;
216+
217+
const methodTotalLimit = methodConfiguration[methodName].total;
218+
return this.rateLimit.shouldRateLimit(ip, methodName, methodTotalLimit, requestId);
219+
}
202220
}

packages/ws-server/src/webSocketServer.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dotenv.config({ path: path.resolve(__dirname, '../../../.env') });
3434
import KoaJsonRpc from '@hashgraph/json-rpc-server/dist/koaJsonRpc';
3535
import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse';
3636
import { type Relay, RelayImpl, predefined, JsonRpcError } from '@hashgraph/json-rpc-relay';
37+
import { IPRateLimitExceeded } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
3738
import { sendToClient, handleConnectionClose, getBatchRequestsMaxSize, getWsBatchRequestsEnabled } from './utils/utils';
3839

3940
const mainLogger = pino({
@@ -66,10 +67,12 @@ app.ws.use(async (ctx) => {
6667
const startTime = process.hrtime();
6768

6869
ctx.websocket.id = relay.subs()?.generateId();
70+
ctx.websocket.requestId = uuid();
71+
6972
ctx.websocket.limiter = limiter;
7073
ctx.websocket.wsMetricRegistry = wsMetricRegistry;
7174
const connectionIdPrefix = formatIdMessage('Connection ID', ctx.websocket.id);
72-
const requestIdPrefix = formatIdMessage('Request ID', uuid());
75+
const requestIdPrefix = formatIdMessage('Request ID', ctx.websocket.requestId);
7376
logger.info(
7477
`${connectionIdPrefix} ${requestIdPrefix} New connection established. Current active connections: ${ctx.app.server._connections}`,
7578
);
@@ -116,6 +119,13 @@ app.ws.use(async (ctx) => {
116119
if (Array.isArray(request)) {
117120
logger.trace(`${connectionIdPrefix} ${requestIdPrefix}: Receive batch request=${JSON.stringify(request)}`);
118121

122+
// Increment metrics for batch_requests
123+
wsMetricRegistry.getCounter('methodsCounter').labels(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME).inc();
124+
wsMetricRegistry
125+
.getCounter('methodsCounterByIp')
126+
.labels(ctx.request.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME)
127+
.inc();
128+
119129
// send error if batch request feature is not enabled
120130
if (!getWsBatchRequestsEnabled()) {
121131
const batchRequestDisabledError = predefined.WS_BATCH_REQUESTS_DISABLED;
@@ -135,7 +145,13 @@ app.ws.use(async (ctx) => {
135145
return;
136146
}
137147

138-
// TODO: verify rate limit
148+
// verify rate limit for batch_request method based on IP
149+
if (limiter.shouldRateLimitOnMethod(ctx.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME, ctx.websocket.requestId)) {
150+
ctx.websocket.send(
151+
JSON.stringify([jsonResp(null, new IPRateLimitExceeded(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME), undefined)]),
152+
);
153+
return;
154+
}
139155

140156
// process requests
141157
const requestPromises = request.map((item: any) => {

packages/ws-server/tests/acceptance/batchRequest.spec.ts

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { WsTestConstant, WsTestHelper } from '../helper';
2525
import { predefined } from '@hashgraph/json-rpc-relay/src';
2626

2727
describe('@web-socket-batch-1 Batch Requests', async function () {
28+
const METHOD_NAME = 'batch_request';
2829
let ethersWsProvider: WebSocketProvider;
2930
let batchRequests: any = [];
3031

@@ -80,10 +81,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () {
8081

8182
it(`Should submit batch requests to WS server using Standard Web Socket and retrieve batch responses`, async () => {
8283
// call batch request
83-
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(
84-
WsTestConstant.BATCH_REQUEST_METHOD_NAME,
85-
batchRequests,
86-
);
84+
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests);
8785

8886
// individually process each request
8987
let promises: any = [];
@@ -97,10 +95,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () {
9795

9896
it('Should submit batch requests to WS server and get batchRequestDisabledError if WS_BATCH_REQUESTS_DISABLED=false ', async () => {
9997
process.env.WS_BATCH_REQUESTS_ENABLED = 'false';
100-
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(
101-
WsTestConstant.BATCH_REQUEST_METHOD_NAME,
102-
batchRequests,
103-
);
98+
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests);
10499

105100
const expectedError = predefined.WS_BATCH_REQUESTS_DISABLED;
106101
delete expectedError.data;
@@ -111,10 +106,7 @@ describe('@web-socket-batch-1 Batch Requests', async function () {
111106
it('Should submit batch requests to WS server and get batchRequestAmountMaxExceed if requests size exceeds WS_BATCH_REQUESTS_MAX_SIZE', async () => {
112107
process.env.WS_BATCH_REQUESTS_MAX_SIZE = '1';
113108

114-
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(
115-
WsTestConstant.BATCH_REQUEST_METHOD_NAME,
116-
batchRequests,
117-
);
109+
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(METHOD_NAME, batchRequests);
118110

119111
const expectedError = predefined.BATCH_REQUESTS_AMOUNT_MAX_EXCEEDED(
120112
batchRequests.length,
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*-
2+
*
3+
* Hedera JSON RPC Relay
4+
*
5+
* Copyright (C) 2024 Hedera Hashgraph, LLC
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*
19+
*/
20+
21+
// external resources
22+
import { expect } from 'chai';
23+
import { WsTestHelper } from '../helper';
24+
import relayConstants from '@hashgraph/json-rpc-relay/src/lib/constants';
25+
import { AliasAccount } from '@hashgraph/json-rpc-server/tests/types/AliasAccount';
26+
import { IPRateLimitExceeded } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcError';
27+
28+
describe('@web-socket-ratelimiter Rate Limit Tests', async function () {
29+
const rateLimitTier1 = Number(process.env.TIER_1_RATE_LIMIT || relayConstants.DEFAULT_RATE_LIMIT.TIER_1);
30+
const rateLimitTier2 = Number(process.env.TIER_2_RATE_LIMIT || relayConstants.DEFAULT_RATE_LIMIT.TIER_2);
31+
const limitDuration = Number(process.env.LIMIT_DURATION) || relayConstants.DEFAULT_RATE_LIMIT.DURATION;
32+
33+
const batchRequests = [
34+
{
35+
id: 1,
36+
jsonrpc: '2.0',
37+
method: 'eth_chainId',
38+
params: [],
39+
},
40+
{
41+
id: 1,
42+
jsonrpc: '2.0',
43+
method: 'eth_blockNumber',
44+
params: [],
45+
},
46+
];
47+
48+
after(async () => {
49+
// expect all the connections to the WS server to be closed after all
50+
expect(global.socketServer._connections).to.eq(0);
51+
});
52+
53+
it(`Should submit single requests to WS server and receive IPRateLimitExceeded error until rate limit is reached`, async () => {
54+
const SINGLE_REQUEST_METHOD_NAME = 'eth_gasPrice';
55+
for (let i = 0; i < rateLimitTier2; i++) {
56+
await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, []);
57+
}
58+
59+
// exceed rate limit
60+
const response = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, []);
61+
const ipRateLimitError = new IPRateLimitExceeded(SINGLE_REQUEST_METHOD_NAME);
62+
expect(response.error.code).to.deep.eq(ipRateLimitError.code);
63+
expect(response.error.message).to.deep.eq(ipRateLimitError.message);
64+
65+
// wait until rate limit is reset
66+
await new Promise((r) => setTimeout(r, limitDuration));
67+
});
68+
69+
it(`Should submit batch requests to WS server and receive IPRateLimitExceeded error until rate limit is reached`, async () => {
70+
const BATCH_REQUEST_METHOD_NAME = 'batch_request';
71+
72+
// call batch request multitime to reach limit
73+
for (let i = 0; i < rateLimitTier1; i++) {
74+
await WsTestHelper.sendRequestToStandardWebSocket(BATCH_REQUEST_METHOD_NAME, batchRequests);
75+
}
76+
77+
// exceed rate limit
78+
const batchResponses = await WsTestHelper.sendRequestToStandardWebSocket(BATCH_REQUEST_METHOD_NAME, batchRequests);
79+
const ipRateLimitError = new IPRateLimitExceeded(BATCH_REQUEST_METHOD_NAME);
80+
81+
expect(batchResponses[0].error.code).to.deep.eq(ipRateLimitError.code);
82+
expect(batchResponses[0].error.message).to.deep.eq(ipRateLimitError.message);
83+
84+
// wait until rate limit is reset
85+
await new Promise((r) => setTimeout(r, limitDuration));
86+
});
87+
88+
it(`Should reset limit for requests`, async () => {
89+
const SINGLE_REQUEST_METHOD_NAME = 'eth_getBalance';
90+
const account: AliasAccount = global.accounts[0];
91+
92+
for (let i = 0; i < rateLimitTier2; i++) {
93+
await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [account.address, 'latest']);
94+
}
95+
// exceed rate limit
96+
const rateLimitResponse = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [
97+
account.address,
98+
'latest',
99+
]);
100+
const ipRateLimitError = new IPRateLimitExceeded(SINGLE_REQUEST_METHOD_NAME);
101+
expect(rateLimitResponse.error.code).to.deep.eq(ipRateLimitError.code);
102+
expect(rateLimitResponse.error.message).to.deep.eq(ipRateLimitError.message);
103+
104+
// wait until rate limit is reset
105+
await new Promise((r) => setTimeout(r, limitDuration));
106+
const response = await WsTestHelper.sendRequestToStandardWebSocket(SINGLE_REQUEST_METHOD_NAME, [
107+
account.address,
108+
'latest',
109+
]);
110+
const expectedResult = await global.relay.call(SINGLE_REQUEST_METHOD_NAME, [account.address, 'latest']);
111+
expect(response.result).to.eq(expectedResult);
112+
});
113+
});

packages/ws-server/tests/helper/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@ export class WsTestHelper {
3838
}
3939
}
4040

41-
static async sendRequestToStandardWebSocket(method: string, params: any, ms?: number | undefined) {
41+
static async sendRequestToStandardWebSocket(method: string, params: any[], ms?: number | undefined) {
42+
const BATCH_REQUEST_METHOD_NAME = 'batch_request';
4243
const webSocket = new WebSocket(WsTestConstant.WS_RELAY_URL);
4344

4445
let response: any;
4546

46-
if (method === WsTestConstant.BATCH_REQUEST_METHOD_NAME) {
47+
if (method === BATCH_REQUEST_METHOD_NAME) {
4748
webSocket.on('open', () => {
4849
webSocket.send(JSON.stringify(params));
4950
});
@@ -94,5 +95,4 @@ export class WsTestConstant {
9495
static STANDARD_WEB_SOCKET = 'Standard Web Socket';
9596
static ETHERS_WS_PROVIDER = 'Ethers Web Socket Provider';
9697
static WS_RELAY_URL = process.env.WS_RELAY_URL || `ws://127.0.0.1:8546`;
97-
static BATCH_REQUEST_METHOD_NAME: 'batch_request';
9898
}

0 commit comments

Comments
 (0)