Skip to content

Commit f867571

Browse files
authored
feat: event outgoing queue (#338)
1 parent 2ca978b commit f867571

File tree

8 files changed

+1095
-107
lines changed

8 files changed

+1095
-107
lines changed

packages/federation-sdk/src/queues/per-destination.queue.spec.ts

Lines changed: 661 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
import type { BaseEDU } from '@rocket.chat/federation-core';
2+
import { createLogger } from '@rocket.chat/federation-core';
3+
import type { Pdu } from '@rocket.chat/federation-room';
4+
5+
import type { FederationRequestService } from '../services/federation-request.service';
6+
import { FederationEndpoints, type SendTransactionResponse, type Transaction } from '../specs/federation-api';
7+
8+
/**
9+
* Per-destination queue for sending PDUs and EDUs to a specific homeserver.
10+
* Implements retry logic with exponential backoff and batching of events.
11+
*
12+
* Configuration:
13+
* - Matrix spec constants (hardcoded):
14+
* - MAX_PDUS_PER_TRANSACTION = 50
15+
* - MAX_EDUS_PER_TRANSACTION = 100
16+
*
17+
* - Environment variables (configurable):
18+
* - FEDERATION_OUTGOING_MAX_RETRIES: Max retry attempts (default: 10)
19+
* - FEDERATION_OUTGOING_INITIAL_BACKOFF_MS: Initial backoff in milliseconds (default: 1000)
20+
* - FEDERATION_OUTGOING_MAX_BACKOFF_MS: Maximum backoff in milliseconds (default: 3600000 = 1 hour)
21+
* - FEDERATION_OUTGOING_BACKOFF_MULTIPLIER: Backoff multiplier for exponential backoff (default: 2)
22+
*/
23+
24+
/**
 * A PDU waiting in the outgoing queue.
 * `queuedAt` is the `Date.now()` timestamp (ms) taken when the PDU was enqueued.
 */
interface QueuedPDU {
25+
pdu: Pdu;
26+
queuedAt: number;
27+
}
28+
29+
/**
 * An EDU waiting in the outgoing queue.
 * `queuedAt` is the `Date.now()` timestamp (ms) taken when the EDU was enqueued.
 */
interface QueuedEDU {
30+
edu: BaseEDU;
31+
queuedAt: number;
32+
}
33+
34+
// Matrix spec constants (hardcoded per spec): upper bounds on how many PDUs
// and EDUs are batched into a single outgoing transaction.
35+
const MAX_PDUS_PER_TRANSACTION = 50;
36+
const MAX_EDUS_PER_TRANSACTION = 100;
37+
38+
/**
 * Retry/backoff settings for a per-destination queue.
 *
 * - `maxRetries`: once the number of consecutive failed sends exceeds this, the queued events are dropped.
 * - `initialBackoffMs`: delay before the first retry.
 * - `maxBackoffMs`: cap applied to the computed backoff delay.
 * - `backoffMultiplier`: factor the delay is multiplied by for each further consecutive failure.
 */
interface RetryConfig {
39+
maxRetries: number;
40+
initialBackoffMs: number;
41+
maxBackoffMs: number;
42+
backoffMultiplier: number;
43+
}
44+
45+
/**
 * Build the retry configuration from environment variables.
 *
 * Every variable is optional. A value that is unset, empty, not parseable as a
 * number, or negative is ignored in favor of the default, so a typo in the
 * environment cannot leak `NaN` into the retry-count comparison or the backoff
 * delay computed from this configuration.
 *
 * Variables and defaults:
 * - FEDERATION_OUTGOING_MAX_RETRIES (integer, default 10)
 * - FEDERATION_OUTGOING_INITIAL_BACKOFF_MS (integer, default 1000)
 * - FEDERATION_OUTGOING_MAX_BACKOFF_MS (integer, default 3600000 = 1 hour)
 * - FEDERATION_OUTGOING_BACKOFF_MULTIPLIER (float, default 2)
 *
 * @returns A fully populated retry configuration containing only finite, non-negative numbers.
 */
function getRetryConfigFromEnv(): RetryConfig {
	// Read one variable; fall back to the default when it is missing or unusable.
	const readNumber = (name: string, fallback: number, parse: (raw: string) => number): number => {
		const raw = process.env[name];
		if (!raw) {
			// Unset or empty string: same fallback the original `||` default provided.
			return fallback;
		}
		const value = parse(raw);
		return Number.isFinite(value) && value >= 0 ? value : fallback;
	};
	const parseInteger = (raw: string): number => parseInt(raw, 10);

	return {
		maxRetries: readNumber('FEDERATION_OUTGOING_MAX_RETRIES', 10, parseInteger),
		initialBackoffMs: readNumber('FEDERATION_OUTGOING_INITIAL_BACKOFF_MS', 1000, parseInteger),
		maxBackoffMs: readNumber('FEDERATION_OUTGOING_MAX_BACKOFF_MS', 3600000, parseInteger), // 1 hour
		backoffMultiplier: readNumber('FEDERATION_OUTGOING_BACKOFF_MULTIPLIER', 2, parseFloat),
	};
}
56+
57+
/**
 * Outgoing federation queue for a single destination homeserver.
 *
 * PDUs and EDUs are buffered in memory and sent in batched transactions, with
 * at most one transaction in flight at a time. Events are removed from the
 * queue only after the transaction carrying them has succeeded. Failed sends
 * are retried with exponential backoff; see `handleRetry` for the conditions
 * under which queued events are dropped.
 */
export class PerDestinationQueue {
	/** Backoff (ms) at which retrying is abandoned until `notifyServerUp()` is called (1 hour). */
	private static readonly GIVE_UP_BACKOFF_MS = 3600000;

	private logger;

	private pduQueue: QueuedPDU[] = [];

	private eduQueue: QueuedEDU[] = [];

	/** True while a transaction is in flight; guards against concurrent sends. */
	private processing = false;

	/** Number of consecutive failed sends since the last success or reset. */
	private retryCount = 0;

	/** `Date.now()` timestamp before which no send is attempted; 0 means "no backoff", Infinity means "wait for notifyServerUp()". */
	private nextRetryAt = 0;

	private readonly retryConfig: RetryConfig;

	private retryTimerId: NodeJS.Timeout | null = null;

	/**
	 * @param destination Server the transactions are sent to.
	 * @param origin Value placed in the `origin` field of every transaction.
	 * @param requestService Service used to PUT transactions to the destination.
	 * @param retryConfig Optional overrides applied on top of the environment-derived retry configuration.
	 */
	constructor(
		private readonly destination: string,
		private readonly origin: string,
		private readonly requestService: FederationRequestService,
		retryConfig?: Partial<RetryConfig>,
	) {
		// Load config from env vars, allow override for testing
		const envConfig = getRetryConfigFromEnv();
		this.retryConfig = { ...envConfig, ...retryConfig };

		this.logger = createLogger('PerDestinationQueue').child({ destination });
	}

	/**
	 * Enqueue a PDU for sending to the destination and kick off processing.
	 */
	enqueuePDU(pdu: Pdu): void {
		this.pduQueue.push({
			pdu,
			queuedAt: Date.now(),
		});
		this.logger.debug({ queueSize: this.pduQueue.length }, 'Enqueued PDU');
		// Fire-and-forget: processQueue handles send failures internally.
		void this.processQueue();
	}

	/**
	 * Enqueue an EDU for sending to the destination and kick off processing.
	 */
	enqueueEDU(edu: BaseEDU): void {
		this.eduQueue.push({
			edu,
			queuedAt: Date.now(),
		});
		this.logger.debug({ queueSize: this.eduQueue.length }, 'Enqueued EDU');
		// Fire-and-forget: processQueue handles send failures internally.
		void this.processQueue();
	}

	/**
	 * Check if the queue is empty (no PDUs and no EDUs pending).
	 */
	isEmpty(): boolean {
		return this.pduQueue.length === 0 && this.eduQueue.length === 0;
	}

	/**
	 * Notify that the remote server is back online.
	 * This clears the backoff (including the "wait indefinitely" state) and
	 * triggers immediate processing if anything is queued.
	 */
	notifyServerUp(): void {
		this.logger.info('Remote server is back online, clearing backoff');
		this.clearRetryTimer();
		this.retryCount = 0;
		this.nextRetryAt = 0;

		// Trigger immediate processing if there are items in queue
		if (!this.isEmpty()) {
			void this.processQueue();
		}
	}

	/**
	 * Clear any pending retry timer
	 */
	private clearRetryTimer(): void {
		if (this.retryTimerId) {
			clearTimeout(this.retryTimerId);
			this.retryTimerId = null;
		}
	}

	/**
	 * Process the queue by sending batched transactions.
	 *
	 * Does nothing when a transaction is already in flight, when the backoff
	 * window has not elapsed yet, or when both queues are empty. On success the
	 * sent items are removed and, if more items remain, another run is started.
	 * On failure the items stay queued and `handleRetry` decides what happens next.
	 */
	private async processQueue(): Promise<void> {
		// Don't process if already processing or if we need to wait for retry
		if (this.processing) {
			return;
		}

		const now = Date.now();
		if (this.nextRetryAt > now) {
			// Already in backoff period, timer from handleRetry() will process the queue later
			const waitTime = this.nextRetryAt - now;
			this.logger.debug({ waitTimeMs: waitTime, nextRetryAt: this.nextRetryAt }, 'Still in backoff period, skipping');
			return;
		}

		// Don't process if queue is empty
		if (this.isEmpty()) {
			return;
		}

		this.processing = true;

		let shouldContinue = false;

		try {
			// Batch PDUs and EDUs into a transaction
			// Matrix spec: max 50 PDUs and 100 EDUs per transaction
			const pdusToSend = this.pduQueue.slice(0, MAX_PDUS_PER_TRANSACTION).map((item) => item.pdu);
			const edusToSend = this.eduQueue.slice(0, MAX_EDUS_PER_TRANSACTION).map((item) => item.edu);

			this.logger.info({ pduCount: pdusToSend.length, eduCount: edusToSend.length }, 'Sending transaction');

			await this.sendTransaction({
				origin: this.origin,
				origin_server_ts: Date.now(),
				pdus: pdusToSend,
				edus: edusToSend,
			});

			// Transaction successful, remove sent items from queue.
			// Items enqueued while the request was in flight were appended at the
			// end, so removing from the front drops exactly what was sent.
			this.pduQueue.splice(0, pdusToSend.length);
			this.eduQueue.splice(0, edusToSend.length);

			// Reset retry count on success
			this.retryCount = 0;
			this.nextRetryAt = 0;

			this.logger.info('Successfully sent transaction');

			// Mark that we should continue processing after finally block completes
			shouldContinue = !this.isEmpty();
		} catch (error) {
			this.logger.error(
				{
					err: error,
					retryCount: this.retryCount,
				},
				'Failed to send transaction',
			);

			// Handle retry with exponential backoff
			this.handleRetry();
		} finally {
			this.processing = false;
		}

		// Continue processing if there are more items (after finally has released the mutex)
		if (shouldContinue) {
			void this.processQueue();
		}
	}

	/**
	 * Send a transaction to the destination server.
	 *
	 * A new transaction id (timestamp plus random suffix) is generated on every call.
	 */
	private async sendTransaction(transaction: Transaction): Promise<SendTransactionResponse> {
		const txnId = `${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
		const uri = FederationEndpoints.sendTransaction(txnId);

		return this.requestService.put<SendTransactionResponse>(this.destination, uri, transaction);
	}

	/**
	 * Handle a failed send with exponential backoff.
	 *
	 * - Once the consecutive failure count exceeds `maxRetries`, all queued
	 *   events are dropped and the retry state is reset.
	 * - If the computed backoff reaches one hour, all queued events are dropped
	 *   and sending is suspended (`nextRetryAt = Infinity`) until
	 *   `notifyServerUp()` is called; events enqueued in the meantime stay queued.
	 * - Otherwise a timer is scheduled to retry after the backoff delay.
	 */
	private handleRetry(): void {
		this.retryCount++;

		if (this.retryCount > this.retryConfig.maxRetries) {
			this.logger.error(
				{
					maxRetries: this.retryConfig.maxRetries,
					droppedPdus: this.pduQueue.length,
					droppedEdus: this.eduQueue.length,
				},
				'Max retries reached, dropping events',
			);
			// Clear the queue on max retries
			this.pduQueue = [];
			this.eduQueue = [];
			this.retryCount = 0;
			this.nextRetryAt = 0;
			this.clearRetryTimer();
			return;
		}

		// Calculate exponential backoff: initial * multiplier^(failures - 1), capped at maxBackoffMs
		const backoff = Math.min(
			this.retryConfig.initialBackoffMs * Math.pow(this.retryConfig.backoffMultiplier, this.retryCount - 1),
			this.retryConfig.maxBackoffMs,
		);

		// Check if backoff reached the 1 hour threshold (per Synapse spec, should enter catch-up mode)
		if (backoff >= PerDestinationQueue.GIVE_UP_BACKOFF_MS) {
			this.logger.warn(
				{
					destination: this.destination,
					backoffMs: backoff,
					pduQueueSize: this.pduQueue.length,
					eduQueueSize: this.eduQueue.length,
				},
				'Backoff exceeded 1 hour. Emptying queue and stop retrying until server is up.',
			);
			this.pduQueue = [];
			this.eduQueue = [];
			this.retryCount = 0;
			this.nextRetryAt = Infinity;
			this.clearRetryTimer();
			return;
		}

		this.nextRetryAt = Date.now() + backoff;

		this.logger.info(
			{
				destination: this.destination,
				retryCount: this.retryCount,
				maxRetries: this.retryConfig.maxRetries,
				backoffMs: backoff,
			},
			'Scheduling retry',
		);

		// Clear any existing timer before scheduling a new retry
		this.clearRetryTimer();
		this.retryTimerId = setTimeout(() => {
			// The timer firing IS the end of the backoff window. Open the gate
			// explicitly instead of relying on Date.now() having passed
			// nextRetryAt: the timer and the wall clock are not guaranteed to
			// agree (e.g. the system clock may be adjusted), and a skipped run
			// here would leave the queue stalled with no timer left to retry.
			this.retryTimerId = null;
			this.nextRetryAt = 0;
			void this.processQueue();
		}, backoff);
	}
}

packages/federation-sdk/src/sdk.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,4 +270,8 @@ export class FederationSDK {
270270
/** Forwards all arguments to `eduService.sendReadReceipt` and returns its result. */
sendReadReceipt(...args: Parameters<typeof this.eduService.sendReadReceipt>) {
271271
return this.eduService.sendReadReceipt(...args);
272272
}
273+
274+
/** Forwards all arguments to `federationService.notifyRemoteServerUp` and returns its result. */
notifyRemoteServerUp(...args: Parameters<typeof this.federationService.notifyRemoteServerUp>) {
275+
return this.federationService.notifyRemoteServerUp(...args);
276+
}
273277
}

packages/federation-sdk/src/services/edu.service.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export class EduService {
2424

2525
this.logger.debug(`Sending typing notification for room ${roomId}: ${userId} (typing: ${typing}) to all servers in room`);
2626

27+
// TODO we need a map of rooms and destinations to avoid having to get rooms state just to send an event to all servers in the room.
2728
const servers = await this.stateService.getServerSetInRoom(roomId);
2829
const uniqueServers = Array.from(servers).filter((server) => server !== origin);
2930

@@ -49,6 +50,7 @@ export class EduService {
4950

5051
await Promise.all(
5152
roomIds.map(async (roomId) => {
53+
// TODO we need a map of rooms and destinations to avoid having to get rooms state just to send an event to all servers in the room.
5254
const servers = await this.stateService.getServerSetInRoom(roomId);
5355
for (const server of servers) {
5456
if (server !== origin) {
@@ -104,6 +106,7 @@ export class EduService {
104106
`Sending read receipt for user ${userId} in room ${roomId} for events ${eventIds.join(', ')} to all servers in room`,
105107
);
106108

109+
// TODO we need a map of rooms and destinations to avoid having to get rooms state just to send an event to all servers in the room.
107110
const servers = await this.stateService.getServerSetInRoom(roomId);
108111
const uniqueServers = Array.from(servers).filter((server) => server !== origin);
109112

packages/federation-sdk/src/services/event-authorization.service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ export class EventAuthorizationService {
249249
return false;
250250
}
251251

252+
// TODO we need a map of rooms and destinations to avoid having to get rooms state just to send an event to all servers in the room.
252253
const serversInRoom = await this.stateService.getServerSetInRoom(roomId, state);
253254
if (serversInRoom.has(serverName)) {
254255
this.logger.debug(`Server ${serverName} is in room, allowing access`);

0 commit comments

Comments
 (0)