Skip to content

Commit ac5fee0

Browse files
authored
Fix race conditions around threads (matrix-org#2331)
1 parent 274d6a9 commit ac5fee0

File tree

7 files changed

+217
-194
lines changed

7 files changed

+217
-194
lines changed

spec/unit/matrix-client.spec.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -811,9 +811,7 @@ describe("MatrixClient", function() {
811811
}
812812
},
813813
},
814-
threads: {
815-
get: jest.fn(),
816-
},
814+
getThread: jest.fn(),
817815
addPendingEvent: jest.fn(),
818816
updatePendingEvent: jest.fn(),
819817
reEmitter: {

spec/unit/room.spec.ts

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import { RoomState } from "../../src/models/room-state";
3636
import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event";
3737
import { TestClient } from "../TestClient";
3838
import { emitPromise } from "../test-utils/test-utils";
39-
import { ThreadEvent } from "../../src/models/thread";
39+
import { Thread, ThreadEvent } from "../../src/models/thread";
4040

4141
describe("Room", function() {
4242
const roomId = "!foo:bar";
@@ -1914,7 +1914,7 @@ describe("Room", function() {
19141914
},
19151915
});
19161916

1917-
room.createThread(undefined, [eventWithoutARootEvent]);
1917+
room.createThread("$000", undefined, [eventWithoutARootEvent]);
19181918

19191919
const rootEvent = new MatrixEvent({
19201920
event_id: "$666",
@@ -1932,7 +1932,7 @@ describe("Room", function() {
19321932
},
19331933
});
19341934

1935-
expect(() => room.createThread(rootEvent, [])).not.toThrow();
1935+
expect(() => room.createThread(rootEvent.getId(), rootEvent, [])).not.toThrow();
19361936
});
19371937

19381938
it("Edits update the lastReply event", async () => {
@@ -1959,14 +1959,16 @@ describe("Room", function() {
19591959
},
19601960
});
19611961

1962+
let prom = emitPromise(room, ThreadEvent.New);
19621963
room.addLiveEvents([randomMessage, threadRoot, threadResponse]);
1963-
const thread = await emitPromise(room, ThreadEvent.New);
1964+
const thread = await prom;
19641965

19651966
expect(thread.replyToEvent).toBe(threadResponse);
19661967
expect(thread.replyToEvent.getContent().body).toBe(threadResponse.getContent().body);
19671968

1969+
prom = emitPromise(thread, ThreadEvent.Update);
19681970
room.addLiveEvents([threadResponseEdit]);
1969-
await emitPromise(thread, ThreadEvent.Update);
1971+
await prom;
19701972
expect(thread.replyToEvent.getContent().body).toBe(threadResponseEdit.getContent()["m.new_content"].body);
19711973
});
19721974

@@ -1993,15 +1995,17 @@ describe("Room", function() {
19931995
},
19941996
});
19951997

1998+
let prom = emitPromise(room, ThreadEvent.New);
19961999
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
1997-
const thread = await emitPromise(room, ThreadEvent.New);
2000+
const thread = await prom;
19982001

19992002
expect(thread).toHaveLength(2);
20002003
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
20012004

2005+
prom = emitPromise(thread, ThreadEvent.Update);
20022006
const threadResponse1Redaction = mkRedaction(threadResponse1);
20032007
room.addLiveEvents([threadResponse1Redaction]);
2004-
await emitPromise(thread, ThreadEvent.Update);
2008+
await prom;
20052009
expect(thread).toHaveLength(1);
20062010
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
20072011
});
@@ -2030,15 +2034,17 @@ describe("Room", function() {
20302034
},
20312035
});
20322036

2037+
let prom = emitPromise(room, ThreadEvent.New);
20332038
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
2034-
const thread = await emitPromise(room, ThreadEvent.New);
2039+
const thread = await prom;
20352040

20362041
expect(thread).toHaveLength(2);
20372042
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
20382043

