Skip to content

Commit 268a2d8

Browse files
committed
fix(presence): update connections on heartbeat and remove them when stale (#37551)
1 parent 0745cb8 commit 268a2d8

File tree

9 files changed

+447
-4
lines changed

9 files changed

+447
-4
lines changed

.changeset/spicy-nails-design.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@rocket.chat/meteor": patch
3+
"@rocket.chat/core-services": patch
4+
"@rocket.chat/ddp-streamer": patch
5+
"@rocket.chat/presence": patch
6+
---
7+
8+
Ensures presence stays accurate by refreshing connections on heartbeats and removing stale sessions.

apps/meteor/definition/externals/meteor/ddp-common.d.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' {
1414
userId?: string;
1515
});
1616
}
17+
18+
/**
19+
* Heartbeat options
20+
*/
21+
type HeartbeatOptions = {
22+
/**
23+
* interval to send pings, in milliseconds
24+
*/
25+
heartbeatInterval: number;
26+
/**
27+
* timeout to close the connection if a reply isn't received, in milliseconds.
28+
*/
29+
heartbeatTimeout: number;
30+
/**
31+
* function to call to send a ping on the connection.
32+
*/
33+
sendPing: () => void;
34+
/**
35+
* function to call to close the connection
36+
*/
37+
onTimeout: () => void;
38+
};
39+
40+
class Heartbeat {
41+
heartbeatInterval: number;
42+
43+
heartbeatTimeout: number;
44+
45+
_sendPing: () => void;
46+
47+
_onTimeout: () => void;
48+
49+
_seenPacket: boolean;
50+
51+
_heartbeatIntervalHandle: ReturnType<typeof setTimeout> | null;
52+
53+
_heartbeatTimeoutHandle: ReturnType<typeof setTimeout> | null;
54+
55+
constructor(options: HeartbeatOptions);
56+
57+
stop(): void;
58+
59+
start(): void;
60+
61+
_startHeartbeatIntervalTimer(): void;
62+
63+
_startHeartbeatTimeoutTimer(): void;
64+
65+
_clearHeartbeatIntervalTimer(): void;
66+
67+
_clearHeartbeatTimeoutTimer(): void;
68+
69+
/**
70+
* The heartbeat interval timer is fired when we should send a ping.
71+
*/
72+
_heartbeatIntervalFired(): void;
73+
74+
/**
75+
* The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong.
76+
*/
77+
_heartbeatTimeoutFired(): void;
78+
79+
messageReceived(): void;
80+
}
1781
}
1882
}

apps/meteor/definition/externals/meteor/meteor.d.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import 'meteor/meteor';
22
import type { ServerMethods } from '@rocket.chat/ddp-client';
3-
import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer';
3+
import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common';
44

55
type StringifyBuffers<T extends unknown[]> = {
66
[P in keyof T]: T[P] extends Buffer ? string : T[P];
@@ -39,7 +39,12 @@ declare module 'meteor/meteor' {
3939
isDesktop: () => boolean;
4040
}
4141

42-
const server: any;
42+
const server: {
43+
sessions: Map<string, { userId: string; heartbeat: DDPCommon.Heartbeat }>;
44+
publish_handlers: {
45+
meteor_autoupdate_clientVersions(): void;
46+
};
47+
};
4348

4449
const runAsUser: <T>(userId: string, scope: () => T) => T;
4550

apps/meteor/ee/server/startup/presence.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ Meteor.startup(() => {
4040
return;
4141
}
4242

43+
const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat);
44+
session.heartbeat.messageReceived = function messageReceived() {
45+
if (this._seenPacket === false) {
46+
void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => {
47+
console.error('Error updating connection presence on heartbeat:', err);
48+
});
49+
}
50+
return _messageReceived();
51+
};
52+
4353
void (async function () {
4454
await Presence.newConnection(login.user._id, login.connection.id, nodeId);
4555
updateConns();

ee/apps/ddp-streamer/src/Client.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { EventEmitter } from 'events';
22
import type { IncomingMessage } from 'http';
33

4+
import { Presence } from '@rocket.chat/core-services';
45
import type { ISocketConnection } from '@rocket.chat/core-typings';
56
import { v1 as uuidv1 } from 'uuid';
67
import type WebSocket from 'ws';
@@ -73,6 +74,8 @@ export class Client extends EventEmitter {
7374

7475
public userToken?: string;
7576

77+
private _seenPacket = true;
78+
7679
constructor(
7780
public ws: WebSocket,
7881
public meteorClient = false,
@@ -179,6 +182,18 @@ export class Client extends EventEmitter {
179182
this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT);
180183
};
181184

185+
private messageReceived = (): void => {
186+
if (this._seenPacket || !this.userId) {
187+
this._seenPacket = true;
188+
return;
189+
}
190+
191+
this._seenPacket = true;
192+
void Presence.updateConnection(this.userId, this.connection.id).catch((err) => {
193+
console.error('Error updating connection presence after heartbeat:', err);
194+
});
195+
};
196+
182197
ping(id?: string): void {
183198
this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) }));
184199
}
@@ -188,6 +203,9 @@ export class Client extends EventEmitter {
188203
}
189204

