Skip to content

feat: event outgoing queue#338

Merged
ggazzo merged 10 commits intomainfrom
feat-outgoing-queue
Mar 11, 2026
Merged

feat: event outgoing queue#338
ggazzo merged 10 commits intomainfrom
feat-outgoing-queue

Conversation

@sampaiodiego
Copy link
Member

@sampaiodiego sampaiodiego commented Mar 3, 2026

FGA-10

FGA-11

Summary by CodeRabbit

  • New Features

    • Per-destination outbound queues with automatic batching (up to 50 PDUs / 100 EDUs), configurable retries, exponential backoff, and server-up handling
    • Centralized sender service and SDK method to notify when a remote server is back up
    • Middleware now triggers remote-server-up notifications to resume delivery immediately
  • Refactor

    • Outgoing message flow consolidated behind the new sender orchestration
  • Tests

    • Comprehensive tests for enqueueing, batching, retries/backoff, server recovery, concurrency, and edge cases

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a per-destination outbound queue (PerDestinationQueue) with batching, retry/exponential backoff, and server-up signaling; introduces FederationSenderService to manage destination queues; integrates sender into FederationService and exposes notifyRemoteServerUp via FederationSDK; auth middleware now notifies remote-server-up on valid requests; comprehensive tests added.

Changes

Cohort / File(s) Summary
PerDestinationQueue & tests
packages/federation-sdk/src/queues/per-destination.queue.ts, packages/federation-sdk/src/queues/per-destination.queue.spec.ts
New PerDestinationQueue implementation and exhaustive unit tests: PDU/EDU queuing, batching limits (50 PDUs / 100 EDUs), transaction construction, exponential backoff (configurable), max-retry and max-backoff handling, 1‑hour threshold behavior (clearing/parking), single in‑flight send enforcement, server-up resume, and env/constructor config overrides.
FederationSenderService
packages/federation-sdk/src/services/federation-sender.service.ts
New singleton service managing a Map of PerDestinationQueue instances; exposes sendPDU/sendEDU (single & multi-destination), notifyRemoteServerUp, and lazily creates per-destination queues; delegates actual transmission to PerDestinationQueue.
FederationService integration
packages/federation-sdk/src/services/federation.service.ts
Refactored outbound flow to use FederationSenderService (sendPDUToMultiple/sendEDUToMultiple); removed inline sendTransaction/sendEvent implementations; added notifyRemoteServerUp passthrough; adjusted imports and destination filtering.
SDK surface
packages/federation-sdk/src/sdk.ts
Added FederationSDK.notifyRemoteServerUp(...) that delegates to federationService.notifyRemoteServerUp to expose server-up signaling.
Auth middleware hookup
packages/homeserver/src/middlewares/isAuthenticated.ts
Auth middleware now calls federationSDK.notifyRemoteServerUp(...) on successful validation to clear backoff and trigger immediate outbound retries for that remote.
EDU routing minor change
packages/federation-sdk/src/services/edu.service.ts
Replaced use of StateService.getServersInRoom(...) with getServerSetInRoom(...), converting the returned Set to an array before filtering out origin.
Manifest / package
package.json
Small manifest update(s) recorded (lines changed).

Sequence Diagram(s)

sequenceDiagram
    participant Auth as Auth Middleware
    participant SDK as FederationSDK
    participant Sender as FederationSenderService
    participant Queue as PerDestinationQueue
    participant Req as FederationRequestService
    participant Remote as RemoteServer

    Remote->>Auth: valid request
    Auth->>SDK: notifyRemoteServerUp(server)
    SDK->>Sender: notifyRemoteServerUp(server)
    Sender->>Queue: serverUp() [clear backoff, resume]
    Queue->>Req: sendTransaction(batch)
    Req->>Remote: POST /_matrix/federation/...
    Remote-->>Req: 200 OK
    Req-->>Queue: success → clear retry state
Loading
sequenceDiagram
    actor App
    participant SDK as FederationSDK
    participant Sender as FederationSenderService
    participant Queue as PerDestinationQueue
    participant Req as FederationRequestService
    actor Remote

    App->>SDK: sendPDU(destination, pdu)
    SDK->>Sender: sendPDU(destination, pdu)
    Sender->>Queue: enqueue(pdu)
    alt batch ready or timer fires
        Queue->>Req: sendTransaction(batch)
        Req->>Remote: POST
        alt success
            Remote-->>Req: 200
            Req-->>Queue: clear backoff, continue processing
        else failure
            Remote-->>Req: error
            Req-->>Queue: increment retry, schedule backoff (exponential)
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: event outgoing queue' accurately summarizes the primary change: implementing a queuing system for outbound federation events.
Linked Issues check ✅ Passed The PR addresses core objectives from FGA-10 and FGA-11: implements per-destination queuing with batching (FGA-11), exponential backoff for unresponsive servers (FGA-10), and automatic retry when servers come back online (FGA-10).
Out of Scope Changes check ✅ Passed All changes are within scope: per-destination queue implementation, retry/backoff logic, federation sender service, SDK integration, and middleware notification for server status align directly with FGA-10 and FGA-11 objectives.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Tip

Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs).
Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 issues found across 6 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="packages/federation-sdk/src/queues/per-destination.queue.spec.ts">

<violation number="1" location="packages/federation-sdk/src/queues/per-destination.queue.spec.ts:77">
P2: Using fixed `setTimeout` delays for async synchronization makes these tests flaky and non-deterministic; use fake timers or explicit synchronization instead of wall-clock sleeps.</violation>

<violation number="2" location="packages/federation-sdk/src/queues/per-destination.queue.spec.ts:502">
P2: This test deletes global `process.env` keys without restoring them, which can leak state to other tests and create order-dependent failures.</violation>
</file>

