Skip to content

Commit bacff36

Browse files
authored
chore: limit staging area processing by 10 tries (#278)
1 parent 67cffdb commit bacff36

File tree

4 files changed

+59
-33
lines changed

4 files changed

+59
-33
lines changed

packages/core/src/models/event.model.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export interface EventStore<E = Pdu> extends PersistentEventBase<E> {
3939

4040
export interface EventStagingStore extends PersistentEventBase {
4141
roomId: string;
42+
got: number;
4243
from: 'join' | 'transaction';
4344
}
4445

packages/federation-sdk/src/repositories/event-staging.repository.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ export class EventStagingRepository {
1010
@inject('EventStagingCollection')
1111
private readonly collection: Collection<EventStagingStore>,
1212
) {
13-
this.collection.createIndex({ roomId: 1, createdAt: 1 });
13+
this.collection.createIndex({
14+
roomId: 1,
15+
got: 1,
16+
'event.depth': 1,
17+
createdAt: 1,
18+
});
1419
}
1520

1621
async create(
@@ -30,6 +35,7 @@ export class EventStagingRepository {
3035
$setOnInsert: {
3136
roomId: event.room_id,
3237
createdAt: new Date(),
38+
got: 0,
3339
},
3440
$set: {
3541
event,
@@ -43,9 +49,18 @@ export class EventStagingRepository {
4349
}
4450

4551
getLeastDepthEventForRoom(roomId: string): Promise<EventStagingStore | null> {
46-
return this.collection.findOne(
52+
return this.collection.findOneAndUpdate(
4753
{ roomId },
48-
{ sort: { 'event.depth': 1, createdAt: 1 } },
54+
{
55+
$inc: {
56+
got: 1,
57+
},
58+
},
59+
{
60+
sort: { got: 1, 'event.depth': 1, createdAt: 1 },
61+
upsert: false,
62+
returnDocument: 'before',
63+
},
4964
);
5065
}
5166

packages/federation-sdk/src/server-discovery/discovery.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const SERVER_DISCOVERY_CACHE_MAX_AGE =
3737
}
3838

3939
throw new Error('Invalid SERVER_DISCOVERY_CACHE_MAX_AGE value');
40-
})(process.env.SERVER_DISCOVERY_CACHE_MAX_AGE) ?? 3_600_000; // default to 1 hour
40+
})(process.env.SERVER_DISCOVERY_CACHE_MAX_AGE) ?? 300_000; // default to 5 minutes
4141

4242
// should only be needed if input is from a dns server
4343
function fix6(addr: string): `[${string}]` {

packages/federation-sdk/src/services/staging-area.service.ts

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ import { FederationService } from './federation.service';
2626
import { MissingEventService } from './missing-event.service';
2727
import { PartialStateResolutionError, StateService } from './state.service';
2828

29+
/**
 * Parses the raw `MAX_EVENT_RETRY` setting.
 *
 * @param raw - Raw setting value, normally `process.env.MAX_EVENT_RETRY`.
 * @returns The parsed non-negative integer, or `undefined` when the value is
 *   unset or empty so the caller can fall back to its default.
 * @throws Error when the value is set but is not a plain non-negative integer.
 */
function parseMaxEventRetry(raw?: string): number | undefined {
	if (!raw) return undefined;

	const trimmed = raw.trim();

	// `Number.parseInt` stops at the first non-digit character, so "1e3" would
	// silently become 1 and "10abc" would become 10. Require the whole value to
	// be decimal digits so a mistyped setting fails loudly instead.
	if (/^\+?\d+$/.test(trimmed)) {
		const n = Number(trimmed);
		if (Number.isSafeInteger(n)) {
			return n;
		}
	}

	throw new Error(`Invalid MAX_EVENT_RETRY value: ${JSON.stringify(raw)}`);
}

/**
 * Retry threshold read from the `MAX_EVENT_RETRY` environment variable.
 * Defaults to 10 when the variable is unset or empty; an invalid value makes
 * module initialization throw.
 */
const MAX_EVENT_RETRY = parseMaxEventRetry(process.env.MAX_EVENT_RETRY) ?? 10;
40+
2941
class MissingAuthorizationEventsError extends Error {
3042
constructor(message: string) {
3143
super(message);
@@ -62,16 +74,6 @@ export class StagingAreaService {
6274
}
6375

6476
async processEventForRoom(roomId: string) {
65-
let event = await this.eventService.getLeastDepthEventForRoom(roomId);
66-
if (!event) {
67-
this.logger.debug({ msg: 'No staged event found for room', roomId });
68-
await this.lockRepository.releaseLock(
69-
roomId,
70-
this.configService.instanceId,
71-
);
72-
return;
73-
}
74-
7577
const roomIdToRoomVersion = new Map<string, RoomVersion>();
7678
const getRoomVersion = async (roomId: string) => {
7779
if (roomIdToRoomVersion.has(roomId)) {
@@ -88,9 +90,32 @@ export class StagingAreaService {
8890
return PersistentEventFactory.createFromRawEvent(pdu, version);
8991
};
9092

91-
while (event) {
93+
let event: EventStagingStore | null = null;
94+
95+
do {
96+
event = await this.eventService.getLeastDepthEventForRoom(roomId);
97+
if (!event) {
98+
this.logger.debug({ msg: 'No staged event found for room', roomId });
99+
break;
100+
}
101+
102+
if (event.got > MAX_EVENT_RETRY) {
103+
this.logger.warn(
104+
`Event ${event._id} has been tried ${MAX_EVENT_RETRY} times, removing from staging area`,
105+
);
106+
await this.eventService.markEventAsUnstaged(event);
107+
continue;
108+
}
109+
92110
this.logger.info({ msg: 'Processing event', eventId: event._id });
93111

112+
// if we got an event, we need to update the lock's timestamp to avoid it being timed out
113+
// and acquired by another instance while we're processing a batch of events for this room
114+
await this.lockRepository.updateLockTimestamp(
115+
roomId,
116+
this.configService.instanceId,
117+
);
118+
94119
try {
95120
const addedMissing = await this.processDependencyStage(event);
96121
if (addedMissing) {
@@ -123,33 +148,18 @@ export class StagingAreaService {
123148
});
124149
} else if (err instanceof MissingEventsError) {
125150
this.logger.info({
126-
msg: 'Added missing events, postponing current event processing',
151+
msg: 'Added missing events, postponing event processing',
127152
eventId: event._id,
128153
});
129154
} else {
130155
this.logger.error({
131-
msg: 'Error processing event',
156+
msg: 'Error processing event, postponing event processing',
132157
event,
133158
err,
134159
});
135-
136-
await this.eventService.markEventAsUnstaged(event);
137160
}
138161
}
139-
140-
// TODO: what should we do to avoid infinite loops in case the next event is always the same event
141-
142-
event = await this.eventService.getLeastDepthEventForRoom(roomId);
143-
144-
// if we got an event, we need to update the lock's timestamp to avoid it being timed out
145-
// and acquired by another instance while we're processing a batch of events for this room
146-
if (event) {
147-
await this.lockRepository.updateLockTimestamp(
148-
roomId,
149-
this.configService.instanceId,
150-
);
151-
}
152-
}
162+
} while (event);
153163

154164
// release the lock after processing
155165
await this.lockRepository.releaseLock(

0 commit comments

Comments
 (0)