190205
handleIdle = (): void => {
206+
if (this.userId) {
207+
this._seenPacket = false;
208+
}
191209
this.ping();
192210
this.timeout = setTimeout(this.closeTimeout, TIMEOUT);
193211
};
@@ -200,6 +218,7 @@ export class Client extends EventEmitter {
200218
handler = async (payload: WebSocket.Data, isBinary: boolean): Promise<void> => {
201219
try {
202220
const packet = server.parse(payload, isBinary);
221+
this.messageReceived();
203222
this.emit('message', packet);
204223
if (this.wait) {
205224
return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet))));

ee/packages/presence/src/Presence.ts

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings';
44
import { UserStatus } from '@rocket.chat/core-typings';
55
import { Settings, Users, UsersSessions } from '@rocket.chat/models';
66

7+
import { PresenceReaper } from './lib/PresenceReaper';
78
import { processPresenceAndStatus } from './lib/processConnectionStatus';
89

910
const MAX_CONNECTIONS = 200;
@@ -25,9 +26,17 @@ export class Presence extends ServiceClass implements IPresence {
2526

2627
private peakConnections = 0;
2728

29+
private reaper: PresenceReaper;
30+
2831
constructor() {
2932
super();
3033

34+
this.reaper = new PresenceReaper({
35+
batchSize: 500,
36+
staleThresholdMs: 5 * 60 * 1000, // 5 minutes
37+
onUpdate: (userIds) => this.handleReaperUpdates(userIds),
38+
});
39+
3140
this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise<void> => {
3241
if (clientAction === 'removed') {
3342
this.connsPerInstance.delete(id);
@@ -72,7 +81,8 @@ export class Presence extends ServiceClass implements IPresence {
7281
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
7382
}
7483

75-
async started(): Promise<void> {
84+
override async started(): Promise<void> {
85+
this.reaper.start();
7686
this.lostConTimeout = setTimeout(async () => {
7787
const affectedUsers = await this.removeLostConnections();
7888
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
@@ -89,7 +99,25 @@ export class Presence extends ServiceClass implements IPresence {
8999
}
90100
}
91101

92-
async stopped(): Promise<void> {
102+
private async handleReaperUpdates(userIds: string[]): Promise<void> {
103+
const results = await Promise.allSettled(userIds.map((uid) => this.updateUserPresence(uid)));
104+
const fulfilled = results.filter((result) => result.status === 'fulfilled');
105+
const rejected = results.filter((result) => result.status === 'rejected');
106+
107+
if (fulfilled.length > 0) {
108+
console.debug(`[PresenceReaper] Successfully updated presence for ${fulfilled.length} users.`);
109+
}
110+
111+
if (rejected.length > 0) {
112+
console.error(
113+
`[PresenceReaper] Failed to update presence for ${rejected.length} users:`,
114+
rejected.map(({ reason }) => reason),
115+
);
116+
}
117+
}
118+
119+
override async stopped(): Promise<void> {
120+
this.reaper.stop();
93121
if (!this.lostConTimeout) {
94122
return;
95123
}
@@ -137,6 +165,28 @@ export class Presence extends ServiceClass implements IPresence {
137165
};
138166
}
139167

168+
async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> {
169+
const query = {
170+
'_id': uid,
171+
'connections.id': connectionId,
172+
};
173+
174+
const update = {
175+
$set: {
176+
'connections.$._updatedAt': new Date(),
177+
},
178+
};
179+
180+
const result = await UsersSessions.updateOne(query, update);
181+
if (result.modifiedCount === 0) {
182+
return;
183+
}
184+
185+
await this.updateUserPresence(uid);
186+
187+
return { uid, connectionId };
188+
}
189+
140190
async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
141191
if (!uid || !session) {
142192
return;

0 commit comments

Comments
 (0)