Skip to content

Commit 0909f0b

Browse files
debdutdebggazzo
andauthored
refactor: state calculation (#252)
Co-authored-by: Guilherme Gazzo <[email protected]>
1 parent c544685 commit 0909f0b

29 files changed

+3469
-1216
lines changed

.github/workflows/ci.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
21
on:
2+
release:
3+
types: [published]
34
pull_request:
4-
branches: '**'
5+
branches: "**"
56
paths-ignore:
6-
- '**.md'
7+
- "**.md"
78
push:
89
branches:
910
- main
1011
paths-ignore:
11-
- '**.md'
12+
- "**.md"
1213

1314
name: my-workflow
1415
jobs:
@@ -26,15 +27,14 @@ jobs:
2627
restore-keys: |
2728
${{ runner.os }}-turbo-
2829
- uses: oven-sh/setup-bun@v2
29-
3030
- run: bun install
3131
- run: bun run build
3232
- run: bun lint:ci
3333
- run: bun tsc --noEmit
34-
- run: bun test:coverage
35-
34+
- uses: supercharge/[email protected]
35+
- run: LOG_LEVEL=debug RUN_MONGO_TESTS=1 bun test packages/federation-sdk/src/services/state.service.spec.ts
36+
- run: LOG_LEVEL=debug bun test:coverage
3637
- name: Upload coverage reports to Codecov
3738
uses: codecov/codecov-action@v5
3839
with:
3940
token: ${{ secrets.CODECOV_TOKEN }}
40-

packages/core/src/models/event.model.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { EventID, Pdu, PduForType } from '@rocket.chat/federation-room';
1+
import type {
2+
EventID,
3+
Pdu,
4+
RejectCode,
5+
StateID,
6+
} from '@rocket.chat/federation-room';
27
import type { EventBase as CoreEventBase } from '../events/eventBase';
38

49
// TODO: use room package
@@ -19,9 +24,15 @@ interface PersistentEventBase<E = Pdu> {
1924

2025
// TODO: Merge with StagedEvent from event.service.ts
2126
export interface EventStore<E = Pdu> extends PersistentEventBase<E> {
22-
stateId: string;
27+
stateId: StateID;
2328
// for prev_events
24-
nextEventId: string;
29+
nextEventId: EventID | '';
30+
31+
rejectCode?: RejectCode;
32+
rejectDetail?: {
33+
reason: string;
34+
rejectedBy?: EventID;
35+
};
2536
}
2637

2738
export interface EventStagingStore extends PersistentEventBase {

packages/federation-sdk/src/container.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type {
55
EventStagingStore,
66
EventStore,
77
} from '@rocket.chat/federation-core';
8-
import type { Collection, WithId } from 'mongodb';
8+
import type { Collection } from 'mongodb';
99
import { container } from 'tsyringe';
1010

1111
import type { HomeserverEventSignatures } from './index';
@@ -17,7 +17,10 @@ import { Key, KeyRepository } from './repositories/key.repository';
1717
import { Lock, LockRepository } from './repositories/lock.repository';
1818
import { Room, RoomRepository } from './repositories/room.repository';
1919
import { Server, ServerRepository } from './repositories/server.repository';
20-
import { StateRepository, StateStore } from './repositories/state.repository';
20+
import {
21+
StateGraphRepository,
22+
StateGraphStore,
23+
} from './repositories/state-graph.repository';
2124
import { Upload, UploadRepository } from './repositories/upload.repository';
2225
import { ConfigService } from './services/config.service';
2326
import { DatabaseConnectionService } from './services/database-connection.service';
@@ -82,10 +85,6 @@ export async function createFederationContainer(
8285
useValue: db.collection<Room>('rocketchat_federation_rooms'),
8386
});
8487

85-
container.register<Collection<WithId<StateStore>>>('StateCollection', {
86-
useValue: db.collection<WithId<StateStore>>('rocketchat_federation_states'),
87-
});
88-
8988
container.register<Collection<Server>>('ServerCollection', {
9089
useValue: db.collection<Server>('rocketchat_federation_servers'),
9190
});
@@ -94,14 +93,20 @@ export async function createFederationContainer(
9493
useValue: db.collection<Upload>('rocketchat_uploads'),
9594
});
9695

96+
container.register<Collection<StateGraphStore>>('StateGraphCollection', {
97+
useValue: db.collection<StateGraphStore>(
98+
'rocketchat_federation_state_graphs',
99+
),
100+
});
101+
97102
container.registerSingleton(EventRepository);
98103
container.registerSingleton(EventStagingRepository);
99104
container.registerSingleton(KeyRepository);
100105
container.registerSingleton(LockRepository);
101106
container.registerSingleton(RoomRepository);
102-
container.registerSingleton(StateRepository);
103107
container.registerSingleton(ServerRepository);
104108
container.registerSingleton(UploadRepository);
109+
container.registerSingleton(StateGraphRepository);
105110

106111
container.registerSingleton(FederationRequestService);
107112
container.registerSingleton(FederationService);

packages/federation-sdk/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { container } from 'tsyringe';
44
import { ConfigService } from './services/config.service';
55
import { EduService } from './services/edu.service';
66
import { EventAuthorizationService } from './services/event-authorization.service';
7+
import { EventEmitterService } from './services/event-emitter.service';
78
import { EventService } from './services/event.service';
89
import { FederationRequestService } from './services/federation-request.service';
910
import { InviteService } from './services/invite.service';
@@ -94,7 +95,6 @@ export { EventRepository } from './repositories/event.repository';
9495
export { RoomRepository } from './repositories/room.repository';
9596
export { ServerRepository } from './repositories/server.repository';
9697
export { KeyRepository } from './repositories/key.repository';
97-
export { StateRepository } from './repositories/state.repository';
9898

9999
export interface HomeserverServices {
100100
room: RoomService;
@@ -111,6 +111,7 @@ export interface HomeserverServices {
111111
media: MediaService;
112112
request: FederationRequestService;
113113
federationAuth: EventAuthorizationService;
114+
emitter: EventEmitterService;
114115
}
115116

116117
export type HomeserverEventSignatures = {
@@ -292,6 +293,7 @@ export function getAllServices(): HomeserverServices {
292293
media: container.resolve(MediaService),
293294
request: container.resolve(FederationRequestService),
294295
federationAuth: container.resolve(EventAuthorizationService),
296+
emitter: container.resolve(EventEmitterService),
295297
};
296298
}
297299

packages/federation-sdk/src/repositories/event.repository.ts

Lines changed: 121 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,16 @@ import {
55
Pdu,
66
PduForType,
77
PduType,
8+
RejectCode,
9+
StateID,
810
} from '@rocket.chat/federation-room';
9-
import type { Collection, FindCursor, WithId } from 'mongodb';
10-
import { MongoError } from 'mongodb';
11+
import type {
12+
Collection,
13+
FindCursor,
14+
FindOptions,
15+
UpdateResult,
16+
WithId,
17+
} from 'mongodb';
1118
import { inject, singleton } from 'tsyringe';
1219

1320
@singleton()
@@ -110,15 +117,6 @@ export class EventRepository {
110117
);
111118
}
112119

113-
async create(
114-
origin: string,
115-
event: Pdu,
116-
eventId: EventID,
117-
stateId = '',
118-
): Promise<string | undefined> {
119-
return this.persistEvent(origin, event, eventId, stateId);
120-
}
121-
122120
async redactEvent(eventId: EventID, redactedEvent: Pdu): Promise<void> {
123121
await this.collection.updateOne(
124122
{ _id: eventId },
@@ -168,7 +166,7 @@ export class EventRepository {
168166
{
169167
'event.room_id': roomId,
170168
'event.origin_server_ts': { $lt: timestamp }, // events before passed timestamp
171-
stateId: { $ne: '' },
169+
stateId: { $ne: '' as StateID },
172170
},
173171
{
174172
sort: {
@@ -192,58 +190,25 @@ export class EventRepository {
192190
});
193191
}
194192

195-
async updateStateId(eventId: EventID, stateId: string): Promise<void> {
193+
async updateStateId(eventId: EventID, stateId: StateID): Promise<void> {
196194
await this.collection.updateOne({ _id: eventId }, { $set: { stateId } });
197195
}
198196

199197
// finds events not yet referenced by other events
200198
// more on the respective adr
201-
async findPrevEvents(roomId: string) {
199+
async findLatestEvents(roomId: string) {
202200
return this.collection
203-
.find({
204-
nextEventId: '',
205-
'event.room_id': roomId,
206-
_id: { $ne: '' as EventID },
207-
})
201+
.find(
202+
{
203+
nextEventId: '',
204+
'event.room_id': roomId,
205+
rejectCode: { $exists: false },
206+
},
207+
{ sort: { 'event.depth': 1, createdAt: 1 } },
208+
)
208209
.toArray();
209210
}
210211

211-
private async persistEvent(
212-
origin: string,
213-
event: Pdu,
214-
eventId: EventID,
215-
stateId: string,
216-
) {
217-
try {
218-
await this.collection.insertOne({
219-
_id: eventId,
220-
origin,
221-
event: event,
222-
stateId: stateId,
223-
createdAt: new Date(),
224-
nextEventId: '', // new events are not expected to have forward edges
225-
});
226-
} catch (e) {
227-
if (e instanceof MongoError) {
228-
if (e.code === 11000) {
229-
// duplicate key error
230-
// this is expected, if the same intentional event is attempted to be persisted again
231-
return;
232-
}
233-
}
234-
235-
throw e;
236-
}
237-
238-
// this must happen later to as to avoid finding 0 prev_events on a parallel request
239-
await this.collection.updateMany(
240-
{ _id: { $in: event.prev_events } },
241-
{ $set: { nextEventId: eventId } },
242-
);
243-
244-
return eventId;
245-
}
246-
247212
findMembershipEventsFromDirectMessageRooms(
248213
users: string[],
249214
): FindCursor<EventStore<PduForType<'m.room.member'>>> {
@@ -266,9 +231,23 @@ export class EventRepository {
266231
findByIds<T extends PduType>(
267232
eventIds: EventID[],
268233
): FindCursor<WithId<EventStore<PduForType<T>>>> {
269-
return this.collection.find({
270-
_id: { $in: eventIds },
271-
}) as FindCursor<WithId<EventStore<PduForType<T>>>>;
234+
return this.collection.find(
235+
{
236+
_id: { $in: eventIds },
237+
},
238+
{ sort: { 'event.depth': -1, createdAt: -1 } },
239+
) as FindCursor<WithId<EventStore<PduForType<T>>>>;
240+
}
241+
242+
findByIdsOrderedDescending<T extends PduType>(
243+
eventIds: EventID[],
244+
): FindCursor<WithId<EventStore<PduForType<T>>>> {
245+
return this.collection.find(
246+
{
247+
_id: { $in: eventIds },
248+
},
249+
{ sort: { 'event.depth': -1, createdAt: -1 } },
250+
) as FindCursor<WithId<EventStore<PduForType<T>>>>;
272251
}
273252

274253
findByRoomIdAndTypes(
@@ -422,4 +401,88 @@ export class EventRepository {
422401
.sort({ 'event.depth': -1, 'event.origin_server_ts': -1 })
423402
.limit(limit);
424403
}
404+
405+
// new ones
406+
// -------------------
407+
408+
insertOrUpdateEventWithStateId(
409+
eventId: EventID,
410+
event: Pdu,
411+
stateId: StateID,
412+
): Promise<UpdateResult> {
413+
return this.collection.updateOne(
414+
{ _id: eventId },
415+
{
416+
$setOnInsert: {
417+
event,
418+
nextEventId: '',
419+
createdAt: new Date(),
420+
},
421+
$set: {
422+
stateId,
423+
},
424+
},
425+
{ upsert: true },
426+
);
427+
}
428+
429+
async updateNextEventReferences(
430+
newEventId: EventID,
431+
previousEventIds: EventID[],
432+
): Promise<UpdateResult> {
433+
return this.collection.updateMany(
434+
{ _id: { $in: previousEventIds }, nextEventId: '' as EventID },
435+
{ $set: { nextEventId: newEventId } },
436+
);
437+
}
438+
439+
async findStateIdByEventId(eventId: EventID): Promise<StateID | undefined> {
440+
const result = await this.collection.findOne<Pick<EventStore, 'stateId'>>(
441+
{ _id: eventId },
442+
{ projection: { stateId: 1 } },
443+
);
444+
445+
return result?.stateId;
446+
}
447+
448+
async rejectEvent(
449+
eventId: EventID,
450+
event: Pdu,
451+
stateId: StateID,
452+
code: RejectCode,
453+
reason: string,
454+
rejectedBy?: EventID,
455+
): Promise<UpdateResult> {
456+
return this.collection.updateOne(
457+
{
458+
_id: eventId,
459+
},
460+
{
461+
$setOnInsert: {
462+
event,
463+
stateId,
464+
nextEventId: '',
465+
},
466+
$set: {
467+
rejectCode: code,
468+
rejectDetail: {
469+
reason,
470+
rejectedBy,
471+
},
472+
},
473+
},
474+
{ upsert: true },
475+
);
476+
}
477+
478+
findStateIdsByEventIds(eventIds: EventID[]) {
479+
return this.collection.find<Pick<EventStore, 'stateId'>>(
480+
{ _id: { $in: eventIds }, stateId: { $ne: '' as StateID } },
481+
{ projection: { stateId: 1 } },
482+
);
483+
}
484+
485+
findByType(type: PduType) {
486+
return this.collection.find({ 'event.type': type });
487+
}
425488
}

0 commit comments

Comments
 (0)