Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/bump-patch-1766148726545.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/meteor': patch
---

Bump @rocket.chat/meteor version.
8 changes: 8 additions & 0 deletions .changeset/spicy-nails-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@rocket.chat/meteor": patch
"@rocket.chat/core-services": patch
"@rocket.chat/ddp-streamer": patch
"@rocket.chat/presence": patch
---

Ensures presence stays accurate by refreshing connections on heartbeats and removing stale sessions.
64 changes: 64 additions & 0 deletions apps/meteor/definition/externals/meteor/ddp-common.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' {
userId?: string;
});
}

/**
* Heartbeat options
*/
type HeartbeatOptions = {
/**
* interval to send pings, in milliseconds
*/
heartbeatInterval: number;
/**
* timeout to close the connection if a reply isn't received, in milliseconds.
*/
heartbeatTimeout: number;
/**
* function to call to send a ping on the connection.
*/
sendPing: () => void;
/**
* function to call to close the connection
*/
onTimeout: () => void;
};

class Heartbeat {
heartbeatInterval: number;

heartbeatTimeout: number;

_sendPing: () => void;

_onTimeout: () => void;

_seenPacket: boolean;

_heartbeatIntervalHandle: ReturnType<typeof setTimeout> | null;

_heartbeatTimeoutHandle: ReturnType<typeof setTimeout> | null;

constructor(options: HeartbeatOptions);

stop(): void;

start(): void;

_startHeartbeatIntervalTimer(): void;

_startHeartbeatTimeoutTimer(): void;

_clearHeartbeatIntervalTimer(): void;

_clearHeartbeatTimeoutTimer(): void;

/**
* The heartbeat interval timer is fired when we should send a ping.
*/
_heartbeatIntervalFired(): void;

/**
* The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong.
*/
_heartbeatTimeoutFired(): void;

messageReceived(): void;
}
}
}
9 changes: 7 additions & 2 deletions apps/meteor/definition/externals/meteor/meteor.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'meteor/meteor';
import type { ServerMethods } from '@rocket.chat/ddp-client';
import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer';
import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common';

type StringifyBuffers<T extends unknown[]> = {
[P in keyof T]: T[P] extends Buffer ? string : T[P];
Expand Down Expand Up @@ -39,7 +39,12 @@ declare module 'meteor/meteor' {
isDesktop: () => boolean;
}

const server: any;
const server: {
sessions: Map<string, { userId: string; heartbeat: DDPCommon.Heartbeat }>;
publish_handlers: {
meteor_autoupdate_clientVersions(): void;
};
};

const runAsUser: <T>(userId: string, scope: () => T) => T;

Expand Down
10 changes: 10 additions & 0 deletions apps/meteor/ee/server/startup/presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ Meteor.startup(() => {
return;
}

const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat);
session.heartbeat.messageReceived = function messageReceived() {
if (this._seenPacket === false) {
void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => {
console.error('Error updating connection presence on heartbeat:', err);
});
}
return _messageReceived();
};

void (async function () {
await Presence.newConnection(login.user._id, login.connection.id, nodeId);
updateConns();
Expand Down
19 changes: 19 additions & 0 deletions ee/apps/ddp-streamer/src/Client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import type { IncomingMessage } from 'http';

import { Presence } from '@rocket.chat/core-services';
import type { ISocketConnection } from '@rocket.chat/core-typings';
import { v1 as uuidv1 } from 'uuid';
import type WebSocket from 'ws';
Expand Down Expand Up @@ -73,6 +74,8 @@ export class Client extends EventEmitter {

public userToken?: string;

private _seenPacket = true;

constructor(
public ws: WebSocket,
public meteorClient = false,
Expand Down Expand Up @@ -179,6 +182,18 @@ export class Client extends EventEmitter {
this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT);
};

private messageReceived = (): void => {
if (this._seenPacket || !this.userId) {
this._seenPacket = true;
return;
}

this._seenPacket = true;
void Presence.updateConnection(this.userId, this.connection.id).catch((err) => {
console.error('Error updating connection presence after heartbeat:', err);
});
};

ping(id?: string): void {
this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) }));
}
Expand All @@ -188,6 +203,9 @@ export class Client extends EventEmitter {
}

handleIdle = (): void => {
if (this.userId) {
this._seenPacket = false;
}
this.ping();
this.timeout = setTimeout(this.closeTimeout, TIMEOUT);
};
Expand All @@ -200,6 +218,7 @@ export class Client extends EventEmitter {
handler = async (payload: WebSocket.Data, isBinary: boolean): Promise<void> => {
try {
const packet = server.parse(payload, isBinary);
this.messageReceived();
this.emit('message', packet);
if (this.wait) {
return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet))));
Expand Down
54 changes: 52 additions & 2 deletions ee/packages/presence/src/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings';
import { UserStatus } from '@rocket.chat/core-typings';
import { Settings, Users, UsersSessions } from '@rocket.chat/models';

import { PresenceReaper } from './lib/PresenceReaper';
import { processPresenceAndStatus } from './lib/processConnectionStatus';

const MAX_CONNECTIONS = 200;
Expand All @@ -25,9 +26,17 @@ export class Presence extends ServiceClass implements IPresence {

private peakConnections = 0;

private reaper: PresenceReaper;

constructor() {
super();

this.reaper = new PresenceReaper({
batchSize: 500,
staleThresholdMs: 5 * 60 * 1000, // 5 minutes
onUpdate: (userIds) => this.handleReaperUpdates(userIds),
});

this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise<void> => {
if (clientAction === 'removed') {
this.connsPerInstance.delete(id);
Expand Down Expand Up @@ -72,7 +81,8 @@ export class Presence extends ServiceClass implements IPresence {
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
}

async started(): Promise<void> {
override async started(): Promise<void> {
this.reaper.start();
this.lostConTimeout = setTimeout(async () => {
const affectedUsers = await this.removeLostConnections();
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
Expand All @@ -89,7 +99,25 @@ export class Presence extends ServiceClass implements IPresence {
}
}

async stopped(): Promise<void> {
private async handleReaperUpdates(userIds: string[]): Promise<void> {
const results = await Promise.allSettled(userIds.map((uid) => this.updateUserPresence(uid)));
const fulfilled = results.filter((result) => result.status === 'fulfilled');
const rejected = results.filter((result) => result.status === 'rejected');

if (fulfilled.length > 0) {
console.debug(`[PresenceReaper] Successfully updated presence for ${fulfilled.length} users.`);
}

if (rejected.length > 0) {
console.error(
`[PresenceReaper] Failed to update presence for ${rejected.length} users:`,
rejected.map(({ reason }) => reason),
);
}
}

override async stopped(): Promise<void> {
this.reaper.stop();
if (!this.lostConTimeout) {
return;
}
Expand Down Expand Up @@ -137,6 +165,28 @@ export class Presence extends ServiceClass implements IPresence {
};
}

async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> {
const query = {
'_id': uid,
'connections.id': connectionId,
};

const update = {
$set: {
'connections.$._updatedAt': new Date(),
},
};

const result = await UsersSessions.updateOne(query, update);
if (result.modifiedCount === 0) {
return;
}

await this.updateUserPresence(uid);

return { uid, connectionId };
}

async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
if (!uid || !session) {
return;
Expand Down
Loading
Loading