<file name="packages/federation-sdk/src/queues/per-destination.queue.ts">

<violation number="1" location="packages/federation-sdk/src/queues/per-destination.queue.ts:126">
P1: Race condition: the `finally` block resets `this.processing = false` after the recursive `this.processQueue()` call has already set it to `true` and started an async send. This allows concurrent `processQueue` executions, leading to duplicate transaction sends.

The recursive call in the `try` block is not awaited, so the `finally` block of the outer call runs while the inner call's `sendTransaction` is in-flight, resetting the mutex. Remove the explicit state change and recursion from the `try` block and move continuation logic after `finally` sets `processing = false`.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (2)
packages/homeserver/src/middlewares/isAuthenticated.ts (1)

44-46: Remove redundant post-validation guard

After the early return at Lines [35]-[40], isValid is already truthy here, so this conditional can be simplified.

Suggested patch
-				if (isValid) {
-					federationSDK.notifyRemoteServerUp(isValid);
-				}
+				federationSDK.notifyRemoteServerUp(isValid);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/homeserver/src/middlewares/isAuthenticated.ts` around lines 44 - 46,
The post-validation if-check around federationSDK.notifyRemoteServerUp is
redundant because the preceding early return guarantees isValid is truthy;
remove the conditional and call federationSDK.notifyRemoteServerUp(isValid)
directly in the isAuthenticated middleware so the notification happens
unconditionally after validation (refer to the isValid variable and
federationSDK.notifyRemoteServerUp call in isAuthenticated.ts).
packages/federation-sdk/src/services/federation.service.ts (1)

226-229: De-duplicate destination servers before EDU fan-out

Line [228] filters local server but preserves duplicates. If callers pass repeated domains, identical EDUs are enqueued multiple times for the same destination.

Suggested patch
-		const destinations = servers.filter((server) => server !== this.configService.serverName);
+		const destinations = [...new Set(servers)].filter((server) => server !== this.configService.serverName);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/services/federation.service.ts` around lines 226
- 229, sendEDUToServers currently filters out the local server but leaves
duplicate destination entries, causing duplicate EDUs to be enqueued; update the
logic in sendEDUToServers so that after filtering out
this.configService.serverName you deduplicate the remaining servers (e.g., via a
Set or Array.from(new Set(...))) and use that deduplicated list as the
destinations variable before the fan-out/enqueue step to prevent sending
identical EDUs multiple times to the same server.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/federation-sdk/src/queues/per-destination.queue.spec.ts`:
- Around line 500-505: The test "should use default configuration when env vars
are not set" mutates global process.env without restoring it; save the original
values of FEDERATION_OUTGOING_MAX_RETRIES,
FEDERATION_OUTGOING_INITIAL_BACKOFF_MS, FEDERATION_OUTGOING_MAX_BACKOFF_MS, and
FEDERATION_OUTGOING_BACKOFF_MULTIPLIER at the start of the test (or in a
surrounding beforeEach), delete or unset them for the assertion, and then
restore the saved originals in a finally block or an afterEach to ensure no
global env leakage between tests; update the test to reference the same env keys
when saving/restoring.

In `@packages/federation-sdk/src/queues/per-destination.queue.ts`:
- Around line 181-183: The bug is that the per-destination queue clears the
processing flag too early (in the block that checks this.isEmpty()), allowing a
concurrent enqueue to start a second in-flight send; update the logic in the
methods referencing the processing flag (e.g., where this.processing is set to
false and this.processQueue() is called) so that processing is only cleared
after the current processing work completes (i.e., after processQueue
finishes/awaits or after the send loop finishes), or use an atomic guard
(check-and-set) to prevent a new processor from starting while the current
invocation is still running; apply the same fix to the equivalent block
referenced in the review (the other occurrence around lines 196-198).
- Around line 141-145: The code schedules a setTimeout using waitTime =
this.nextRetryAt - now which can be Infinity when retries are parked, causing
timer overflow and rapid firing; update the block in processQueue (where
this.nextRetryAt is checked and setTimeout is called) to skip scheduling when
waitTime is not finite (use Number.isFinite(waitTime) or check this.nextRetryAt
!== Infinity), only call logger.debug and setTimeout when the waitTime is
finite, and return immediately otherwise; apply the same guard to the other
similar sites that compute waitTime (the other checks around lines handling
parked retries).
- Around line 204-208: The sendTransaction function currently generates a fresh
txnId each call which breaks idempotency on retries; change the flow so txnId is
generated once per transaction payload (e.g., in processQueue before first call)
and then reused for all retries by passing that txnId into sendTransaction (add
an optional txnId parameter to sendTransaction) and ensure handleRetry forwards
the same txnId instead of letting sendTransaction recreate it; alternatively,
attach a stable txnId to the Transaction object before the first send and have
sendTransaction use transaction.txnId if present to guarantee the same txnId
across retries.

In `@packages/federation-sdk/src/services/federation-sender.service.ts`:
- Line 18: The queues Map (private readonly queues) grows indefinitely because
PerDestinationQueue has no shutdown/eviction path; add a shutdown():
Promise<void> method on PerDestinationQueue that cancels any timers, clears
retry/backoff state, and resolves when stopped, then update the code that
currently touches the map (references to PerDestinationQueue and the removal
code around the existing deletion/lines ~99-101) to call await queue.shutdown()
before deleting the entry; also add an eviction policy (e.g., idle TTL check or
explicit removeDestination method) that calls shutdown() when a destination is
considered idle, and expose a service-level shutdown() on the federation sender
to iterate all queues, call shutdown() on each, and clear the Map.

---

Nitpick comments:
In `@packages/federation-sdk/src/services/federation.service.ts`:
- Around line 226-229: sendEDUToServers currently filters out the local server
but leaves duplicate destination entries, causing duplicate EDUs to be enqueued;
update the logic in sendEDUToServers so that after filtering out
this.configService.serverName you deduplicate the remaining servers (e.g., via a
Set or Array.from(new Set(...))) and use that deduplicated list as the
destinations variable before the fan-out/enqueue step to prevent sending
identical EDUs multiple times to the same server.

In `@packages/homeserver/src/middlewares/isAuthenticated.ts`:
- Around line 44-46: The post-validation if-check around
federationSDK.notifyRemoteServerUp is redundant because the preceding early
return guarantees isValid is truthy; remove the conditional and call
federationSDK.notifyRemoteServerUp(isValid) directly in the isAuthenticated
middleware so the notification happens unconditionally after validation (refer
to the isValid variable and federationSDK.notifyRemoteServerUp call in
isAuthenticated.ts).

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7bb5a49 and 6928533.

📒 Files selected for processing (6)
  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
  • packages/federation-sdk/src/queues/per-destination.queue.ts
  • packages/federation-sdk/src/sdk.ts
  • packages/federation-sdk/src/services/federation-sender.service.ts
  • packages/federation-sdk/src/services/federation.service.ts
  • packages/homeserver/src/middlewares/isAuthenticated.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: cubic · AI code reviewer
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: ricardogarim
Repo: RocketChat/homeserver PR: 184
File: packages/federation-sdk/src/services/media.service.ts:21-31
Timestamp: 2025-09-14T13:15:46.588Z
Learning: In the RocketChat homeserver project, ricardogarim prefers to defer timeout and retry enhancements for media downloads when there's already a TODO to make timeouts configurable, indicating they manage technical debt systematically rather than implementing every suggested improvement immediately.
🧬 Code graph analysis (2)
packages/homeserver/src/middlewares/isAuthenticated.ts (1)
packages/federation-sdk/src/index.ts (1)
  • federationSDK (176-176)
packages/federation-sdk/src/queues/per-destination.queue.ts (2)
packages/core/src/events/edu/base.ts (1)
  • BaseEDU (17-21)
packages/federation-sdk/src/specs/federation-api.ts (1)
  • FederationEndpoints (17-53)
🔇 Additional comments (2)
packages/federation-sdk/src/sdk.ts (1)

270-272: Clean facade extension

This pass-through method follows the existing SDK delegation pattern and keeps API surface consistent.

packages/federation-sdk/src/services/federation.service.ts (1)

209-224: Good separation of concerns in outbound PDU path

Signing once and delegating transport/retry to FederationSenderService is a clean boundary.

@codecov-commenter
Copy link

codecov-commenter commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 62.93706% with 106 lines in your changes missing coverage. Please review.
✅ Project coverage is 50.88%. Comparing base (a286ad8) to head (c03818d).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...federation-sdk/src/queues/per-destination.queue.ts 75.50% 49 Missing ⚠️
...tion-sdk/src/services/federation-sender.service.ts 32.07% 36 Missing ⚠️
.../federation-sdk/src/services/federation.service.ts 40.74% 16 Missing ⚠️
...ackages/federation-sdk/src/services/edu.service.ts 0.00% 3 Missing ⚠️
packages/federation-sdk/src/sdk.ts 50.00% 1 Missing ⚠️
...on-sdk/src/services/event-authorization.service.ts 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #338      +/-   ##
==========================================
+ Coverage   50.37%   50.88%   +0.51%     
==========================================
  Files          97       99       +2     
  Lines       11058    11244     +186     
==========================================
+ Hits         5570     5722     +152     
- Misses       5488     5522      +34     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
packages/federation-sdk/src/queues/per-destination.queue.spec.ts (1)

205-256: Timing-based tests may be flaky in CI.

Tests relying on precise setTimeout timing (e.g., waiting 150ms for 100ms backoff) can be flaky under load. Consider using jest.useFakeTimers() for more reliable timing control, or increase wait buffers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/per-destination.queue.spec.ts` around
lines 205 - 256, The timing-based assertions in per-destination.queue.spec.ts
are flaky; update the tests to control timers deterministically by using Jest
fake timers (call jest.useFakeTimers() in the test or beforeEach) and replace
real setTimeout waits with jest.advanceTimersByTime(...) (or alternatively
increase the wait buffers significantly) so retries from PerDestinationQueue
(methods enqueuePDU and its internal retry/backoff logic with
initialBackoffMs/backoffMultiplier/maxBackoffMs) are asserted reliably; ensure
you restore real timers after the test (jest.useRealTimers()) to avoid side
effects.
packages/federation-sdk/src/queues/per-destination.queue.ts (2)