2044+
prom = emitPromise(thread, ThreadEvent.Update);
20392045
const threadResponse2ReactionRedaction = mkRedaction(threadResponse2Reaction);
20402046
room.addLiveEvents([threadResponse2ReactionRedaction]);
2041-
await emitPromise(thread, ThreadEvent.Update);
2047+
await prom;
20422048
expect(thread).toHaveLength(2);
20432049
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
20442050
});
@@ -2067,15 +2073,17 @@ describe("Room", function() {
20672073
},
20682074
});
20692075

2076+
let prom = emitPromise(room, ThreadEvent.New);
20702077
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
2071-
const thread = await emitPromise(room, ThreadEvent.New);
2078+
const thread = await prom;
20722079

20732080
expect(thread).toHaveLength(2);
20742081
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
20752082

2083+
prom = emitPromise(room, ThreadEvent.Update);
20762084
const threadRootRedaction = mkRedaction(threadRoot);
20772085
room.addLiveEvents([threadRootRedaction]);
2078-
await emitPromise(thread, ThreadEvent.Update);
2086+
await prom;
20792087
expect(thread).toHaveLength(2);
20802088
});
20812089

@@ -2102,21 +2110,24 @@ describe("Room", function() {
21022110
},
21032111
});
21042112

2113+
let prom = emitPromise(room, ThreadEvent.New);
21052114
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
2106-
const thread = await emitPromise(room, ThreadEvent.New);
2115+
const thread = await prom;
21072116

21082117
expect(thread).toHaveLength(2);
21092118
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
21102119

2120+
prom = emitPromise(room, ThreadEvent.Update);
21112121
const threadResponse2Redaction = mkRedaction(threadResponse2);
21122122
room.addLiveEvents([threadResponse2Redaction]);
2113-
await emitPromise(thread, ThreadEvent.Update);
2123+
await prom;
21142124
expect(thread).toHaveLength(1);
21152125
expect(thread.replyToEvent.getId()).toBe(threadResponse1.getId());
21162126

2127+
prom = emitPromise(room, ThreadEvent.Update);
21172128
const threadResponse1Redaction = mkRedaction(threadResponse1);
21182129
room.addLiveEvents([threadResponse1Redaction]);
2119-
await emitPromise(thread, ThreadEvent.Update);
2130+
await prom;
21202131
expect(thread).toHaveLength(0);
21212132
expect(thread.replyToEvent.getId()).toBe(threadRoot.getId());
21222133
});
@@ -2234,5 +2245,45 @@ describe("Room", function() {
22342245
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInRoom).toBeTruthy();
22352246
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInThread).toBeFalsy();
22362247
});
2248+
2249+
it("should aggregate relations in thread event timeline set", () => {
2250+
Thread.setServerSideSupport(true, true);
2251+
const threadRoot = mkMessage();
2252+
const rootReaction = mkReaction(threadRoot);
2253+
const threadResponse = mkThreadResponse(threadRoot);
2254+
const threadReaction = mkReaction(threadResponse);
2255+
2256+
const events = [
2257+
threadRoot,
2258+
rootReaction,
2259+
threadResponse,
2260+
threadReaction,
2261+
];
2262+
2263+
room.addLiveEvents(events);
2264+
2265+
const thread = threadRoot.getThread();
2266+
expect(thread.rootEvent).toBe(threadRoot);
2267+
2268+
const rootRelations = thread.timelineSet.getRelationsForEvent(
2269+
threadRoot.getId(),
2270+
RelationType.Annotation,
2271+
EventType.Reaction,
2272+
).getSortedAnnotationsByKey();
2273+
expect(rootRelations).toHaveLength(1);
2274+
expect(rootRelations[0][0]).toEqual(rootReaction.getRelation().key);
2275+
expect(rootRelations[0][1].size).toEqual(1);
2276+
expect(rootRelations[0][1].has(rootReaction)).toBeTruthy();
2277+
2278+
const responseRelations = thread.timelineSet.getRelationsForEvent(
2279+
threadResponse.getId(),
2280+
RelationType.Annotation,
2281+
EventType.Reaction,
2282+
).getSortedAnnotationsByKey();
2283+
expect(responseRelations).toHaveLength(1);
2284+
expect(responseRelations[0][0]).toEqual(threadReaction.getRelation().key);
2285+
expect(responseRelations[0][1].size).toEqual(1);
2286+
expect(responseRelations[0][1].has(threadReaction)).toBeTruthy();
2287+
});
22372288
});
22382289
});

