@@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings';
44import { UserStatus } from '@rocket.chat/core-typings' ;
55import { Settings , Users , UsersSessions } from '@rocket.chat/models' ;
66
7+ import { PresenceReaper } from './lib/PresenceReaper' ;
78import { processPresenceAndStatus } from './lib/processConnectionStatus' ;
89
910const MAX_CONNECTIONS = 200 ;
@@ -25,9 +26,17 @@ export class Presence extends ServiceClass implements IPresence {
2526
2627 private peakConnections = 0 ;
2728
29+ private reaper : PresenceReaper ;
30+
2831 constructor ( ) {
2932 super ( ) ;
3033
34+ this . reaper = new PresenceReaper ( {
35+ batchSize : 500 ,
36+ staleThresholdMs : 5 * 60 * 1000 , // 5 minutes
37+ onUpdate : ( userIds ) => this . handleReaperUpdates ( userIds ) ,
38+ } ) ;
39+
3140 this . onEvent ( 'watch.instanceStatus' , async ( { clientAction, id, diff } ) : Promise < void > => {
3241 if ( clientAction === 'removed' ) {
3342 this . connsPerInstance . delete ( id ) ;
@@ -72,7 +81,8 @@ export class Presence extends ServiceClass implements IPresence {
7281 return affectedUsers . forEach ( ( uid ) => this . updateUserPresence ( uid ) ) ;
7382 }
7483
75- async started ( ) : Promise < void > {
84+ override async started ( ) : Promise < void > {
85+ this . reaper . start ( ) ;
7686 this . lostConTimeout = setTimeout ( async ( ) => {
7787 const affectedUsers = await this . removeLostConnections ( ) ;
7888 return affectedUsers . forEach ( ( uid ) => this . updateUserPresence ( uid ) ) ;
@@ -89,7 +99,25 @@ export class Presence extends ServiceClass implements IPresence {
8999 }
90100 }
91101
92- async stopped ( ) : Promise < void > {
102+ private async handleReaperUpdates ( userIds : string [ ] ) : Promise < void > {
103+ const results = await Promise . allSettled ( userIds . map ( ( uid ) => this . updateUserPresence ( uid ) ) ) ;
104+ const fulfilled = results . filter ( ( result ) => result . status === 'fulfilled' ) ;
105+ const rejected = results . filter ( ( result ) => result . status === 'rejected' ) ;
106+
107+ if ( fulfilled . length > 0 ) {
108+ console . debug ( `[PresenceReaper] Successfully updated presence for ${ fulfilled . length } users.` ) ;
109+ }
110+
111+ if ( rejected . length > 0 ) {
112+ console . error (
113+ `[PresenceReaper] Failed to update presence for ${ rejected . length } users:` ,
114+ rejected . map ( ( { reason } ) => reason ) ,
115+ ) ;
116+ }
117+ }
118+
119+ override async stopped ( ) : Promise < void > {
120+ this . reaper . stop ( ) ;
93121 if ( ! this . lostConTimeout ) {
94122 return ;
95123 }
@@ -137,6 +165,28 @@ export class Presence extends ServiceClass implements IPresence {
137165 } ;
138166 }
139167
168+ async updateConnection ( uid : string , connectionId : string ) : Promise < { uid : string ; connectionId : string } | undefined > {
169+ const query = {
170+ '_id' : uid ,
171+ 'connections.id' : connectionId ,
172+ } ;
173+
174+ const update = {
175+ $set : {
176+ 'connections.$._updatedAt' : new Date ( ) ,
177+ } ,
178+ } ;
179+
180+ const result = await UsersSessions . updateOne ( query , update ) ;
181+ if ( result . modifiedCount === 0 ) {
182+ return ;
183+ }
184+
185+ await this . updateUserPresence ( uid ) ;
186+
187+ return { uid, connectionId } ;
188+ }
189+
140190 async removeConnection ( uid : string | undefined , session : string | undefined ) : Promise < { uid : string ; session : string } | undefined > {
141191 if ( ! uid || ! session ) {
142192 return ;
0 commit comments