Skip to content

Commit 62cd194

Browse files
committed
chore: added timeout to ws connection
1 parent 42b3937 commit 62cd194

File tree

3 files changed

+29
-2
lines changed

3 files changed

+29
-2
lines changed

packages/event-downloader/src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export const USE_SSL = process.env.USE_SSL === 'true';
2727
export const BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '5000', 10);
2828
export const PARALLEL_CONNECTIONS = parseInt(process.env.PARALLEL_CONNECTIONS ?? '5', 10);
2929
export const WINDOW_SIZE = parseInt(process.env.WINDOW_SIZE ?? '100', 10);
30+
export const CONNECTION_TIMEOUT_MS = parseInt(process.env.CONNECTION_TIMEOUT_MS ?? '60000', 10);
3031

3132
// Database configuration
3233
export const DB_PATH = process.env.DB_PATH ?? './events.sqlite';
@@ -41,6 +42,7 @@ export const getConfig = () => {
4142
PARALLEL_CONNECTIONS,
4243
WINDOW_SIZE,
4344
DB_PATH,
45+
CONNECTION_TIMEOUT_MS,
4446
};
4547
};
4648

packages/event-downloader/src/worker.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import { WebSocket, MessageEvent, ErrorEvent } from 'ws';
99
import { bigIntUtils } from '@hathor/wallet-lib';
1010
import { FullNodeEvent, FullNodeEventSchema, WebSocketSendEvent } from './types';
11-
import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE } from './config';
11+
import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE, CONNECTION_TIMEOUT_MS } from './config';
1212

1313
export interface BatchConfig {
1414
batchStart: number;
@@ -43,6 +43,28 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W
4343
let isRunning = false;
4444
let eventsSinceLastAck = 0;
4545
let lastReceivedEventId = 0;
46+
let activityTimeout: ReturnType<typeof setTimeout> | null = null;
47+
48+
const resetActivityTimeout = (): void => {
49+
if (activityTimeout) {
50+
clearTimeout(activityTimeout);
51+
}
52+
if (isRunning && CONNECTION_TIMEOUT_MS > 0) {
53+
activityTimeout = setTimeout(() => {
54+
if (isRunning) {
55+
onError(new Error(`Connection timeout: no activity for ${CONNECTION_TIMEOUT_MS}ms`));
56+
stop();
57+
}
58+
}, CONNECTION_TIMEOUT_MS);
59+
}
60+
};
61+
62+
const clearActivityTimeout = (): void => {
63+
if (activityTimeout) {
64+
clearTimeout(activityTimeout);
65+
activityTimeout = null;
66+
}
67+
};
4668

4769
const getWsUrl = (): string => {
4870
const protocol = USE_SSL ? 'wss://' : 'ws://';
@@ -74,9 +96,11 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W
7496
...(lastAckEventId !== undefined && { last_ack_event_id: lastAckEventId }),
7597
};
7698
sendMessage(startMessage);
99+
resetActivityTimeout();
77100
};
78101

79102
socket.onmessage = (socketEvent: MessageEvent) => {
103+
resetActivityTimeout();
80104
try {
81105
const rawData = bigIntUtils.JSONBigInt.parse(socketEvent.data.toString());
82106
const parseResult = FullNodeEventSchema.safeParse(rawData);
@@ -140,6 +164,7 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W
140164

141165
const stop = (): void => {
142166
isRunning = false;
167+
clearActivityTimeout();
143168
if (socket) {
144169
socket.close();
145170
socket = null;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"root":["./src/config.ts","./src/db.ts","./src/event-parser.ts","./src/index.ts","./src/orchestrator.ts","./src/types.ts","./src/worker.ts"],"version":"5.9.3"}
1+
{"root":["./src/config.ts","./src/db.ts","./src/event-parser.ts","./src/index.ts","./src/orchestrator.ts","./src/types.ts","./src/worker.ts"],"version":"5.7.2"}

0 commit comments

Comments
 (0)