Skip to content

Commit b529fa0

Browse files
committed
add cron every minute to look expired status
1 parent 3dc152e commit b529fa0

3 files changed

Lines changed: 26 additions & 2 deletions

File tree

ee/packages/presence/src/Presence.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { IPresence, IBrokerNode } from '@rocket.chat/core-services';
22
import { License, ServiceClass, Settings } from '@rocket.chat/core-services';
33
import type { IUser } from '@rocket.chat/core-typings';
44
import { UserStatus } from '@rocket.chat/core-typings';
5+
import { cronJobs } from '@rocket.chat/cron';
56
import { Users, UsersSessions } from '@rocket.chat/models';
67

78
import { PresenceReaper } from './lib/PresenceReaper';
@@ -98,6 +99,25 @@ export class Presence extends ServiceClass implements IPresence {
9899
} catch (e: unknown) {
99100
// ignore
100101
}
102+
103+
// TODO: Agenda default lockLifetime is 10 min. This job executes in ms and runs every 1 min.
104+
// If an instance crashes mid-execution, expired statuses stay locked for up to 10 min.
105+
// Reduce lockLifetime to 60s once @rocket.chat/agenda exposes the option in its typed API.
106+
await cronJobs.add('presence-status-expiration', '* * * * *', () => this.processExpiredStatuses());
107+
}
108+
109+
private async processExpiredStatuses(batchSize = 100): Promise<void> {
110+
const expiredUsers = await Users.findExpiredStatuses(batchSize).toArray();
111+
112+
if (expiredUsers.length === 0) {
113+
return;
114+
}
115+
116+
await Promise.allSettled(expiredUsers.map(({ _id }) => this.endActiveState(_id)));
117+
118+
if (expiredUsers.length === batchSize) {
119+
return this.processExpiredStatuses(batchSize);
120+
}
101121
}
102122

103123
private async handleReaperUpdates(userIds: string[]): Promise<void> {

packages/model-typings/src/models/IUsersModel.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ export interface IUsersModel extends IBaseModel<IUser> {
172172

173173
updateStatusText(_id: IUser['_id'], statusText: string, options?: UpdateOptions): Promise<UpdateResult>;
174174

175+
findExpiredStatuses(limit: number): FindCursor<Pick<IUser, '_id'>>;
176+
175177
findOneForPresenceEngine(userId: IUser['_id']): Promise<IUser | null>;
176178

177179
updatePresenceAndStatus(userId: IUser['_id'], values: Record<string, unknown>, clear?: string[]): Promise<IUser | null>;
@@ -242,8 +244,6 @@ export interface IUsersModel extends IBaseModel<IUser> {
242244
}: { statusDefault?: UserStatus; status: UserStatus; statusConnection: UserStatus; statusText?: string },
243245
): Promise<UpdateResult>;
244246

245-
updateStatusAndStatusDefault(userId: string, status: UserStatus, statusDefault: UserStatus): Promise<UpdateResult>;
246-
247247
setFederationAvatarUrlById(userId: IUser['_id'], federationAvatarUrl: string): Promise<UpdateResult>;
248248

249249
setFederationAvatarUrlById(userId: IUser['_id'], federationAvatarUrl: string): Promise<UpdateResult>;

packages/models/src/models/Users.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,10 @@ export class UsersRaw extends BaseRaw<IUser, DefaultFields<IUser>> implements IU
10971097
return this.updateOne({ _id }, update, { session: options?.session });
10981098
}
10991099

1100+
findExpiredStatuses(limit: number) {
1101+
return this.find({ statusExpiresAt: { $lt: new Date() } }, { projection: { _id: 1 }, limit });
1102+
}
1103+
11001104
findOneForPresenceEngine(userId: IUser['_id']) {
11011105
return this.findOneById(userId, {
11021106
projection: {

0 commit comments

Comments
 (0)