
chore: add lock method to LockRepository for managing room locks with disposal functionality #238

Merged
sampaiodiego merged 6 commits into main from chore/refactor-queue-generators
Mar 11, 2026

Conversation


@ggazzo ggazzo commented Oct 1, 2025

…disposal functionality

Summary by CodeRabbit

  • Refactor

    • Event processing converted to a streaming, generator-driven model for incremental, yield-based room updates.
    • Queue handling redesigned to process per-room items with single-handler coordination, re-entrant processing, and per-item lock management for safer concurrency.
    • Event ingestion simplified to always enqueue incoming events for processing.
  • Bug Fixes

    • Reduced duplicate/conflicting work and race conditions across processors, improving consistency and throughput for room updates.


coderabbitai bot commented Oct 1, 2025

Walkthrough

Converts per-room processing to an async-generator pipeline: handlers now yield progress, the queue consumes generators and manages per-room locks via a new LockRepository.lock API, and EventService no longer acquires locks and unconditionally enqueues rooms for processing.

Changes

Cohort / File(s) Summary
Async-generator handlers & flow
packages/federation-sdk/src/listeners/staging-area.listener.ts, packages/federation-sdk/src/services/staging-area.service.ts
Handler signatures changed to async * generators; StagingAreaService yields each processed event instead of returning a Promise; listener updated to delegate via yield*.
Queue & processing model
packages/federation-sdk/src/queues/staging-area.queue.ts
Queue redesigned around a Set of roomIds and a single registered AsyncGenerator handler (type QueueHandler = (roomId: RoomID) => AsyncGenerator<...>); processing acquires a per-item lock, iterates the generator's yields and calls lock.update() between them, guards against a missing handler, and controls re-entrant processing.
Lock repository API
packages/federation-sdk/src/repositories/lock.repository.ts
Adds lock(roomId, instanceId) returning { success, update(): Promise<void>, [Symbol.asyncDispose](): Promise<void> } to manage acquire/update/release lifecycle.
EventService enqueue changes
packages/federation-sdk/src/services/event.service.ts
Removes LockRepository dependency and per-room lock checks; incoming PDUs and startup scanning now always enqueue rooms, deferring concurrency to the queue/lock layer.
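The lock(roomId, instanceId) contract summarized above can be sketched with a hypothetical in-memory stand-in. Only the returned shape — success, update(), and [Symbol.asyncDispose]() — comes from the change summary; the Map-based storage and class name here are invented for illustration (the real repository is database-backed):

```typescript
// Older runtimes may lack Symbol.asyncDispose; fall back to a registered symbol.
const asyncDispose: symbol =
  (Symbol as { asyncDispose?: symbol }).asyncDispose ?? Symbol.for('Symbol.asyncDispose');

type RoomLock = {
  success: boolean;
  update: () => Promise<void>;
} & { [k: symbol]: () => Promise<void> };

// Hypothetical in-memory stand-in for LockRepository (illustration only).
class InMemoryLockRepository {
  private owners = new Map<string, string>(); // roomId -> owning instanceId

  async lock(roomId: string, instanceId: string): Promise<RoomLock> {
    const acquired = !this.owners.has(roomId);
    if (acquired) this.owners.set(roomId, instanceId);
    return {
      success: acquired,
      // Heartbeat: the real repository refreshes the lease timestamp here.
      update: async () => {},
      // Release on `await using` scope exit; a failed acquisition releases nothing.
      [asyncDispose]: async () => {
        if (acquired && this.owners.get(roomId) === instanceId) {
          this.owners.delete(roomId);
        }
      },
    };
  }
}
```

With `await using lock = await repo.lock(...)`, disposal runs automatically when the enclosing scope exits, which is what lets the queue release the lock without explicit cleanup code.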

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor EventSource
  participant EventService
  participant StagingAreaQueue as Queue
  participant LockRepository as LockRepo
  participant StagingAreaService as StagingSvc

  EventSource->>EventService: incoming PDU / startup scan
  EventService->>Queue: enqueue(roomId)

  Note over Queue: Processing loop (per room)
  Queue->>LockRepo: lock(roomId, instanceId)
  alt lock acquired
    LockRepo-->>Queue: { success: true, update(), [asyncDispose]() }
    Queue->>StagingSvc: iterate processEventForRoom(roomId) (async generator)
    loop for each yielded item
      StagingSvc-->>Queue: yield item / undefined
      Queue->>LockRepo: update()
    end
    Queue->>LockRepo: [asyncDispose]() (release)
  else lock not acquired
    LockRepo-->>Queue: { success: false }
    Note over Queue: Skip processing this room
  end
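The loop in the diagram can be sketched as a minimal, self-contained class. Names (QueueHandler, enqueue, the re-entrancy guard) follow the walkthrough; the three lock callbacks are hypothetical stand-ins for LockRepository.lock's success flag, update(), and [Symbol.asyncDispose](), injected so the sketch stays runnable on its own:

```typescript
type RoomID = string;
type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown>;

class StagingAreaQueueSketch {
  private queue = new Set<RoomID>(); // de-duplicates repeated enqueues per room
  private processing = false; // re-entrancy guard

  constructor(
    private handler: QueueHandler,
    private tryLock: (roomId: RoomID) => Promise<boolean>,
    private heartbeat: (roomId: RoomID) => Promise<void>,
    private release: (roomId: RoomID) => Promise<void>,
  ) {}

  async enqueue(roomId: RoomID): Promise<void> {
    this.queue.add(roomId);
    await this.process();
  }

  private async process(): Promise<void> {
    if (this.processing) return; // another call is already draining the queue
    this.processing = true;
    try {
      while (this.queue.size > 0) {
        const [roomId] = this.queue;
        this.queue.delete(roomId);
        if (!(await this.tryLock(roomId))) continue; // lock held elsewhere: skip
        try {
          for await (const _ of this.handler(roomId)) {
            await this.heartbeat(roomId); // lock.update() between yields
          }
        } finally {
          await this.release(roomId); // [Symbol.asyncDispose]()
        }
      }
    } finally {
      this.processing = false;
    }
  }
}
```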

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

type: feature

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Title check — ⚠️ Warning
Explanation: The PR title describes only the lock method addition to LockRepository, but the changeset encompasses broader architectural changes: converting queue handlers to async generators, refactoring the queue processing model, and removing per-room locking from EventService.
Resolution: Revise the title to reflect the main architectural change, such as "refactor: convert StagingAreaQueue to async generator-based processing with per-item locking" or "refactor: restructure queue and lock management for StagingArea".
✅ Passed checks (2 passed)
Description Check — ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
Docstring Coverage — ✅ Passed: No functions found in the changed files to evaluate docstring coverage; check skipped.





codecov-commenter commented Oct 1, 2025

Codecov Report

❌ Patch coverage is 16.41791% with 56 lines in your changes missing coverage. Please review.
✅ Project coverage is 50.79%. Comparing base (f867571) to head (dfd8335).

Files with missing lines Patch % Lines
...federation-sdk/src/repositories/lock.repository.ts 0.00% 26 Missing ⚠️
...es/federation-sdk/src/queues/staging-area.queue.ts 30.30% 23 Missing ⚠️
...ederation-sdk/src/services/staging-area.service.ts 0.00% 6 Missing ⚠️
...eration-sdk/src/listeners/staging-area.listener.ts 50.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #238      +/-   ##
==========================================
- Coverage   50.86%   50.79%   -0.07%     
==========================================
  Files          99       99              
  Lines       11239    11262      +23     
==========================================
+ Hits         5717     5721       +4     
- Misses       5522     5541      +19     



@ggazzo ggazzo marked this pull request as ready for review October 2, 2025 01:52
@sampaiodiego sampaiodiego force-pushed the chore/refactor-queue-generators branch from dbb1052 to c794902 Compare March 9, 2026 18:32
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/federation-sdk/src/queues/staging-area.queue.ts`:
- Around line 45-52: When lockRepository.lock(roomId,
this.configService.instanceId) fails in the processing loop for the in-memory
queue (this.queue), don't continue and drop the room; instead requeue the room
with backoff and jitter so the only wake-up isn't lost. Implement a per-room
retry tracker (e.g., Map<RoomID, attempts>) used by the staging area processor
(the loop that consumes this.queue) to compute an exponential backoff + random
jitter delay, then schedule the room to be re-enqueued after that delay (or push
it back onto this.queue and use setTimeout to trigger processing); reset the
tracker on success and cap/max the attempts to avoid unbounded retries. Ensure
the code around lockRepository.lock and the queue handling uses this retry
mechanism rather than continuing/ignoring failed acquisitions.

In `@packages/federation-sdk/src/repositories/lock.repository.ts`:
- Around line 17-39: The lock() method currently returns an update() that always
resolves even if updateLockTimestamp failed; change the returned update function
(in lock()) to await updateLockTimestamp(roomId, instanceId) and return its
boolean result (or throw) so callers can detect lease loss; keep getLock(),
releaseLock(), and [Symbol.asyncDispose] behavior but ensure update()'s
signature becomes Promise<boolean> (or throws) and propagates the matchedCount
=== 1 result from updateLockTimestamp to abort processing when the heartbeat
fails.

In `@packages/federation-sdk/src/services/staging-area.service.ts`:
- Around line 147-157: The loop in staging-area.service.ts is peeking the next
event by calling this.eventService.getLeastDepthEventForRoom(roomId) and
yielding that, which causes event-staging.repository.ts to increment the retry
counter (`got`) unnecessarily; change the logic to stop peeking and instead
yield the current event from the iteration (do not call
getLeastDepthEventForRoom() solely to yield), so remove the extra
getLeastDepthEventForRoom() call and yield the already-retrieved event to avoid
burning retries on busy rooms.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c8c2f5f0-1cd4-44ec-8e89-2841d4c9bc40

📥 Commits

Reviewing files that changed from the base of the PR and between dbb1052 and c794902.

📒 Files selected for processing (5)
  • packages/federation-sdk/src/listeners/staging-area.listener.ts
  • packages/federation-sdk/src/queues/staging-area.queue.ts
  • packages/federation-sdk/src/repositories/lock.repository.ts
  • packages/federation-sdk/src/services/event.service.ts
  • packages/federation-sdk/src/services/staging-area.service.ts
💤 Files with no reviewable changes (1)
  • packages/federation-sdk/src/services/event.service.ts
📜 Review details
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-03-04T13:37:36.036Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:36.036Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/queues/staging-area.queue.ts
📚 Learning: 2025-10-10T22:18:31.655Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 224
File: packages/federation-sdk/src/services/event-authorization.service.ts:261-268
Timestamp: 2025-10-10T22:18:31.655Z
Learning: In packages/federation-sdk/src/services/state.service.ts, the method `StateService.getLatestRoomState(roomId: string)` has return type `Promise<State>` and never returns undefined. If the state is not found, it throws an error with message "No state found for room ${roomId}" instead of returning undefined.

Applied to files:

  • packages/federation-sdk/src/services/staging-area.service.ts
  • packages/federation-sdk/src/repositories/lock.repository.ts
🔇 Additional comments (1)
packages/federation-sdk/src/listeners/staging-area.listener.ts (1)

16-18: Generator delegation matches the new queue contract.

Nice thin adapter: the streaming behavior stays in StagingAreaService, and the listener only bridges queue input to the service generator.
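The delegation pattern praised here can be illustrated in miniature. Both function names below are illustrative stand-ins (the real code lives in StagingAreaService and the listener); the point is that yield* forwards every yield, and the eventual return, from the service generator through the adapter unchanged:

```typescript
// Stand-in for StagingAreaService.processEventForRoom (illustrative).
async function* serviceProcessEventForRoom(roomId: string): AsyncGenerator<string> {
  yield `${roomId}:event-1`;
  yield `${roomId}:event-2`;
}

// Thin adapter: the listener only bridges queue input to the service generator.
async function* listenerHandler(roomId: string): AsyncGenerator<string> {
  yield* serviceProcessEventForRoom(roomId);
}
```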

Comment on lines +45 to +52
// eslint-disable-next-line no-await-in-loop, prettier/prettier
await using lock = await this.lockRepository.lock(
roomId,
this.configService.instanceId,
);
if (!lock.success) {
continue;
}
⚠️ Potential issue | 🔴 Critical

Don't drop the room when the lock is busy.

This queue is per-process memory (private queue: RoomID[] = []). If acquisition fails here and you continue, this instance loses the only wake-up it had for that room. Another node can release the room right after its final staging-area check, leaving newly staged events idle until some later enqueue happens. Requeue with backoff/jitter instead of discarding the item.
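The backoff-with-jitter schedule suggested here can be sketched as a small pure function. The base delay and cap are illustrative values, not taken from the PR:

```typescript
// Exponential backoff capped at capMs, plus up to baseMs of random jitter
// to avoid synchronized retries across instances (thundering herd).
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 60_000): number {
  const exponential = Math.min(baseMs * 2 ** attempt, capMs);
  return exponential + Math.random() * baseMs;
}
```

On a failed acquisition, the queue would then schedule `setTimeout(() => this.enqueue(roomId), backoffDelayMs(attempts++))` instead of continuing, and reset the per-room attempt counter once the lock is obtained.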


Comment on lines +17 to +39
async lock(
roomId: string,
instanceId: string,
): Promise<{
success: boolean;
update: () => Promise<void>;
[Symbol.asyncDispose]: () => Promise<void>;
}> {
const lock = await this.getLock(roomId, instanceId);
return {
success: lock,
update: async () => {
if (!lock) {
return;
}
return this.updateLockTimestamp(roomId, instanceId);
},
[Symbol.asyncDispose]: async () => {
if (!lock) {
return;
}
return this.releaseLock(roomId, instanceId);
},
⚠️ Potential issue | 🔴 Critical

Make update() surface lease loss.

If one processing step runs longer than the 2-minute timeout, another instance can take the room. updateLockTimestamp(roomId, instanceId) will then match 0 documents, but lock.update() still resolves, so the queue/service keep processing without owning the lease. Return a boolean here (or throw) and abort processing when the heartbeat fails.

Minimal API shape
 async lock(
 	roomId: string,
 	instanceId: string,
 ): Promise<{
 	success: boolean;
-	update: () => Promise<void>;
+	update: () => Promise<boolean>;
 	[Symbol.asyncDispose]: () => Promise<void>;
 }> {
 	const lock = await this.getLock(roomId, instanceId);
 	return {
 		success: lock,
 		update: async () => {
 			if (!lock) {
-				return;
+				return false;
 			}
 			return this.updateLockTimestamp(roomId, instanceId);
 		},
async updateLockTimestamp(roomId: string, instanceId: string): Promise<boolean> {
	const { matchedCount } = await this.collection.updateOne(
		{ roomId, instanceId },
		{ $set: { lockedAt: new Date() } },
	);

	return matchedCount === 1;
}
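If update() is changed to resolve to a boolean as suggested, the consuming loop can abort as soon as the heartbeat fails. A hedged sketch of that caller side (names are illustrative, not the real queue code):

```typescript
// Drain one room's generator, stopping if the lease heartbeat fails.
// Returns true if the generator finished while the lease was still held.
async function drainRoom(
  handler: (roomId: string) => AsyncGenerator<unknown>,
  lock: { update: () => Promise<boolean> },
  roomId: string,
): Promise<boolean> {
  for await (const _ of handler(roomId)) {
    if (!(await lock.update())) {
      return false; // heartbeat failed: stop before processing the next item
    }
  }
  return true;
}
```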


@cubic-dev-ai cubic-dev-ai bot left a comment


3 issues found across 5 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="packages/federation-sdk/src/services/staging-area.service.ts">

<violation number="1" location="packages/federation-sdk/src/services/staging-area.service.ts:150">
P2: Redundant database query: `getLeastDepthEventForRoom` is called twice per loop iteration — once at the top (to process) and once at the bottom (to yield). The top call on the next iteration overwrites the bottom result, wasting a DB round-trip per event. Restructure to fetch once per iteration, e.g., move the initial fetch before the loop and use the bottom fetch as the source for the next iteration's processing.</violation>
</file>

<file name="packages/federation-sdk/src/queues/staging-area.queue.ts">

<violation number="1" location="packages/federation-sdk/src/queues/staging-area.queue.ts:47">
P1: Lock is acquired and released per-handler instead of per-roomId. The `await using` scope ends at each iteration of `for (const handler of this.handlers)`, releasing the lock between handlers. Another instance can grab the lock in that window, causing duplicate execution of earlier handlers and skipped execution of later ones.

Move the lock acquisition above the `for (const handler …)` loop so a single lock covers all handlers for a given room.</violation>

<violation number="2" location="packages/federation-sdk/src/queues/staging-area.queue.ts:51">
P1: When lock acquisition fails, `continue` skips to the next handler but the room ID has already been shifted out of the in-memory queue. This means the room is silently dropped — no retry, no re-enqueue. If another instance releases the lock moments later, newly staged events for that room will sit idle until some future enqueue happens. Re-queue the room (ideally with a small backoff/jitter) instead of discarding it.</violation>
</file>


roomId,
this.configService.instanceId,
);
if (!lock.success) {

@cubic-dev-ai cubic-dev-ai bot Mar 9, 2026


P1: When lock acquisition fails, continue skips to the next handler but the room ID has already been shifted out of the in-memory queue. This means the room is silently dropped — no retry, no re-enqueue. If another instance releases the lock moments later, newly staged events for that room will sit idle until some future enqueue happens. Re-queue the room (ideally with a small backoff/jitter) instead of discarding it.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
packages/federation-sdk/src/queues/staging-area.queue.ts (1)

45-52: ⚠️ Potential issue | 🔴 Critical

Don't drop the room when the lock is busy.

When lock.success is false, the continue statement only skips the current handler but the roomId has already been removed from the queue (line 42). If lock acquisition fails for all handlers, this room is permanently lost from processing until something else re-enqueues it.

Given the 2-minute lock timeout in LockRepository, another instance could hold the lock temporarily, causing this instance to silently drop work.

Consider re-enqueuing failed rooms with backoff:

🔧 Suggested approach
+	private retryQueue: Map<RoomID, { attempts: number; nextRetry: number }> = new Map();
+
+	private reEnqueueWithBackoff(roomId: RoomID): void {
+		const existing = this.retryQueue.get(roomId) || { attempts: 0, nextRetry: 0 };
+		const attempts = Math.min(existing.attempts + 1, 10);
+		const delay = Math.min(1000 * Math.pow(2, attempts) + Math.random() * 1000, 60000);
+		
+		this.retryQueue.set(roomId, { attempts, nextRetry: Date.now() + delay });
+		setTimeout(() => {
+			this.retryQueue.delete(roomId);
+			this.enqueue(roomId);
+		}, delay);
+	}

Then in the lock failure case:

 				if (!lock.success) {
+					this.reEnqueueWithBackoff(roomId);
+					break; // Skip remaining handlers for this room
-					continue;
 				}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/staging-area.queue.ts` around lines 45 -
52, The code currently removes roomId from the queue before attempting handler
locks so a failed lock (lockRepository.lock returning lock.success === false)
causes the room to be dropped; change the flow in the staging-area queue so you
do not permanently remove the room when lock acquisition fails: either delay
removal until after a handler successfully processes the room, or when
lock.success is false immediately re-enqueue the same roomId with a backoff
(exponential or fixed retry delay) instead of simple continue; update the logic
around this.handlers iteration and the call to lockRepository.lock (and any
requeue helper you have) so failed lock attempts requeue the room (with backoff)
and successful handler processing removes it.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/federation-sdk/src/queues/staging-area.queue.ts`:
- Around line 45-58: The lock is currently acquired inside the for (const
handler of this.handlers) loop causing each handler to attempt its own lock;
instead acquire the lock once per room before iterating handlers so all handlers
run under the same lock atomically: call this.lockRepository.lock(roomId,
this.configService.instanceId) (and check lock.success) once, then iterate for
(const handler of this.handlers) and call handler(roomId) with await
lock.update() during processing, and finally release/dispose the lock after all
handlers complete; update the logic around lockRepository.lock, the handlers
iteration (this.handlers and handler(roomId)), and lock.update to reflect the
single-lock-per-room behavior.

---

Duplicate comments:
In `@packages/federation-sdk/src/queues/staging-area.queue.ts`:
- Around line 45-52: The code currently removes roomId from the queue before
attempting handler locks so a failed lock (lockRepository.lock returning
lock.success === false) causes the room to be dropped; change the flow in the
staging-area queue so you do not permanently remove the room when lock
acquisition fails: either delay removal until after a handler successfully
processes the room, or when lock.success is false immediately re-enqueue the
same roomId with a backoff (exponential or fixed retry delay) instead of simple
continue; update the logic around this.handlers iteration and the call to
lockRepository.lock (and any requeue helper you have) so failed lock attempts
requeue the room (with backoff) and successful handler processing removes it.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 65f27293-c68d-460f-b855-147c70bbda59

📥 Commits

Reviewing files that changed from the base of the PR and between c794902 and 3be70cb.

📒 Files selected for processing (2)
  • packages/federation-sdk/src/queues/staging-area.queue.ts
  • packages/federation-sdk/src/services/event.service.ts
💤 Files with no reviewable changes (1)
  • packages/federation-sdk/src/services/event.service.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: cubic · AI code reviewer
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-03-04T13:37:36.036Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:36.036Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/queues/staging-area.queue.ts
📚 Learning: 2025-10-10T22:18:31.655Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 224
File: packages/federation-sdk/src/services/event-authorization.service.ts:261-268
Timestamp: 2025-10-10T22:18:31.655Z
Learning: In packages/federation-sdk/src/services/state.service.ts, the method `StateService.getLatestRoomState(roomId: string)` has return type `Promise<State>` and never returns undefined. If the state is not found, it throws an error with message "No state found for room ${roomId}" instead of returning undefined.

Applied to files:

  • packages/federation-sdk/src/queues/staging-area.queue.ts
🔇 Additional comments (3)
packages/federation-sdk/src/queues/staging-area.queue.ts (3)

1-8: LGTM on imports and type definition.

The imports are correctly structured for tsyringe DI with delay to handle circular dependencies. The QueueHandler type change to AsyncGenerator enables the streaming/yielding progress pattern.

Minor note: AsyncGenerator<unknown | undefined> is slightly redundant since unknown already includes undefined, but this doesn't affect functionality.


18-22: LGTM on constructor injection.

The delay(() => LockRepository) pattern correctly handles potential circular dependencies. The ConfigService relies on tsyringe's automatic resolution by type, which works when registered as a singleton.


60-67: LGTM on the finally block.

The processing state reset and recursive check for new items is correctly implemented. The finally block ensures processing is always reset even if an exception occurs.

@sampaiodiego sampaiodiego force-pushed the chore/refactor-queue-generators branch from 3be70cb to dfd8335 Compare March 11, 2026 18:17
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
packages/federation-sdk/src/repositories/lock.repository.ts (1)

17-41: ⚠️ Potential issue | 🔴 Critical

update() still doesn't surface lease loss.

The past review flagged this: if processing exceeds the 2-minute timeout, another instance can acquire the lock. updateLockTimestamp() will then match 0 documents, but update() still resolves silently, so the caller continues processing without owning the lease.

The return type remains Promise<void> instead of Promise<boolean>, and updateLockTimestamp() doesn't return matchedCount === 1.

🐛 Proposed fix
 async lock(
 	roomId: string,
 	instanceId: string,
 ): Promise<{
 	success: boolean;
-	update: () => Promise<void>;
+	update: () => Promise<boolean>;
 	[Symbol.asyncDispose]: () => Promise<void>;
 }> {
 	const lock = await this.getLock(roomId, instanceId);
 	return {
 		success: lock,
 		update: async () => {
 			if (!lock) {
-				return;
+				return false;
 			}
 			return this.updateLockTimestamp(roomId, instanceId);
 		},
 		[Symbol.asyncDispose]: async () => {
 			if (!lock) {
 				return;
 			}
 			return this.releaseLock(roomId, instanceId);
 		},
 	};
 }

Also update updateLockTimestamp:

-async updateLockTimestamp(roomId: string, instanceId: string): Promise<void> {
-	await this.collection.updateOne({ roomId, instanceId }, { $set: { lockedAt: new Date() } });
+async updateLockTimestamp(roomId: string, instanceId: string): Promise<boolean> {
+	const { matchedCount } = await this.collection.updateOne(
+		{ roomId, instanceId },
+		{ $set: { lockedAt: new Date() } },
+	);
+	return matchedCount === 1;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/repositories/lock.repository.ts` around lines 17
- 41, The lock() method's returned update() currently returns Promise<void> and
hides lease loss; change the update() signature to return Promise<boolean> and
make it propagate whether the lease was retained by returning the boolean result
from updateLockTimestamp(); also modify updateLockTimestamp() to return a
boolean (true when db result.matchedCount === 1, false otherwise) so callers can
detect lease loss; keep the existing behavior in [Symbol.asyncDispose]
(releaseLock) but ensure callers can stop processing if update() returns false.
packages/federation-sdk/src/queues/staging-area.queue.ts (1)

45-58: ⚠️ Potential issue | 🟠 Major

Room is dropped when lock acquisition fails.

When lock.success is false (line 56-58), the room has already been removed from the Set (line 48) and is silently dropped. Events remain stranded until another external enqueue happens for that room.

Consider re-enqueuing failed rooms with backoff to avoid losing wake-ups:

🔧 Suggested fix with backoff
+private retryDelays: Map<RoomID, number> = new Map();
+private readonly MAX_RETRY_DELAY = 30000; // 30 seconds

 while (this.queue.size > 0) {
 	const [roomId] = this.queue;
 	if (!roomId) continue;
 	this.queue.delete(roomId);

 	// eslint-disable-next-line no-await-in-loop, prettier/prettier
 	await using lock = await this.lockRepository.lock(
 		roomId,
 		this.configService.instanceId,
 	);

 	if (!lock.success) {
+		// Re-enqueue with exponential backoff
+		const currentDelay = this.retryDelays.get(roomId) ?? 100;
+		const nextDelay = Math.min(currentDelay * 2, this.MAX_RETRY_DELAY);
+		this.retryDelays.set(roomId, nextDelay);
+		setTimeout(() => this.enqueue(roomId), currentDelay + Math.random() * 100);
 		continue;
 	}

+	// Reset retry delay on successful lock
+	this.retryDelays.delete(roomId);

 	// eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator
 	for await (const _ of this.handler(roomId)) {
 		await lock.update();
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/staging-area.queue.ts` around lines 45 -
58, The code removes room IDs from this.queue before attempting to acquire a
lock (this.lockRepository.lock) so when lock.success is false the room is
dropped; change the logic to avoid permanent loss by either delaying removal
until after a successful lock or re-enqueuing the room with backoff on failure:
e.g., check this.queue (or call this.enqueue) to re-add the room when
lock.success is false using exponential backoff state (a retry-count map keyed
by roomId) and ensure you reference the existing lock acquisition flow
(this.lockRepository.lock and configService.instanceId) and the current delete
call (this.queue.delete) to implement the retry/backoff rather than silently
dropping the room.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/federation-sdk/src/queues/staging-area.queue.ts`:
- Around line 60-63: The loop in staging-area.queue.ts that iterates over
this.handler(roomId) must check the boolean result of lock.update() and abort
processing if it returns false; modify the for-await loop to capture the result
of await lock.update(), and if it is false, immediately break out (or return) to
stop handling the room so work doesn't continue after the lease is lost. Ensure
you reference lock.update() and this.handler(roomId) when making the change and
do not rely on exceptions — explicitly handle the false return to terminate
processing.

---

Duplicate comments:
In `@packages/federation-sdk/src/queues/staging-area.queue.ts`:
- Around line 45-58: The code removes room IDs from this.queue before attempting
to acquire a lock (this.lockRepository.lock) so when lock.success is false the
room is dropped; change the logic to avoid permanent loss by either delaying
removal until after a successful lock or re-enqueuing the room with backoff on
failure: e.g., check this.queue (or call this.enqueue) to re-add the room when
lock.success is false using exponential backoff state (a retry-count map keyed
by roomId) and ensure you reference the existing lock acquisition flow
(this.lockRepository.lock and configService.instanceId) and the current delete
call (this.queue.delete) to implement the retry/backoff rather than silently
dropping the room.

In `@packages/federation-sdk/src/repositories/lock.repository.ts`:
- Around line 17-41: The lock() method's returned update() currently returns
Promise<void> and hides lease loss; change the update() signature to return
Promise<boolean> and make it propagate whether the lease was retained by
returning the boolean result from updateLockTimestamp(); also modify
updateLockTimestamp() to return a boolean (true when db result.matchedCount ===
1, false otherwise) so callers can detect lease loss; keep the existing behavior
in [Symbol.asyncDispose] (releaseLock) but ensure callers can stop processing if
update() returns false.
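As a self-contained illustration of those lease semantics, here is an in-memory stand-in (not the MongoDB-backed repository; `InMemoryLockRepository`, `steal`, and the other names are illustrative). `update()` reports whether the caller still owns the lock, mirroring a `matchedCount === 1` check, and `release()` plays the role of `[Symbol.asyncDispose]`:

```typescript
type Lease = {
	success: boolean;
	update: () => Promise<boolean>; // true while this holder still owns the lock
	release: () => Promise<void>; // what [Symbol.asyncDispose] would invoke
};

class InMemoryLockRepository {
	private owners = new Map<string, string>();

	async lock(roomId: string, instanceId: string): Promise<Lease> {
		const owner = this.owners.get(roomId);
		if (owner !== undefined && owner !== instanceId) {
			// Lock held by another instance: fail fast, nothing to release.
			return { success: false, update: async () => false, release: async () => {} };
		}
		this.owners.set(roomId, instanceId);
		return {
			success: true,
			// Mirrors updateLockTimestamp(): succeeds only while we still match.
			update: async () => this.owners.get(roomId) === instanceId,
			release: async () => {
				if (this.owners.get(roomId) === instanceId) this.owners.delete(roomId);
			},
		};
	}

	// Simulates another instance taking over an expired lease.
	steal(roomId: string, instanceId: string): void {
		this.owners.set(roomId, instanceId);
	}
}

async function demo(): Promise<[boolean, boolean, boolean]> {
	const repo = new InMemoryLockRepository();
	const lease = await repo.lock('!room:hs', 'instance-a');
	const held = await lease.update(); // true: lease still ours
	repo.steal('!room:hs', 'instance-b'); // lease expires and is taken over
	const lost = await lease.update(); // false: caller must stop processing
	await lease.release(); // no-op: we no longer own the lock
	return [lease.success, held, lost];
}

demo().then((flags) => console.log(flags)); // logs [ true, true, false ]
```

With this shape, the queue's drain loop can break as soon as `update()` resolves to false instead of continuing work on a room another instance now owns.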
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6bb52b16-d39a-40ac-a164-ac3c704b7e5b

📥 Commits

Reviewing files that changed from the base of the PR and between 3be70cb and dfd8335.

📒 Files selected for processing (5)
  • packages/federation-sdk/src/listeners/staging-area.listener.ts
  • packages/federation-sdk/src/queues/staging-area.queue.ts
  • packages/federation-sdk/src/repositories/lock.repository.ts
  • packages/federation-sdk/src/services/event.service.ts
  • packages/federation-sdk/src/services/staging-area.service.ts
💤 Files with no reviewable changes (1)
  • packages/federation-sdk/src/services/event.service.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/federation-sdk/src/listeners/staging-area.listener.ts
📜 Review details
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-10T22:18:31.655Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 224
File: packages/federation-sdk/src/services/event-authorization.service.ts:261-268
Timestamp: 2025-10-10T22:18:31.655Z
Learning: In packages/federation-sdk/src/services/state.service.ts, the method `StateService.getLatestRoomState(roomId: string)` has return type `Promise<State>` and never returns undefined. If the state is not found, it throws an error with message "No state found for room ${roomId}" instead of returning undefined.

Applied to files:

  • packages/federation-sdk/src/repositories/lock.repository.ts
  • packages/federation-sdk/src/queues/staging-area.queue.ts
  • packages/federation-sdk/src/services/staging-area.service.ts
📚 Learning: 2026-03-04T13:37:36.036Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:36.036Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/queues/staging-area.queue.ts
  • packages/federation-sdk/src/services/staging-area.service.ts
🔇 Additional comments (2)
packages/federation-sdk/src/services/staging-area.service.ts (1)

63-147: LGTM on the async generator conversion.

The refactor correctly:

  • Yields the current processed event rather than peeking the next (addressing the previous review concern)
  • Delegates lock management to the caller (the queue now calls lock.update() after each yield)
  • The `got` counter increment in `getLeastDepthEventForRoom` combined with `MAX_EVENT_RETRY` bounds the loop iterations, mitigating the infinite-loop concern noted in the TODO at line 144
packages/federation-sdk/src/queues/staging-area.queue.ts (1)

12-12: Good: Set prevents duplicate enqueues.

Using a Set instead of an Array ensures the same room isn't enqueued multiple times, which aligns well with the event-driven processing model.
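This dedup-plus-single-drain behavior can be modeled in a few lines (an illustrative stand-in only; the real queue hands each room to the registered async-generator handler under a per-room lock):

```typescript
// Minimal model: a Set collapses duplicate enqueues of the same room,
// and a processing flag ensures only one drain loop runs at a time.
class DedupQueue {
	private queue = new Set<string>();
	private processing = false;
	public processed: string[] = [];

	enqueue(roomId: string): void {
		this.queue.add(roomId); // duplicate enqueues are no-ops while pending
	}

	async process(): Promise<void> {
		if (this.processing) return; // re-entrancy guard
		this.processing = true;
		try {
			while (this.queue.size > 0) {
				const [roomId] = this.queue; // Sets iterate in insertion order
				this.queue.delete(roomId);
				await Promise.resolve(); // stand-in for the async handler
				this.processed.push(roomId);
			}
		} finally {
			this.processing = false;
		}
	}
}

const q = new DedupQueue();
q.enqueue('!a:hs');
q.enqueue('!a:hs'); // collapsed by the Set
q.enqueue('!b:hs');
q.process().then(() => console.log(q.processed)); // logs [ '!a:hs', '!b:hs' ]
```

Note that once a room is deleted from the Set for processing, a fresh enqueue for it is accepted again, which is exactly the wake-up behavior the review discusses.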

Comment on lines +60 to 63
// eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator
for await (const _ of this.handler(roomId)) {
await lock.update();
}

⚠️ Potential issue | 🟠 Major

Handle lock.update() failure to abort processing.

Once lock.update() is fixed to return boolean (per the comment on lock.repository.ts), this loop should check the return value and break if the lease was lost:

 for await (const _ of this.handler(roomId)) {
-	await lock.update();
+	const stillHeld = await lock.update();
+	if (!stillHeld) {
+		// Lost the lock — abort and let another instance continue
+		break;
+	}
 }

Otherwise, processing continues on a room that another instance now owns, risking duplicate/conflicting work.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/staging-area.queue.ts` around lines 60 -
63, The loop in staging-area.queue.ts that iterates over this.handler(roomId)
must check the boolean result of lock.update() and abort processing if it
returns false; modify the for-await loop to capture the result of await
lock.update(), and if it is false, immediately break out (or return) to stop
handling the room so work doesn't continue after the lease is lost. Ensure you
reference lock.update() and this.handler(roomId) when making the change and do
not rely on exceptions — explicitly handle the false return to terminate
processing.

@sampaiodiego sampaiodiego changed the title feat: add lock method to LockRepository for managing room locks with … chore: add lock method to LockRepository for managing room locks with disposal functionality Mar 11, 2026
@sampaiodiego sampaiodiego merged commit e440ea7 into main Mar 11, 2026
3 checks passed
@sampaiodiego sampaiodiego deleted the chore/refactor-queue-generators branch March 11, 2026 18:33