src/client.ts

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ import { IRoomEncryption, RoomList } from './crypto/RoomList';
4848
import { logger } from './logger';
4949
import { SERVICE_TYPES } from './service-types';
5050
import {
51-
FileType, HttpApiEvent, HttpApiEventHandlerMap,
51+
FileType,
52+
HttpApiEvent,
53+
HttpApiEventHandlerMap,
5254
IHttpOpts,
5355
IUpload,
5456
MatrixError,
@@ -3741,7 +3743,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
37413743
"rel_type": THREAD_RELATION_TYPE.name,
37423744
"event_id": threadId,
37433745
};
3744-
const thread = this.getRoom(roomId)?.threads.get(threadId);
3746+
const thread = this.getRoom(roomId)?.getThread(threadId);
37453747
if (thread) {
37463748
content["m.relates_to"]["m.in_reply_to"] = {
37473749
"event_id": thread.lastReply((ev: MatrixEvent) => {
@@ -3790,7 +3792,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
37903792
}));
37913793

37923794
const room = this.getRoom(roomId);
3793-
const thread = room?.threads.get(threadId);
3795+
const thread = room?.getThread(threadId);
37943796
if (thread) {
37953797
localEvent.setThread(thread);
37963798
}
@@ -5185,7 +5187,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
51855187
limit,
51865188
Direction.Backward,
51875189
);
5188-
}).then(async (res: IMessagesResponse) => {
5190+
}).then((res: IMessagesResponse) => {
51895191
const matrixEvents = res.chunk.map(this.getEventMapper());
51905192
if (res.state) {
51915193
const stateEvents = res.state.map(this.getEventMapper());
@@ -5196,7 +5198,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
51965198

51975199
this.processBeaconEvents(room, timelineEvents);
51985200
room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
5199-
await this.processThreadEvents(room, threadedEvents, true);
5201+
this.processThreadEvents(room, threadedEvents, true);
52005202

52015203
room.oldState.paginationToken = res.end;
52025204
if (res.chunk.length === 0) {
@@ -5299,25 +5301,27 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
52995301
event.isRelation(THREAD_RELATION_TYPE.name)
53005302
) {
53015303
const [, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
5302-
const thread = await timelineSet.room.createThreadFetchRoot(event.threadRootId, threadedEvents, true);
5303-
5304-
let nextBatch: string;
5305-
const response = await thread.fetchInitialEvents();
5306-
if (response?.nextBatch) {
5307-
nextBatch = response.nextBatch;
5304+
let thread = timelineSet.room.getThread(event.threadRootId);
5305+
if (!thread) {
5306+
thread = timelineSet.room.createThread(event.threadRootId, undefined, threadedEvents, true);
53085307
}
53095308

53105309
const opts: IRelationsRequestOpts = {
5310+
direction: Direction.Backward,
53115311
limit: 50,
53125312
};
53135313

5314-
// Fetch events until we find the one we were asked for
5314+
await thread.fetchInitialEvents();
5315+
let nextBatch = thread.liveTimeline.getPaginationToken(Direction.Backward);
5316+
5317+
// Fetch events until we find the one we were asked for, or we run out of pages
53155318
while (!thread.findEventById(eventId)) {
53165319
if (nextBatch) {
53175320
opts.from = nextBatch;
53185321
}
53195322

53205323
({ nextBatch } = await thread.fetchEvents(opts));
5324+
if (!nextBatch) break;
53215325
}
53225326

53235327
return thread.liveTimeline;
@@ -5336,7 +5340,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
53365340
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
53375341
timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
53385342
// The target event is not in a thread but process the contextual events, so we can show any threads around it.
5339-
await this.processThreadEvents(timelineSet.room, threadedEvents, true);
5343+
this.processThreadEvents(timelineSet.room, threadedEvents, true);
53405344
this.processBeaconEvents(timelineSet.room, timelineEvents);
53415345

53425346
// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
@@ -5493,7 +5497,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
54935497
opts.limit,
54945498
dir,
54955499
eventTimeline.getFilter(),
5496-
).then(async (res) => {
5500+
).then((res) => {
54975501
if (res.state) {
54985502
const roomState = eventTimeline.getState(dir);
54995503
const stateEvents = res.state.map(this.getEventMapper());
@@ -5506,7 +5510,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
55065510
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(matrixEvents);
55075511
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
55085512
this.processBeaconEvents(timelineSet.room, timelineEvents);
5509-
await this.processThreadEvents(room, threadedEvents, backwards);
5513+
this.processThreadEvents(room, threadedEvents, backwards);
55105514

55115515
// if we've hit the end of the timeline, we need to stop trying to
55125516
// paginate. We need to keep the 'forwards' token though, to make sure
@@ -6663,7 +6667,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
66636667
eventId: string,
66646668
relationType?: RelationType | string | null,
66656669
eventType?: EventType | string | null,
6666-
opts: IRelationsRequestOpts = {},
6670+
opts: IRelationsRequestOpts = { direction: Direction.Backward },
66676671
): Promise<{
66686672
originalEvent: MatrixEvent;
66696673
events: MatrixEvent[];
@@ -7204,7 +7208,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
72047208
eventId: string,
72057209
relationType?: RelationType | string | null,
72067210
eventType?: EventType | string | null,
7207-
opts: IRelationsRequestOpts = {},
7211+
opts: IRelationsRequestOpts = { direction: Direction.Backward },
72087212
): Promise<IRelationsResponse> {
72097213
const queryString = utils.encodeParams(opts as Record<string, string | number>);
72107214

@@ -8916,12 +8920,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
89168920
/**
89178921
* @experimental
89188922
*/
8919-
public async processThreadEvents(
8920-
room: Room,
8921-
threadedEvents: MatrixEvent[],
8922-
toStartOfTimeline: boolean,
8923-
): Promise<void> {
8924-
await room.processThreadedEvents(threadedEvents, toStartOfTimeline);
8923+
public processThreadEvents(room: Room, threadedEvents: MatrixEvent[], toStartOfTimeline: boolean): void {
8924+
room.processThreadedEvents(threadedEvents, toStartOfTimeline);
89258925
}
89268926

89278927
public processBeaconEvents(

src/models/event-timeline-set.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -852,14 +852,13 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
852852
}
853853
let relationsWithEventType = relationsWithRelType[eventType];
854854

855-
let relatesToEvent: MatrixEvent;
856855
if (!relationsWithEventType) {
857856
relationsWithEventType = relationsWithRelType[eventType] = new Relations(
858857
relationType,
859858
eventType,
860859
this.room,
861860
);
862-
relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
861+
const relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
863862
if (relatesToEvent) {
864863
relationsWithEventType.setTargetEvent(relatesToEvent);
865864
}

src/models/event.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,8 +1303,7 @@ export class MatrixEvent extends TypedEventEmitter<EmittedEvents, MatrixEventHan
13031303
public isRelation(relType: string = undefined): boolean {
13041304
// Relation info is lifted out of the encrypted content when sent to
13051305
// encrypted rooms, so we have to check `getWireContent` for this.
1306-
const content = this.getWireContent();
1307-
const relation = content && content["m.relates_to"];
1306+
const relation = this.getWireContent()?.["m.relates_to"];
13081307
return relation && relation.rel_type && relation.event_id &&
13091308
((relType && relation.rel_type === relType) || !relType);
13101309
}

0 commit comments

Comments
 (0)