diff --git a/ee/apps/ddp-streamer/src/Client.ts b/ee/apps/ddp-streamer/src/Client.ts index 0ce8762948726..4807157212efd 100644 --- a/ee/apps/ddp-streamer/src/Client.ts +++ b/ee/apps/ddp-streamer/src/Client.ts @@ -3,6 +3,7 @@ import type { IncomingMessage } from 'http'; import { Presence } from '@rocket.chat/core-services'; import type { ISocketConnection } from '@rocket.chat/core-typings'; +import { throttle } from 'underscore'; import { v1 as uuidv1 } from 'uuid'; import type WebSocket from 'ws'; @@ -74,7 +75,17 @@ export class Client extends EventEmitter { public userToken?: string; - private _seenPacket = true; + private updatePresence = throttle( + () => { + if (this.userId) { + void Presence.updateConnection(this.userId, this.connection.id).catch((err) => { + console.error('Error updating connection presence:', err); + }); + } + }, + TIMEOUT, + { leading: true, trailing: false }, + ); constructor( public ws: WebSocket, @@ -182,18 +193,6 @@ export class Client extends EventEmitter { this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT); }; - private messageReceived = (): void => { - if (this._seenPacket || !this.userId) { - this._seenPacket = true; - return; - } - - this._seenPacket = true; - void Presence.updateConnection(this.userId, this.connection.id).catch((err) => { - console.error('Error updating connection presence after heartbeat:', err); - }); - }; - ping(id?: string): void { this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) })); } @@ -203,9 +202,6 @@ export class Client extends EventEmitter { } handleIdle = (): void => { - if (this.userId) { - this._seenPacket = false; - } this.ping(); this.timeout = setTimeout(this.closeTimeout, TIMEOUT); }; @@ -218,7 +214,7 @@ export class Client extends EventEmitter { handler = async (payload: WebSocket.Data, isBinary: boolean): Promise => { try { const packet = server.parse(payload, isBinary); - this.messageReceived(); + this.updatePresence(); this.emit('message', packet); if (this.wait) { return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet))));