248-264: Hardcoded 1-hour threshold may conflict with configurable maxBackoffMs.

The 1-hour threshold (3600000) is hardcoded, but maxBackoffMs is configurable via env var. If someone sets maxBackoffMs to less than 1 hour (e.g., 30 minutes), the threshold check here would never trigger since backoff is already capped by maxBackoffMs at line 243-246. This is likely fine, but worth noting that the threshold behavior depends on maxBackoffMs >= 3600000.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/per-destination.queue.ts` around lines 248
- 264, The hardcoded 1-hour check uses a literal 3600000 which can conflict with
the configurable maxBackoffMs; change the threshold comparison to use
this.maxBackoffMs (or maxBackoffMs) instead of the literal so the "enter
catch-up mode" branch honors the configured cap—update the condition that checks
backoff (the block that logs the warning, clears this.pduQueue/this.eduQueue,
resets this.retryCount and sets this.nextRetryAt = Infinity) to compare against
the configured maxBackoffMs value.

48-55: Consider validating parsed environment variables.

parseInt and parseFloat return NaN for invalid input strings. While unlikely in production, invalid env vars could cause unpredictable backoff behavior.

Optional defensive validation
 function getRetryConfigFromEnv(): RetryConfig {
+	const parseIntSafe = (value: string | undefined, fallback: number): number => {
+		const parsed = parseInt(value || '', 10);
+		return Number.isNaN(parsed) ? fallback : parsed;
+	};
+	const parseFloatSafe = (value: string | undefined, fallback: number): number => {
+		const parsed = parseFloat(value || '');
+		return Number.isNaN(parsed) ? fallback : parsed;
+	};
 	return {
-		maxRetries: parseInt(process.env.FEDERATION_OUTGOING_MAX_RETRIES || '10', 10),
-		initialBackoffMs: parseInt(process.env.FEDERATION_OUTGOING_INITIAL_BACKOFF_MS || '1000', 10),
-		maxBackoffMs: parseInt(process.env.FEDERATION_OUTGOING_MAX_BACKOFF_MS || '3600000', 10), // 1 hour
-		backoffMultiplier: parseFloat(process.env.FEDERATION_OUTGOING_BACKOFF_MULTIPLIER || '2'),
+		maxRetries: parseIntSafe(process.env.FEDERATION_OUTGOING_MAX_RETRIES, 10),
+		initialBackoffMs: parseIntSafe(process.env.FEDERATION_OUTGOING_INITIAL_BACKOFF_MS, 1000),
+		maxBackoffMs: parseIntSafe(process.env.FEDERATION_OUTGOING_MAX_BACKOFF_MS, 3600000),
+		backoffMultiplier: parseFloatSafe(process.env.FEDERATION_OUTGOING_BACKOFF_MULTIPLIER, 2),
 	};
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/per-destination.queue.ts` around lines 48
- 55, getRetryConfigFromEnv currently trusts parseInt/parseFloat which can yield
NaN; update getRetryConfigFromEnv to validate each parsed value (for example
maxRetries, initialBackoffMs, maxBackoffMs should be finite integers >= 0 and
backoffMultiplier should be a finite number > 0), and if validation fails
replace with the safe defaults shown in the current literals (10, 1000, 3600000,
2). Implement explicit checks (Number.isFinite, Number.isInteger or isNaN) and
optional clamping (e.g. maxRetries >= 0) before returning the RetryConfig to
ensure predictable backoff behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@packages/federation-sdk/src/queues/per-destination.queue.spec.ts`:
- Around line 205-256: The timing-based assertions in
per-destination.queue.spec.ts are flaky; update the tests to control timers
deterministically by using Jest fake timers (call jest.useFakeTimers() in the
test or beforeEach) and replace real setTimeout waits with
jest.advanceTimersByTime(...) (or alternatively increase the wait buffers
significantly) so retries from PerDestinationQueue (methods enqueuePDU and its
internal retry/backoff logic with
initialBackoffMs/backoffMultiplier/maxBackoffMs) are asserted reliably; ensure
you restore real timers after the test (jest.useRealTimers()) to avoid side
effects.

In `@packages/federation-sdk/src/queues/per-destination.queue.ts`:
- Around line 248-264: The hardcoded 1-hour check uses a literal 3600000 which
can conflict with the configurable maxBackoffMs; change the threshold comparison
to use this.maxBackoffMs (or maxBackoffMs) instead of the literal so the "enter
catch-up mode" branch honors the configured cap—update the condition that checks
backoff (the block that logs the warning, clears this.pduQueue/this.eduQueue,
resets this.retryCount and sets this.nextRetryAt = Infinity) to compare against
the configured maxBackoffMs value.
- Around line 48-55: getRetryConfigFromEnv currently trusts parseInt/parseFloat
which can yield NaN; update getRetryConfigFromEnv to validate each parsed value
(for example maxRetries, initialBackoffMs, maxBackoffMs should be finite
integers >= 0 and backoffMultiplier should be a finite number > 0), and if
validation fails replace with the safe defaults shown in the current literals
(10, 1000, 3600000, 2). Implement explicit checks (Number.isFinite,
Number.isInteger or isNaN) and optional clamping (e.g. maxRetries >= 0) before
returning the RetryConfig to ensure predictable backoff behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 89cf7105-a75b-42c8-9659-d7cc3710a373

📥 Commits

Reviewing files that changed from the base of the PR and between 6928533 and 24b7f0e.

📒 Files selected for processing (2)
  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
  • packages/federation-sdk/src/queues/per-destination.queue.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: cubic · AI code reviewer
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:33.578Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.
📚 Learning: 2026-03-04T13:37:33.578Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:33.578Z
Learning: In packages/federation-sdk/src/queues/per-destination.queue.ts, PerDestinationQueue must generate a new txnId for each retry instead of reusing the previous txnId. This is because retries may enqueue additional PDUs/EDUs during the backoff window and batch them into the next attempt, making each transaction distinct. Reusing the same txnId would be incorrect for this implementation, even though the Matrix federation spec mentions txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/queues/per-destination.queue.ts
📚 Learning: 2025-09-14T13:15:46.588Z
Learnt from: ricardogarim
Repo: RocketChat/homeserver PR: 184
File: packages/federation-sdk/src/services/media.service.ts:21-31
Timestamp: 2025-09-14T13:15:46.588Z
Learning: In the RocketChat homeserver project, ricardogarim prefers to defer timeout and retry enhancements for media downloads when there's already a TODO to make timeouts configurable, indicating they manage technical debt systematically rather than implementing every suggested improvement immediately.

Applied to files:

  • packages/federation-sdk/src/queues/per-destination.queue.ts
📚 Learning: 2026-03-04T13:37:33.578Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:33.578Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
🔇 Additional comments (13)
packages/federation-sdk/src/queues/per-destination.queue.ts (5)

1-7: LGTM!

Imports are clean and well-organized, bringing in necessary types from federation packages and local modules.


57-83: LGTM!

Clean class structure with proper encapsulation. The config merge pattern ({ ...envConfig, ...retryConfig }) allows test overrides while using env defaults in production.


85-129: LGTM!

Enqueueing and server-up notification logic is well-implemented. The notifyServerUp method correctly resets backoff state and triggers immediate processing when items are queued.


134-207: LGTM!

The processQueue implementation correctly addresses previously identified issues:

  • Infinity guard at line 143 prevents setTimeout overflow
  • The shouldContinue pattern (lines 159, 187, 203-206) fixes the race condition by deferring continuation until after the finally block releases the processing mutex

The batching logic with slice and splice is correct.


212-217: LGTM!

Transaction ID generation per attempt is intentional, as new PDUs/EDUs may be batched during backoff windows. Based on learnings, this design is correct for this implementation.

packages/federation-sdk/src/queues/per-destination.queue.spec.ts (8)

1-64: LGTM!

Test setup is well-organized with clear helper functions for creating mock PDUs/EDUs and extracting call data. The beforeEach properly clears and restores mocks.


66-129: LGTM!

Basic enqueueing tests properly validate that PDUs and EDUs are sent in separate transactions when enqueued sequentially.


131-203: LGTM!

Batching tests effectively verify Matrix spec limits (50 PDUs, 100 EDUs per transaction) by using a delayed mock to allow items to accumulate.


287-385: LGTM!

Excellent coverage of the 1-hour threshold behavior, including verification that setTimeout is never called with Infinity delay. The setTimeoutSpy approach effectively validates the guard.


387-455: LGTM!

The notifyServerUp tests comprehensively cover both normal backoff clearing and recovery from the parked Infinity state after the 1-hour threshold.


457-565: LGTM!

Good coverage of concurrency prevention, isEmpty edge cases, and transaction structure validation including unique transaction IDs.


568-580: Restore process.env after mutation to prevent test pollution.

This test deletes environment variables without restoring them, which can affect subsequent tests in the same run.

Suggested fix
 		it('should use default configuration when env vars are not set', () => {
+			const savedEnv = {
+				maxRetries: process.env.FEDERATION_OUTGOING_MAX_RETRIES,
+				initialBackoff: process.env.FEDERATION_OUTGOING_INITIAL_BACKOFF_MS,
+				maxBackoff: process.env.FEDERATION_OUTGOING_MAX_BACKOFF_MS,
+				multiplier: process.env.FEDERATION_OUTGOING_BACKOFF_MULTIPLIER,
+			};
+
 			// Clear any environment variables
 			delete process.env.FEDERATION_OUTGOING_MAX_RETRIES;
 			delete process.env.FEDERATION_OUTGOING_INITIAL_BACKOFF_MS;
 			delete process.env.FEDERATION_OUTGOING_MAX_BACKOFF_MS;
 			delete process.env.FEDERATION_OUTGOING_BACKOFF_MULTIPLIER;
 
 			queue = new PerDestinationQueue(destination, origin, mockRequestService);
 
 			// Default values should be used (maxRetries: 10, initialBackoffMs: 1000, etc.)
 			// We can't directly access private fields, but we can test behavior
 			expect(queue).toBeDefined();
+
+			// Restore environment
+			process.env.FEDERATION_OUTGOING_MAX_RETRIES = savedEnv.maxRetries;
+			process.env.FEDERATION_OUTGOING_INITIAL_BACKOFF_MS = savedEnv.initialBackoff;
+			process.env.FEDERATION_OUTGOING_MAX_BACKOFF_MS = savedEnv.maxBackoff;
+			process.env.FEDERATION_OUTGOING_BACKOFF_MULTIPLIER = savedEnv.multiplier;
 		});

582-604: LGTM!

The custom config override test correctly validates that constructor parameters take precedence over environment defaults.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
packages/federation-sdk/src/services/federation-sender.service.ts (1)

18-18: ⚠️ Potential issue | 🟠 Major

Add queue eviction/teardown lifecycle to avoid unbounded destination retention.

queues only grows; there is no eviction or service-level shutdown that drains and clears queue instances/timers.

Suggested direction
 export class FederationSenderService {
   private readonly queues = new Map<string, PerDestinationQueue>();
+  // Optional: track last-activity timestamps for idle eviction.
 
+  shutdown(): void {
+    for (const queue of this.queues.values()) {
+      queue.shutdown?.();
+    }
+    this.queues.clear();
+    this.logger.info('FederationSenderService shutdown complete');
+  }
+
   private getOrCreateQueue(destination: string): PerDestinationQueue {
     let queue = this.queues.get(destination);
     if (!queue) {
       ...
       this.queues.set(destination, queue);
     }
     return queue;
   }
 }

Also applies to: 84-91

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/services/federation-sender.service.ts` at line
18, queues (the Map<string, PerDestinationQueue>) currently grows unbounded and
PerDestinationQueue instances/timers are never drained; add lifecycle management
by implementing eviction and teardown: when creating/adding entries to queues
implement an eviction policy (LRU or TTL) that calls a new
PerDestinationQueue.close()/stop()/drain method to cancel timers and flush/abort
work before deleting the Map entry, and add a service-level shutdown method
(e.g., shutdown() or close()) on the federation sender class that iterates all
queues, calls the same teardown method on each PerDestinationQueue, clears the
Map, and ensures any background timers/promises are awaited or cleared; wire
this shutdown into existing application lifecycle hooks so queues are not
leaked.
🧹 Nitpick comments (1)
packages/federation-sdk/src/services/federation-sender.service.ts (1)

