Skip to content

Commit 7a4eb1a

Browse files
committed
fix: UDP Cluster Manager - simplified & improved server alive waiting timer
1 parent 19d59d9 commit 7a4eb1a

File tree

2 files changed

+16
-33
lines changed

2 files changed

+16
-33
lines changed

src/RedisQueue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,10 @@ export class RedisQueue extends EventEmitter<EventMap>
887887
);
888888
const list = <string>await this.writer.client('LIST');
889889

890+
if (!list || !list.split) {
891+
return 0;
892+
}
893+
890894
return list.split(/\r?\n/).filter(client => rx.test(client)).length;
891895
}
892896

src/UDPClusterManager.ts

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ interface ClusterServer extends IMessageQueueConnection {
4949
export const DEFAULT_UDP_CLUSTER_MANAGER_OPTIONS = {
5050
broadcastPort: 63000,
5151
broadcastAddress: '255.255.255.255',
52-
aliveTimeoutCorrection: 1000,
52+
aliveTimeoutCorrection: 2000,
5353
};
5454

5555
export interface UDPClusterManagerOptions {
@@ -293,55 +293,34 @@ export class UDPClusterManager extends ClusterManager {
293293
private static serverAliveWait(
294294
cluster: ICluster,
295295
server: ClusterServer,
296-
aliveTimeoutCorrection?: number,
296+
aliveTimeoutCorrection: number = 0,
297297
message?: Message,
298298
): void {
299299
if (server.timer) {
300300
clearTimeout(server.timer);
301-
server.timer = undefined;
302-
}
303-
304-
server.timestamp = Date.now();
305-
306-
if (message) {
307-
server.timeout = message.timeout;
308301
}
309302

310-
const correction = aliveTimeoutCorrection || 0;
311-
const timeout = (server.timeout || 0) + correction;
303+
const timeout = (message?.timeout || 0) + aliveTimeoutCorrection;
312304

313305
if (timeout <= 0) {
314306
return;
315307
}
316308

317-
const timerId = setTimeout(() => {
318-
const existing = cluster.find<ClusterServer>(server, true);
319-
320-
if (!existing || existing.timer !== timerId) {
321-
return;
322-
}
323-
324-
const now = Date.now();
325-
326-
if (!existing.timestamp) {
327-
clearTimeout(existing.timer);
328-
existing.timer = undefined;
329-
cluster.remove(existing);
309+
server.timeout = timeout;
310+
server.timestamp = Date.now();
311+
server.timer = setTimeout(() => {
312+
const entry = cluster.find<ClusterServer>(server, true);
330313

314+
if (!entry?.timestamp) {
331315
return;
332316
}
333317

334-
const delta = now - existing.timestamp;
335-
const currentTimeout = (existing.timeout || 0) + correction;
318+
const elapsed = Date.now() - entry.timestamp;
336319

337-
if (delta >= currentTimeout) {
338-
clearTimeout(existing.timer);
339-
existing.timer = undefined;
340-
cluster.remove(existing);
320+
if (elapsed >= timeout) {
321+
cluster.remove(entry);
341322
}
342-
}, timeout);
343-
344-
server.timer = timerId;
323+
}, server.timeout);
345324
}
346325

347326
/**

0 commit comments

Comments
 (0)