Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ export class StagingAreaListener {
this.stagingAreaQueue.registerHandler(this.handleQueueItem.bind(this));
}

async handleQueueItem(data: RoomID) {
async *handleQueueItem(data: RoomID) {
this.logger.debug(`Processing room ${data}`);
await this.stagingAreaService.processEventForRoom(data);
yield* this.stagingAreaService.processEventForRoom(data);
}
}
48 changes: 36 additions & 12 deletions packages/federation-sdk/src/queues/staging-area.queue.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,72 @@
import type { RoomID } from '@rocket.chat/federation-room';
import 'reflect-metadata';
import { singleton } from 'tsyringe';
import { delay, inject, singleton } from 'tsyringe';

type QueueHandler = (roomId: RoomID) => Promise<void>;
import { LockRepository } from '../repositories/lock.repository';
import { ConfigService } from '../services/config.service';

type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown | undefined>;

@singleton()
export class StagingAreaQueue {
private queue: RoomID[] = [];
private queue: Set<RoomID> = new Set();

private handlers: QueueHandler[] = [];
private handler: QueueHandler | null = null;

private processing = false;

constructor(
@inject(delay(() => LockRepository))
private readonly lockRepository: LockRepository,
private readonly configService: ConfigService,
) {}

enqueue(roomId: RoomID): void {
this.queue.push(roomId);
this.queue.add(roomId);
this.processQueue();
}

registerHandler(handler: QueueHandler): void {
this.handlers.push(handler);
this.handler = handler;
}

private async processQueue(): Promise<void> {
if (this.processing) {
return;
}

if (!this.handler) {
throw new Error('No handler registered for StagingAreaQueue');
}

this.processing = true;

try {
while (this.queue.length > 0) {
const roomId = this.queue.shift() as RoomID;
while (this.queue.size > 0) {
const [roomId] = this.queue;
if (!roomId) continue;
this.queue.delete(roomId);

// eslint-disable-next-line no-await-in-loop, prettier/prettier
await using lock = await this.lockRepository.lock(
roomId,
this.configService.instanceId,
);

if (!lock.success) {
continue;
}

for (const handler of this.handlers) {
// eslint-disable-next-line no-await-in-loop
await handler(roomId);
// eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator
for await (const _ of this.handler(roomId)) {
await lock.update();
}
Comment on lines +60 to 63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle lock.update() failure to abort processing.

Once lock.update() is fixed to return boolean (per the comment on lock.repository.ts), this loop should check the return value and break if the lease was lost:

 for await (const _ of this.handler(roomId)) {
-	await lock.update();
+	const stillHeld = await lock.update();
+	if (!stillHeld) {
+		// Lost the lock — abort and let another instance continue
+		break;
+	}
 }

Otherwise, processing continues on a room that another instance now owns, risking duplicate/conflicting work.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/staging-area.queue.ts` around lines 60 -
63, The loop in staging-area.queue.ts that iterates over this.handler(roomId)
must check the boolean result of lock.update() and abort processing if it
returns false; modify the for-await loop to capture the result of await
lock.update(), and if it is false, immediately break out (or return) to stop
handling the room so work doesn't continue after the lease is lost. Ensure you
reference lock.update() and this.handler(roomId) when making the change and do
not rely on exceptions — explicitly handle the false return to terminate
processing.

}
} finally {
this.processing = false;

// Check if new items were added while processing
if (this.queue.length > 0) {
if (this.queue.size > 0) {
this.processQueue();
}
}
Expand Down
26 changes: 26 additions & 0 deletions packages/federation-sdk/src/repositories/lock.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,32 @@ export class LockRepository {
this.collection.createIndex({ roomId: 1 }, { unique: true });
}

async lock(
roomId: string,
instanceId: string,
): Promise<{
success: boolean;
update: () => Promise<void>;
[Symbol.asyncDispose]: () => Promise<void>;
}> {
const lock = await this.getLock(roomId, instanceId);
return {
success: lock,
update: async () => {
if (!lock) {
return;
}
return this.updateLockTimestamp(roomId, instanceId);
},
[Symbol.asyncDispose]: async () => {
if (!lock) {
return;
}
return this.releaseLock(roomId, instanceId);
},
Comment on lines +17 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Make update() surface lease loss.

If one processing step runs longer than the 2-minute timeout, another instance can take the room. updateLockTimestamp(roomId, instanceId) will then match 0 documents, but lock.update() still resolves, so the queue/service keep processing without owning the lease. Return a boolean here (or throw) and abort processing when the heartbeat fails.

Minimal API shape
 async lock(
 	roomId: string,
 	instanceId: string,
 ): Promise<{
 	success: boolean;
-	update: () => Promise<void>;
+	update: () => Promise<boolean>;
 	[Symbol.asyncDispose]: () => Promise<void>;
 }> {
 	const lock = await this.getLock(roomId, instanceId);
 	return {
 		success: lock,
 		update: async () => {
 			if (!lock) {
-				return;
+				return false;
 			}
 			return this.updateLockTimestamp(roomId, instanceId);
 		},
async updateLockTimestamp(roomId: string, instanceId: string): Promise<boolean> {
	const { matchedCount } = await this.collection.updateOne(
		{ roomId, instanceId },
		{ $set: { lockedAt: new Date() } },
	);

	return matchedCount === 1;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/repositories/lock.repository.ts` around lines 17
- 39, The lock() method currently returns an update() that always resolves even
if updateLockTimestamp failed; change the returned update function (in lock())
to await updateLockTimestamp(roomId, instanceId) and return its boolean result
(or throw) so callers can detect lease loss; keep getLock(), releaseLock(), and
[Symbol.asyncDispose] behavior but ensure update()'s signature becomes
Promise<boolean> (or throws) and propagates the matchedCount === 1 result from
updateLockTimestamp to abort processing when the heartbeat fails.

};
}

async getLock(roomId: string, instanceId: string): Promise<boolean> {
const timedout = new Date();
timedout.setTime(timedout.getTime() - 2 * 60 * 1000); // 2 minutes ago
Expand Down
23 changes: 0 additions & 23 deletions packages/federation-sdk/src/services/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import {
RoomID,
RoomState,
RoomVersion,
type State,
getAuthChain,
} from '@rocket.chat/federation-room';
import { delay, inject, singleton } from 'tsyringe';
Expand All @@ -39,7 +38,6 @@ import type { StateService } from './state.service';
import { StagingAreaQueue } from '../queues/staging-area.queue';
import { EventStagingRepository } from '../repositories/event-staging.repository';
import { EventRepository } from '../repositories/event.repository';
import { LockRepository } from '../repositories/lock.repository';
import { eventSchemas } from '../utils/event-schemas';

export interface AuthEventParams {
Expand All @@ -65,8 +63,6 @@ export class EventService {
private readonly eventRepository: EventRepository,
@inject(delay(() => EventStagingRepository))
private readonly eventStagingRepository: EventStagingRepository,
@inject(delay(() => LockRepository))
private readonly lockRepository: LockRepository,
) {}

async getEventById<T extends PduType, P extends EventStore<PduForType<T>>>(eventId: EventID, type?: T): Promise<P | null> {
Expand Down Expand Up @@ -197,16 +193,6 @@ export class EventService {
// save the event as staged to be processed
await this.eventStagingRepository.create(eventId, origin, event);

// acquire a lock for processing the event
const lock = await this.lockRepository.getLock(roomId, this.configService.instanceId);
if (!lock) {
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
continue;
}

// if we have a lock, we can process the event
// void this.stagingAreaService.processEventForRoom(roomId);

// TODO change this to call stagingAreaService directly (line above)
this.stagingAreaQueue.enqueue(roomId);
}
Expand Down Expand Up @@ -637,15 +623,6 @@ export class EventService {

// not we try to process one room at a time
for await (const roomId of rooms) {
const lock = await this.lockRepository.getLock(roomId, this.configService.instanceId);
if (!lock) {
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
continue;
}

// if we have a lock, we can process the event
// void this.stagingAreaService.processEventForRoom(roomId);

// TODO change this to call stagingAreaService directly (line above)
this.stagingAreaQueue.enqueue(roomId);

Expand Down
15 changes: 6 additions & 9 deletions packages/federation-sdk/src/services/staging-area.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class StagingAreaService {
return [authEvents, prevEvents];
}

async processEventForRoom(roomId: RoomID) {
async *processEventForRoom(roomId: RoomID) {
const roomIdToRoomVersion = new Map<string, RoomVersion>();
const getRoomVersion = async (roomId: RoomID) => {
const version = roomIdToRoomVersion.get(roomId) ?? (await this.stateService.getRoomVersion(roomId));
Expand Down Expand Up @@ -92,11 +92,6 @@ export class StagingAreaService {

this.logger.info({ msg: 'Processing event', eventId: event._id });

// if we got an event, we need to update the lock's timestamp to avoid it being timed out
// and acquired by another instance while we're processing a batch of events for this room
// eslint-disable-next-line no-await-in-loop
await this.lockRepository.updateLockTimestamp(roomId, this.configService.instanceId);

try {
// eslint-disable-next-line no-await-in-loop
const addedMissing = await this.processDependencyStage(event);
Expand Down Expand Up @@ -143,10 +138,12 @@ export class StagingAreaService {
});
}
}
} while (event);

// release the lock after processing
await this.lockRepository.releaseLock(roomId, this.configService.instanceId);
yield event;

// TODO: what should we do to avoid infinite loops in case the next event is always the same event
} while (event);
this.logger.debug({ msg: 'No more events to process for room', roomId });
}

private async processDependencyStage(event: EventStagingStore) {
Expand Down
Loading