89-90: Consider wiring retry/backoff config explicitly when creating queues (optional)

PerDestinationQueue currently derives retry settings from environment variables (FEDERATION_OUTGOING_MAX_RETRIES, FEDERATION_OUTGOING_INITIAL_BACKOFF_MS, etc.). The constructor does accept an optional retryConfig parameter for testing, but FederationSenderService never passes one. If you prefer explicit configuration over environment variables, consider adding retry settings to ConfigService and passing them here. This would improve testability and make retry behavior more discoverable in code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/services/federation-sender.service.ts` around
lines 89 - 90, FederationSenderService currently constructs PerDestinationQueue
without passing retryConfig so the queue falls back to env vars; update
FederationSenderService to read explicit retry settings from ConfigService
(e.g., outgoingMaxRetries, outgoingInitialBackoffMs, outgoingMaxBackoffMs,
outgoingBackoffFactor) and pass an object as the optional retryConfig argument
when instantiating PerDestinationQueue (the line creating queue: new
PerDestinationQueue(destination, this.configService.serverName,
this.requestService)). If needed, add corresponding getters/properties to
ConfigService and map them into the expected retryConfig shape so tests and
callers can control backoff/retry behavior without relying on environment
variables.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/federation-sdk/src/queues/per-destination.queue.spec.ts`:
- Around line 238-255: The test doesn't actually verify that backoff delays cap
at maxBackoffMs; change the test to use jest.useFakeTimers() and
jest.advanceTimersByTime()/jest.runOnlyPendingTimers() after calling
queue.enqueuePDU(createMockPdu('$event1')) so you can drive retries
deterministically and observe delays, then assert the delay growth stops at
5000ms (maxBackoffMs) given initialBackoffMs:1000 and backoffMultiplier:2;
specifically, stub mockRequestService.put to always reject, advance timers
through multiple retry cycles for PerDestinationQueue, and either (a) verify the
time advanced between two successive scheduled retries does not exceed 5000, or
(b) spy on setTimeout calls (or the queue's internal scheduling method) and
assert one of the scheduled delays equals 5000ms to prove capping.
- Line 77: The tests use many real setTimeout sleeps (e.g., the await new
Promise(resolve => setTimeout(resolve, 50)) call) which makes the suite slow and
flaky; switch the spec to use fake timers by calling vi.useFakeTimers() in the
test suite setup (beforeEach) and replace each real-time wait with deterministic
timer control (vi.advanceTimersByTime(ms) or vi.runAllTimers()) so expectations
run immediately after advancing time; finally restore timers in afterEach with
vi.useRealTimers() to avoid leaking fake timers to other suites.
- Around line 322-355: The test creates a spy (setTimeoutSpy) but restores it
only at the end, so failures can leak the spy; wrap the body of this test that
constructs PerDestinationQueue, calls queue.enqueuePDU and runs the assertions
(including checks against mockRequestService.put and queue.isEmpty()) in a
try-finally and call setTimeoutSpy.mockRestore() in the finally block so the spy
is always cleaned up even if an expect throws.

---

Duplicate comments:
In `@packages/federation-sdk/src/services/federation-sender.service.ts`:
- Line 18: queues (the Map<string, PerDestinationQueue>) currently grows
unbounded and PerDestinationQueue instances/timers are never drained; add
lifecycle management by implementing eviction and teardown: when creating/adding
entries to queues implement an eviction policy (LRU or TTL) that calls a new
PerDestinationQueue.close()/stop()/drain method to cancel timers and flush/abort
work before deleting the Map entry, and add a service-level shutdown method
(e.g., shutdown() or close()) on the federation sender class that iterates all
queues, calls the same teardown method on each PerDestinationQueue, clears the
Map, and ensures any background timers/promises are awaited or cleared; wire
this shutdown into existing application lifecycle hooks so queues are not
leaked.

---

Nitpick comments:
In `@packages/federation-sdk/src/services/federation-sender.service.ts`:
- Around line 89-90: FederationSenderService currently constructs
PerDestinationQueue without passing retryConfig so the queue falls back to env
vars; update FederationSenderService to read explicit retry settings from
ConfigService (e.g., outgoingMaxRetries, outgoingInitialBackoffMs,
outgoingMaxBackoffMs, outgoingBackoffFactor) and pass an object as the optional
retryConfig argument when instantiating PerDestinationQueue (the line creating
queue: new PerDestinationQueue(destination, this.configService.serverName,
this.requestService)). If needed, add corresponding getters/properties to
ConfigService and map them into the expected retryConfig shape so tests and
callers can control backoff/retry behavior without relying on environment
variables.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0661f131-c585-4dff-9181-30a77f9eb560

📥 Commits

Reviewing files that changed from the base of the PR and between 24b7f0e and 63bc110.

📒 Files selected for processing (2)
  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
  • packages/federation-sdk/src/services/federation-sender.service.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: cubic · AI code reviewer
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:33.578Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.
📚 Learning: 2026-03-04T13:37:33.578Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:33.578Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/services/federation-sender.service.ts
  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
🔇 Additional comments (1)
packages/federation-sdk/src/queues/per-destination.queue.spec.ts (1)

550-564: Good coverage for distinct transaction IDs across separate sends.

This assertion is valuable for guarding queue transaction isolation behavior.

Based on learnings: PerDestinationQueue intentionally generates a new txnId for distinct attempts/batches, so ID uniqueness checks are expected for this implementation.

Comment on lines +238 to +255
it('should respect maxBackoffMs limit', async () => {
mockRequestService.put = jest.fn().mockRejectedValue(new Error('Always fails'));

queue = new PerDestinationQueue(destination, origin, mockRequestService, {
maxRetries: 20,
initialBackoffMs: 1000,
maxBackoffMs: 5000,
backoffMultiplier: 2,
});

queue.enqueuePDU(createMockPdu('$event1'));

await new Promise((resolve) => setTimeout(resolve, 50));

// After many retries, backoff should cap at maxBackoffMs
// Just verify it doesn't grow unbounded by checking the implementation works
expect(mockRequestService.put).toHaveBeenCalledTimes(1);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Test name and assertion do not validate maxBackoffMs capping.

The test currently asserts only the first call happened. It never proves the retry delay is capped at 5000ms.

Suggested test hardening
 it('should respect maxBackoffMs limit', async () => {
+  const setTimeoutSpy = jest.spyOn(global, 'setTimeout');
   mockRequestService.put = jest.fn().mockRejectedValue(new Error('Always fails'));
 
   queue = new PerDestinationQueue(destination, origin, mockRequestService, {
     maxRetries: 20,
     initialBackoffMs: 1000,
     maxBackoffMs: 5000,
     backoffMultiplier: 2,
   });
 
   queue.enqueuePDU(createMockPdu('$event1'));
 
-  await new Promise((resolve) => setTimeout(resolve, 50));
-
-  // After many retries, backoff should cap at maxBackoffMs
-  // Just verify it doesn't grow unbounded by checking the implementation works
-  expect(mockRequestService.put).toHaveBeenCalledTimes(1);
+  await new Promise((resolve) => setTimeout(resolve, 120));
+  const scheduledDelays = setTimeoutSpy.mock.calls
+    .map((call) => call[1])
+    .filter((delay): delay is number => typeof delay === 'number');
+  expect(scheduledDelays.some((delay) => delay === 5000)).toBe(true);
+  expect(scheduledDelays.every((delay) => delay <= 5000)).toBe(true);
+  setTimeoutSpy.mockRestore();
 });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
it('should respect maxBackoffMs limit', async () => {
mockRequestService.put = jest.fn().mockRejectedValue(new Error('Always fails'));
queue = new PerDestinationQueue(destination, origin, mockRequestService, {
maxRetries: 20,
initialBackoffMs: 1000,
maxBackoffMs: 5000,
backoffMultiplier: 2,
});
queue.enqueuePDU(createMockPdu('$event1'));
await new Promise((resolve) => setTimeout(resolve, 50));
// After many retries, backoff should cap at maxBackoffMs
// Just verify it doesn't grow unbounded by checking the implementation works
expect(mockRequestService.put).toHaveBeenCalledTimes(1);
});
it('should respect maxBackoffMs limit', async () => {
const setTimeoutSpy = jest.spyOn(global, 'setTimeout');
mockRequestService.put = jest.fn().mockRejectedValue(new Error('Always fails'));
queue = new PerDestinationQueue(destination, origin, mockRequestService, {
maxRetries: 20,
initialBackoffMs: 1000,
maxBackoffMs: 5000,
backoffMultiplier: 2,
});
queue.enqueuePDU(createMockPdu('$event1'));
await new Promise((resolve) => setTimeout(resolve, 120));
const scheduledDelays = setTimeoutSpy.mock.calls
.map((call) => call[1])
.filter((delay): delay is number => typeof delay === 'number');
expect(scheduledDelays.some((delay) => delay === 5000)).toBe(true);
expect(scheduledDelays.every((delay) => delay <= 5000)).toBe(true);
setTimeoutSpy.mockRestore();
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/queues/per-destination.queue.spec.ts` around
lines 238 - 255, The test doesn't actually verify that backoff delays cap at
maxBackoffMs; change the test to use jest.useFakeTimers() and
jest.advanceTimersByTime()/jest.runOnlyPendingTimers() after calling
queue.enqueuePDU(createMockPdu('$event1')) so you can drive retries
deterministically and observe delays, then assert the delay growth stops at
5000ms (maxBackoffMs) given initialBackoffMs:1000 and backoffMultiplier:2;
specifically, stub mockRequestService.put to always reject, advance timers
through multiple retry cycles for PerDestinationQueue, and either (a) verify the
time advanced between two successive scheduled retries does not exceed 5000, or
(b) spy on setTimeout calls (or the queue's internal scheduling method) and
assert one of the scheduled delays equals 5000ms to prove capping.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
packages/federation-sdk/src/services/federation-sender.service.ts (1)

60-66: Consider iterating destinations once for multi-EDU dispatch.

The current nested loop calls sendEDU for each destination-EDU combination. While functionally correct, this creates multiple getOrCreateQueue lookups per destination when sending multiple EDUs.

♻️ Optional optimization
 sendEDUToMultiple(destinations: string[], edus: BaseEDU[]): void {
 	for (const destination of destinations) {
+		if (destination === this.configService.serverName) {
+			continue;
+		}
+		const queue = this.getOrCreateQueue(destination);
 		for (const edu of edus) {
-			this.sendEDU(destination, edu);
+			queue.enqueueEDU(edu);
 		}
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/federation-sdk/src/services/federation-sender.service.ts` around
lines 60 - 66, sendEDUToMultiple currently does repeated getOrCreateQueue
lookups via sendEDU for each edu; instead, for each destination call
getOrCreateQueue(destination) once, store the resulting queue, and then dispatch
all edus using that queue. Update sendEDUToMultiple to iterate destinations
once, call getOrCreateQueue(destination) and then loop edus to enqueue/send
using the retrieved queue; if sendEDU currently calls getOrCreateQueue
internally, refactor sendEDU to accept an optional queue parameter (or add a
private helper like sendEDUWithQueue(queue, edu)) and use that to avoid
duplicate lookups. Ensure references to sendEDU, getOrCreateQueue, and
sendEDUToMultiple are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@packages/federation-sdk/src/services/federation-sender.service.ts`:
- Around line 60-66: sendEDUToMultiple currently does repeated getOrCreateQueue
lookups via sendEDU for each edu; instead, for each destination call
getOrCreateQueue(destination) once, store the resulting queue, and then dispatch
all edus using that queue. Update sendEDUToMultiple to iterate destinations
once, call getOrCreateQueue(destination) and then loop edus to enqueue/send
using the retrieved queue; if sendEDU currently calls getOrCreateQueue
internally, refactor sendEDU to accept an optional queue parameter (or add a
private helper like sendEDUWithQueue(queue, edu)) and use that to avoid
duplicate lookups. Ensure references to sendEDU, getOrCreateQueue, and
sendEDUToMultiple are updated accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9db7b65e-a63c-4829-89a0-5fd0e850a763

📥 Commits

Reviewing files that changed from the base of the PR and between 5c369e2 and c53d9be.

📒 Files selected for processing (7)
  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
  • packages/federation-sdk/src/queues/per-destination.queue.ts
  • packages/federation-sdk/src/sdk.ts
  • packages/federation-sdk/src/services/edu.service.ts
  • packages/federation-sdk/src/services/federation-sender.service.ts
  • packages/federation-sdk/src/services/federation.service.ts
  • packages/homeserver/src/middlewares/isAuthenticated.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/homeserver/src/middlewares/isAuthenticated.ts
  • packages/federation-sdk/src/queues/per-destination.queue.ts
📜 Review details
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: ricardogarim
Repo: RocketChat/homeserver PR: 184
File: packages/federation-sdk/src/services/media.service.ts:21-31
Timestamp: 2025-09-14T13:15:46.588Z
Learning: In the RocketChat homeserver project, ricardogarim prefers to defer timeout and retry enhancements for media downloads when there's already a TODO to make timeouts configurable, indicating they manage technical debt systematically rather than implementing every suggested improvement immediately.
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:36.036Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.
📚 Learning: 2025-10-10T22:18:31.655Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 224
File: packages/federation-sdk/src/services/event-authorization.service.ts:261-268
Timestamp: 2025-10-10T22:18:31.655Z
Learning: In packages/federation-sdk/src/services/state.service.ts, the method `StateService.getLatestRoomState(roomId: string)` has return type `Promise<State>` and never returns undefined. If the state is not found, it throws an error with message "No state found for room ${roomId}" instead of returning undefined.

Applied to files:

  • packages/federation-sdk/src/services/edu.service.ts
📚 Learning: 2026-03-04T13:37:36.036Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:36.036Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.

Applied to files:

  • packages/federation-sdk/src/services/federation.service.ts
  • packages/federation-sdk/src/services/federation-sender.service.ts
  • packages/federation-sdk/src/queues/per-destination.queue.spec.ts
🔇 Additional comments (12)
packages/federation-sdk/src/queues/per-destination.queue.spec.ts (5)

66-96: LGTM on basic enqueueing tests.

The helper functions and test structure are clean. The tests correctly validate that PDUs and EDUs are enqueued and sent successfully.


131-166: LGTM on batching limit tests.

The tests correctly validate that PDUs are batched to 50 per transaction and EDUs to 100 per transaction, with proper assertions on transaction counts and batch sizes.


318-327: LGTM on setTimeout spy lifecycle management.

The spy is now properly set up in beforeEach and restored in afterEach, ensuring no leakage between tests.


238-255: Test name and assertion do not validate maxBackoffMs capping.

The test asserts only that the first call happened but never proves the retry delay is capped at 5000ms. Consider spying on setTimeout and asserting that scheduled delays do not exceed maxBackoffMs.


77-77: Consider using fake timers for deterministic test execution.

The test suite contains multiple real-time waits (setTimeout) which can cause flaky behavior under CI load. Using fake timers (jest.useFakeTimers() / jest.advanceTimersByTime()) would make tests faster and more reliable.

packages/federation-sdk/src/services/edu.service.ts (1)

107-108: LGTM on getServerSetInRoom integration.

The change correctly uses the updated getServerSetInRoom method that returns a Set<string>, and applies the same Array.from().filter() pattern used consistently in sendTypingNotification (line 27-28) and sendPresenceUpdateToRooms (line 52-57).

packages/federation-sdk/src/services/federation-sender.service.ts (2)

25-33: LGTM on sendPDU implementation.

The method correctly skips local server destinations and delegates to the per-destination queue.


18-18: Add queue lifecycle cleanup (eviction + shutdown teardown).

The queues Map grows indefinitely as new destinations are added, but there's no eviction path or shutdown method to clear/stop queues. This can retain idle destinations and queue timers indefinitely in long-running deployments.

Consider adding:

  1. A shutdown() method that iterates queues, calls cleanup, and clears the Map
  2. An idle TTL eviction policy for destinations that haven't been used recently
packages/federation-sdk/src/services/federation.service.ts (3)

8-20: LGTM on FederationSenderService integration.

The service correctly injects FederationSenderService and updates imports. The dependency injection via tsyringe's @singleton() pattern ensures proper lifecycle management.


199-224: LGTM on sendEventToAllServersInRoom refactoring.

The implementation correctly:

  1. Signs the event once before queuing (line 210) - avoids redundant signing per destination
  2. Filters out both event origin and local server (line 213)
  3. Returns early when no destinations exist (lines 215-218)
  4. Delegates to federationSenderService.sendPDUToMultiple for queued delivery

226-249: LGTM on sendEDUToServers and notifyRemoteServerUp.

Both methods follow consistent patterns:

  • sendEDUToServers filters local server and delegates to the sender service
  • notifyRemoteServerUp provides a clean delegation path for the server-up signal
packages/federation-sdk/src/sdk.ts (1)

273-276: LGTM on notifyRemoteServerUp SDK exposure.

The method follows the established delegation pattern used throughout the SDK, correctly forwarding to federationService.notifyRemoteServerUp with proper type inference via Parameters<typeof>.

@ggazzo ggazzo merged commit f867571 into main Mar 11, 2026
4 checks passed
@ggazzo ggazzo deleted the feat-outgoing-queue branch March 11, 2026 13:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants