Skip to content

Commit e440ea7

Browse files
ggazzosampaiodiego
andauthored
chore: add lock method to LockRepository for managing room locks with disposal functionality (#238)
Co-authored-by: Diego Sampaio <chinello@gmail.com>
1 parent f867571 commit e440ea7

File tree

5 files changed

+70
-46
lines changed

5 files changed

+70
-46
lines changed

packages/federation-sdk/src/listeners/staging-area.listener.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ export class StagingAreaListener {
1313
this.stagingAreaQueue.registerHandler(this.handleQueueItem.bind(this));
1414
}
1515

16-
async handleQueueItem(data: RoomID) {
16+
async *handleQueueItem(data: RoomID) {
1717
this.logger.debug(`Processing room ${data}`);
18-
await this.stagingAreaService.processEventForRoom(data);
18+
yield* this.stagingAreaService.processEventForRoom(data);
1919
}
2020
}

packages/federation-sdk/src/queues/staging-area.queue.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,72 @@
11
import type { RoomID } from '@rocket.chat/federation-room';
22
import 'reflect-metadata';
3-
import { singleton } from 'tsyringe';
3+
import { delay, inject, singleton } from 'tsyringe';
44

5-
type QueueHandler = (roomId: RoomID) => Promise<void>;
5+
import { LockRepository } from '../repositories/lock.repository';
6+
import { ConfigService } from '../services/config.service';
7+
8+
type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown | undefined>;
69

710
@singleton()
811
export class StagingAreaQueue {
9-
private queue: RoomID[] = [];
12+
private queue: Set<RoomID> = new Set();
1013

11-
private handlers: QueueHandler[] = [];
14+
private handler: QueueHandler | null = null;
1215

1316
private processing = false;
1417

18+
constructor(
19+
@inject(delay(() => LockRepository))
20+
private readonly lockRepository: LockRepository,
21+
private readonly configService: ConfigService,
22+
) {}
23+
1524
enqueue(roomId: RoomID): void {
16-
this.queue.push(roomId);
25+
this.queue.add(roomId);
1726
this.processQueue();
1827
}
1928

2029
registerHandler(handler: QueueHandler): void {
21-
this.handlers.push(handler);
30+
this.handler = handler;
2231
}
2332

2433
private async processQueue(): Promise<void> {
2534
if (this.processing) {
2635
return;
2736
}
2837

38+
if (!this.handler) {
39+
throw new Error('No handler registered for StagingAreaQueue');
40+
}
41+
2942
this.processing = true;
3043

3144
try {
32-
while (this.queue.length > 0) {
33-
const roomId = this.queue.shift() as RoomID;
45+
while (this.queue.size > 0) {
46+
const [roomId] = this.queue;
3447
if (!roomId) continue;
48+
this.queue.delete(roomId);
49+
50+
// eslint-disable-next-line no-await-in-loop, prettier/prettier
51+
await using lock = await this.lockRepository.lock(
52+
roomId,
53+
this.configService.instanceId,
54+
);
55+
56+
if (!lock.success) {
57+
continue;
58+
}
3559

36-
for (const handler of this.handlers) {
37-
// eslint-disable-next-line no-await-in-loop
38-
await handler(roomId);
60+
// eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator
61+
for await (const _ of this.handler(roomId)) {
62+
await lock.update();
3963
}
4064
}
4165
} finally {
4266
this.processing = false;
4367

4468
// Check if new items were added while processing
45-
if (this.queue.length > 0) {
69+
if (this.queue.size > 0) {
4670
this.processQueue();
4771
}
4872
}

packages/federation-sdk/src/repositories/lock.repository.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,32 @@ export class LockRepository {
1414
this.collection.createIndex({ roomId: 1 }, { unique: true });
1515
}
1616

17+
async lock(
18+
roomId: string,
19+
instanceId: string,
20+
): Promise<{
21+
success: boolean;
22+
update: () => Promise<void>;
23+
[Symbol.asyncDispose]: () => Promise<void>;
24+
}> {
25+
const lock = await this.getLock(roomId, instanceId);
26+
return {
27+
success: lock,
28+
update: async () => {
29+
if (!lock) {
30+
return;
31+
}
32+
return this.updateLockTimestamp(roomId, instanceId);
33+
},
34+
[Symbol.asyncDispose]: async () => {
35+
if (!lock) {
36+
return;
37+
}
38+
return this.releaseLock(roomId, instanceId);
39+
},
40+
};
41+
}
42+
1743
async getLock(roomId: string, instanceId: string): Promise<boolean> {
1844
const timedout = new Date();
1945
timedout.setTime(timedout.getTime() - 2 * 60 * 1000); // 2 minutes ago

packages/federation-sdk/src/services/event.service.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import {
2626
RoomID,
2727
RoomState,
2828
RoomVersion,
29-
type State,
3029
getAuthChain,
3130
} from '@rocket.chat/federation-room';
3231
import { delay, inject, singleton } from 'tsyringe';
@@ -39,7 +38,6 @@ import type { StateService } from './state.service';
3938
import { StagingAreaQueue } from '../queues/staging-area.queue';
4039
import { EventStagingRepository } from '../repositories/event-staging.repository';
4140
import { EventRepository } from '../repositories/event.repository';
42-
import { LockRepository } from '../repositories/lock.repository';
4341
import { eventSchemas } from '../utils/event-schemas';
4442

4543
export interface AuthEventParams {
@@ -65,8 +63,6 @@ export class EventService {
6563
private readonly eventRepository: EventRepository,
6664
@inject(delay(() => EventStagingRepository))
6765
private readonly eventStagingRepository: EventStagingRepository,
68-
@inject(delay(() => LockRepository))
69-
private readonly lockRepository: LockRepository,
7066
) {}
7167

7268
async getEventById<T extends PduType, P extends EventStore<PduForType<T>>>(eventId: EventID, type?: T): Promise<P | null> {
@@ -197,16 +193,6 @@ export class EventService {
197193
// save the event as staged to be processed
198194
await this.eventStagingRepository.create(eventId, origin, event);
199195

200-
// acquire a lock for processing the event
201-
const lock = await this.lockRepository.getLock(roomId, this.configService.instanceId);
202-
if (!lock) {
203-
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
204-
continue;
205-
}
206-
207-
// if we have a lock, we can process the event
208-
// void this.stagingAreaService.processEventForRoom(roomId);
209-
210196
// TODO change this to call stagingAreaService directly (line above)
211197
this.stagingAreaQueue.enqueue(roomId);
212198
}
@@ -637,15 +623,6 @@ export class EventService {
637623

638624
// not we try to process one room at a time
639625
for await (const roomId of rooms) {
640-
const lock = await this.lockRepository.getLock(roomId, this.configService.instanceId);
641-
if (!lock) {
642-
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
643-
continue;
644-
}
645-
646-
// if we have a lock, we can process the event
647-
// void this.stagingAreaService.processEventForRoom(roomId);
648-
649626
// TODO change this to call stagingAreaService directly (line above)
650627
this.stagingAreaQueue.enqueue(roomId);
651628

packages/federation-sdk/src/services/staging-area.service.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export class StagingAreaService {
6060
return [authEvents, prevEvents];
6161
}
6262

63-
async processEventForRoom(roomId: RoomID) {
63+
async *processEventForRoom(roomId: RoomID) {
6464
const roomIdToRoomVersion = new Map<string, RoomVersion>();
6565
const getRoomVersion = async (roomId: RoomID) => {
6666
const version = roomIdToRoomVersion.get(roomId) ?? (await this.stateService.getRoomVersion(roomId));
@@ -92,11 +92,6 @@ export class StagingAreaService {
9292

9393
this.logger.info({ msg: 'Processing event', eventId: event._id });
9494

95-
// if we got an event, we need to update the lock's timestamp to avoid it being timed out
96-
// and acquired by another instance while we're processing a batch of events for this room
97-
// eslint-disable-next-line no-await-in-loop
98-
await this.lockRepository.updateLockTimestamp(roomId, this.configService.instanceId);
99-
10095
try {
10196
// eslint-disable-next-line no-await-in-loop
10297
const addedMissing = await this.processDependencyStage(event);
@@ -143,10 +138,12 @@ export class StagingAreaService {
143138
});
144139
}
145140
}
146-
} while (event);
147141

148-
// release the lock after processing
149-
await this.lockRepository.releaseLock(roomId, this.configService.instanceId);
142+
yield event;
143+
144+
// TODO: what should we do to avoid infinite loops in case the next event is always the same event
145+
} while (event);
146+
this.logger.debug({ msg: 'No more events to process for room', roomId });
150147
}
151148

152149
private async processDependencyStage(event: EventStagingStore) {

0 commit comments

Comments
 (0)