diff --git a/lib/index.ts b/lib/index.ts index 57a0f62..585ec79 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,7 +1,7 @@ import uid2 = require("uid2"); import msgpack = require("notepack.io"); import { Adapter, BroadcastOptions, Room } from "socket.io-adapter"; -import { PUBSUB } from "./util"; +import { parseTimeout, PUBSUB } from "./util"; const debug = require("debug")("socket.io-redis"); @@ -36,6 +36,7 @@ interface Request { interface AckRequest { clientCountCallback: (clientCount: number) => void; ack: (...args: any[]) => void; + timeout: NodeJS.Timeout; } interface Parser { @@ -128,7 +129,7 @@ export class RedisAdapter extends Adapter { super(nsp); this.uid = uid2(6); - this.requestsTimeout = opts.requestsTimeout || 5000; + this.requestsTimeout = parseTimeout(opts.requestsTimeout, 5000); this.publishOnSpecificResponseChannel = !!opts.publishOnSpecificResponseChannel; this.parser = opts.parser || msgpack; @@ -185,6 +186,7 @@ export class RedisAdapter extends Adapter { ); } + // Use function() instead of arrow function so 'this' refers to the Redis client (event emitter) this.friendlyErrorHandler = function () { if (this.listenerCount("error") === 1) { console.warn("missing 'error' handler on this Redis client"); @@ -542,9 +544,9 @@ export class RedisAdapter extends Adapter { request.msgCount++; // ignore if response does not contain 'sockets' key - if (!response.sockets || !Array.isArray(response.sockets)) return; - - if (request.type === RequestType.SOCKETS) { + if (!response.sockets || !Array.isArray(response.sockets)) { + debug("ignoring malformed response (missing sockets array)"); + } else if (request.type === RequestType.SOCKETS) { response.sockets.forEach((s) => request.sockets.add(s)); } else { response.sockets.forEach((s) => request.sockets.push(s)); @@ -563,9 +565,11 @@ export class RedisAdapter extends Adapter { request.msgCount++; // ignore if response does not contain 'rooms' key - if (!response.rooms || !Array.isArray(response.rooms)) return; - - response.rooms.forEach((s) => request.rooms.add(s)); + if (!response.rooms || !Array.isArray(response.rooms)) { + debug("ignoring malformed response (missing rooms array)"); + } else { + response.rooms.forEach((s) => request.rooms.add(s)); + } if (request.msgCount === request.numSub) { clearTimeout(request.timeout); @@ -667,16 +671,22 @@ export class RedisAdapter extends Adapter { this.pubClient.publish(this.requestChannel, request); + // we have no way to know at this level whether the server has received an acknowledgement from each client, so we + // will simply clean up the ackRequests map after the given delay + const ackTimeout = parseTimeout( + opts.flags?.timeout, + this.requestsTimeout + ); + + const timeout = setTimeout(() => { + this.ackRequests.delete(requestId); + }, ackTimeout); + this.ackRequests.set(requestId, { clientCountCallback, ack, + timeout, }); - - // we have no way to know at this level whether the server has received an acknowledgement from each client, so we - // will simply clean up the ackRequests map after the given delay - setTimeout(() => { - this.ackRequests.delete(requestId); - }, opts.flags!.timeout); } super.broadcastWithAck(packet, opts, clientCountCallback, ack); @@ -895,6 +905,18 @@ export class RedisAdapter extends Adapter { } close(): Promise | void { + // Cancel all pending request timeouts and clear the map + this.requests.forEach((request) => { + clearTimeout(request.timeout); + }); + this.requests.clear(); + + // Cancel all pending ack request timeouts and clear the map + this.ackRequests.forEach((ackRequest) => { + clearTimeout(ackRequest.timeout); + }); + this.ackRequests.clear(); + const isRedisV4 = typeof this.pubClient.pSubscribe === "function"; if (isRedisV4) { this.subClient.pUnsubscribe( @@ -940,6 +962,9 @@ export class RedisAdapter extends Adapter { this.pubClient.off("error", this.friendlyErrorHandler); this.subClient.off("error", this.friendlyErrorHandler); + + // Clear listener references + this.redisListeners.clear(); } } diff --git a/lib/sharded-adapter.ts b/lib/sharded-adapter.ts index ea3f37a..2a82579 100644 --- a/lib/sharded-adapter.ts +++ b/lib/sharded-adapter.ts @@ -72,6 +72,8 @@ class ShardedRedisAdapter extends ClusterAdapter { private readonly opts: Required; private readonly channel: string; private readonly responseChannel: string; + private readonly onCreateRoom: (room: string) => void; + private readonly onDeleteRoom: (room: string) => void; constructor(nsp, pubClient, subClient, opts: ShardedRedisAdapterOptions) { super(nsp); @@ -97,17 +99,20 @@ class ShardedRedisAdapter extends ClusterAdapter { this.opts.subscriptionMode === "dynamic" || this.opts.subscriptionMode === "dynamic-private" ) { - this.on("create-room", (room) => { + this.onCreateRoom = (room) => { if (this.shouldUseASeparateNamespace(room)) { SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler); } - }); + }; - this.on("delete-room", (room) => { + this.onDeleteRoom = (room) => { if (this.shouldUseASeparateNamespace(room)) { SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room)); } - }); + }; + + this.on("create-room", this.onCreateRoom); + this.on("delete-room", this.onDeleteRoom); } } @@ -118,6 +123,14 @@ class ShardedRedisAdapter extends ClusterAdapter { this.opts.subscriptionMode === "dynamic" || this.opts.subscriptionMode === "dynamic-private" ) { + // Remove event listeners to prevent memory leaks + if (this.onCreateRoom) { + this.off("create-room", this.onCreateRoom); + } + if (this.onDeleteRoom) { + this.off("delete-room", this.onDeleteRoom); + } + this.rooms.forEach((_sids, room) => { if (this.shouldUseASeparateNamespace(room)) { channels.push(this.dynamicChannel(room)); diff --git a/lib/util.ts b/lib/util.ts index cc4f74b..c0a0643 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -1,3 +1,9 @@ +export function parseTimeout(value: unknown, defaultValue: number): number { + return typeof value === "number" && value > 0 && value !== Infinity + ? value + : defaultValue; +} + export function hasBinary(obj: any, toJSON?: boolean): boolean { if (!obj || typeof obj !== "object") { return false; @@ -53,39 +59,74 @@ function isRedisV4Client(redisClient: any) { } const kHandlers = Symbol("handlers"); +const kListener = Symbol("listener"); +const kPendingUnsubscribes = Symbol("pendingUnsubscribes"); export function SSUBSCRIBE( redisClient: any, channel: string, handler: (rawMessage: Buffer, channel: Buffer) => void -) { +): Promise { if (isRedisV4Client(redisClient)) { - redisClient.sSubscribe(channel, handler, RETURN_BUFFERS); + return redisClient.sSubscribe(channel, handler, RETURN_BUFFERS); } else { - if (!redisClient[kHandlers]) { - redisClient[kHandlers] = new Map(); - redisClient.on("smessageBuffer", (rawChannel, message) => { - redisClient[kHandlers].get(rawChannel.toString())?.( - message, - rawChannel - ); - }); + const doSubscribe = (): Promise => { + if (!redisClient[kHandlers]) { + redisClient[kHandlers] = new Map(); + redisClient[kPendingUnsubscribes] = new Map>(); + redisClient[kListener] = (rawChannel: Buffer, message: Buffer) => { + redisClient[kHandlers].get(rawChannel.toString())?.( + message, + rawChannel + ); + }; + redisClient.on("smessageBuffer", redisClient[kListener]); + } + redisClient[kHandlers].set(channel, handler); + return redisClient.ssubscribe(channel); + }; + + // Wait for any pending unsubscribe on this channel to complete first + const pendingUnsubscribe = redisClient[kPendingUnsubscribes]?.get(channel); + if (pendingUnsubscribe) { + return pendingUnsubscribe.then(doSubscribe); } - redisClient[kHandlers].set(channel, handler); - redisClient.ssubscribe(channel); + return doSubscribe(); } } -export function SUNSUBSCRIBE(redisClient: any, channel: string | string[]) { +export function SUNSUBSCRIBE( + redisClient: any, + channel: string | string[] +): Promise { if (isRedisV4Client(redisClient)) { - redisClient.sUnsubscribe(channel); + return redisClient.sUnsubscribe(channel); } else { - redisClient.sunsubscribe(channel); - if (Array.isArray(channel)) { - channel.forEach((c) => redisClient[kHandlers].delete(c)); - } else { - redisClient[kHandlers].delete(channel); - } + const channels = Array.isArray(channel) ? channel : [channel]; + + // Remove handlers immediately to stop processing messages + channels.forEach((c) => redisClient[kHandlers]?.delete(c)); + + // Perform the unsubscribe and track as pending + const unsubscribePromise = redisClient.sunsubscribe(channel).then(() => { + // Remove from pending tracking + channels.forEach((c) => redisClient[kPendingUnsubscribes]?.delete(c)); + + // Clean up the global listener when no more handlers exist + if (redisClient[kHandlers]?.size === 0 && redisClient[kListener]) { + redisClient.off("smessageBuffer", redisClient[kListener]); + delete redisClient[kHandlers]; + delete redisClient[kListener]; + delete redisClient[kPendingUnsubscribes]; + } + }); + + // Track pending unsubscribe for each channel + channels.forEach((c) => + redisClient[kPendingUnsubscribes]?.set(c, unsubscribePromise) + ); + + return unsubscribePromise; } } diff --git a/package-lock.json b/package-lock.json index c580cf5..072a857 100644 --- a/package-lock.json +++ b/package-lock.json @@ -458,6 +458,7 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.7.tgz", "integrity": "sha512-gaOBOuJPjK5fGtxSseaKgSvjiZXQCdLlGg9WYQst+/GRUjmXaiB5kVkeQMRtPc7Q2t93XZcJfBMSwzs/XS9UZw==", "dev": true, + "peer": true, "dependencies": { "cluster-key-slot": "1.1.2", "generic-pool": "3.9.0", @@ -564,7 +565,8 @@ "version": "14.14.7", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.7.tgz", "integrity": "sha512-Zw1vhUSQZYw+7u5dAwNbIA9TuTotpzY/OF7sJM9FqPOF3SPjKnxrjoTktXDZgUjybf4cWVBP7O8wvKdSaGHweg==", - "dev": true + "dev": true, + "peer": true }, "node_modules/accepts": { "version": "1.3.8", @@ -2738,6 +2740,7 @@ "version": "2.5.4", "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.4.tgz", "integrity": "sha512-wDNHGXGewWAjQPt3pyeYBtpWSq9cLE5UW1ZUPL/2eGK9jtse/FpXib7epSTsz0Q0m+6sg6Y4KtcFTlah1bdOVg==", + "peer": true, "dependencies": { "debug": "~4.3.4", "ws": "~8.11.0" @@ -3024,6 +3027,7 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", "dev": true, + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -3705,6 +3709,7 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.7.tgz", "integrity": "sha512-gaOBOuJPjK5fGtxSseaKgSvjiZXQCdLlGg9WYQst+/GRUjmXaiB5kVkeQMRtPc7Q2t93XZcJfBMSwzs/XS9UZw==", "dev": true, + "peer": true, "requires": { "cluster-key-slot": "1.1.2", "generic-pool": "3.9.0", @@ -3800,7 +3805,8 @@ "version": "14.14.7", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.7.tgz", "integrity": "sha512-Zw1vhUSQZYw+7u5dAwNbIA9TuTotpzY/OF7sJM9FqPOF3SPjKnxrjoTktXDZgUjybf4cWVBP7O8wvKdSaGHweg==", - "dev": true + "dev": true, + "peer": true }, "accepts": { "version": "1.3.8", @@ -5416,6 +5422,7 @@ "version": "2.5.4", "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.4.tgz", "integrity": "sha512-wDNHGXGewWAjQPt3pyeYBtpWSq9cLE5UW1ZUPL/2eGK9jtse/FpXib7epSTsz0Q0m+6sg6Y4KtcFTlah1bdOVg==", + "peer": true, "requires": { "debug": "~4.3.4", "ws": "~8.11.0" @@ -5621,7 +5628,8 @@ "version": "4.9.5", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", - "dev": true + "dev": true, + "peer": true }, "uid2": { "version": "1.0.0", diff --git a/package.json b/package.json index 869e82d..5be8583 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@socket.io/redis-adapter", - "version": "8.3.0", + "version": "8.3.1", "description": "The Socket.IO Redis adapter, allowing to broadcast events between several Socket.IO servers", "license": "MIT", "repository": {