Skip to content

Commit aec5b7e

Browse files
committed
proof of concept for thread list api implementation
1 parent 9eb7290 commit aec5b7e

File tree

4 files changed

+218
-14
lines changed

4 files changed

+218
-14
lines changed

src/client.ts

Lines changed: 123 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,13 @@ interface IMessagesResponse {
566566
state: IStateEvent[];
567567
}
568568

569+
interface IThreadedMessagesResponse {
570+
prev_batch: string;
571+
next_batch: string;
572+
chunk: IRoomEvent[];
573+
state: IStateEvent[];
574+
}
575+
569576
export interface IRequestTokenResponse {
570577
sid: string;
571578
submit_url?: string;
@@ -1178,12 +1185,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
11781185
}
11791186

11801187
try {
1181-
const { serverSupport, stable } = await this.doesServerSupportThread();
1182-
Thread.setServerSideSupport(serverSupport, stable);
1188+
const { serverSupport, stable, listThreads } = await this.doesServerSupportThread();
1189+
Thread.setServerSideSupport(serverSupport, stable, listThreads);
11831190
} catch (e) {
11841191
// Most likely cause is that `doesServerSupportThread` returned `null` (as it
11851192
// is allowed to do) and thus we enter "degraded mode" on threads.
1186-
Thread.setServerSideSupport(false, true);
1193+
Thread.setServerSideSupport(false, true, false);
11871194
}
11881195

11891196
// shallow-copy the opts dict before modifying and storing it
@@ -5480,6 +5487,63 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
54805487
return this.http.authedRequest(undefined, Method.Get, path, params);
54815488
}
54825489

5490+
/**
5491+
* Makes a request to /messages with the appropriate lazy loading filter set.
5492+
* XXX: if we do get rid of scrollback (as it's not used at the moment),
5493+
* we could inline this method again in paginateEventTimeline as that would
5494+
* then be the only call-site
5495+
* @param {string} roomId
5496+
* @param {string} fromToken
5497+
* @param {number} limit the maximum amount of events the retrieve
5498+
* @param {string} dir 'f' or 'b'
5499+
* @param {Filter} timelineFilter the timeline filter to pass
5500+
* @return {Promise}
5501+
*/
5502+
// XXX: Intended private, used by room.fetchRoomThreads
5503+
public createThreadMessagesRequest(
5504+
roomId: string,
5505+
fromToken: string | null,
5506+
limit = 30,
5507+
dir: Direction,
5508+
timelineFilter?: Filter,
5509+
): Promise<IMessagesResponse> {
5510+
const path = utils.encodeUri("/rooms/$roomId/threads", { $roomId: roomId });
5511+
5512+
const params: Record<string, string> = {
5513+
limit: limit.toString(),
5514+
dir: dir,
5515+
include: 'all',
5516+
};
5517+
5518+
if (fromToken) {
5519+
params.from = fromToken;
5520+
}
5521+
5522+
let filter = null;
5523+
if (this.clientOpts.lazyLoadMembers) {
5524+
// create a shallow copy of LAZY_LOADING_MESSAGES_FILTER,
5525+
// so the timelineFilter doesn't get written into it below
5526+
filter = Object.assign({}, Filter.LAZY_LOADING_MESSAGES_FILTER);
5527+
}
5528+
if (timelineFilter) {
5529+
// XXX: it's horrific that /messages' filter parameter doesn't match
5530+
// /sync's one - see https://matrix.org/jira/browse/SPEC-451
5531+
filter = filter || {};
5532+
Object.assign(filter, timelineFilter.getRoomTimelineFilterComponent()?.toJSON());
5533+
}
5534+
if (filter) {
5535+
params.filter = JSON.stringify(filter);
5536+
}
5537+
5538+
return this.http.authedRequest<IThreadedMessagesResponse>(undefined, Method.Get, path, params, undefined, {
5539+
prefix: "/_matrix/client/unstable/org.matrix.msc3856",
5540+
}).then(res => ({
5541+
...res,
5542+
start: res.prev_batch,
5543+
end: res.next_batch,
5544+
}));
5545+
}
5546+
54835547
/**
54845548
* Take an EventTimeline, and back/forward-fill results.
54855549
*
@@ -5495,6 +5559,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
54955559
*/
54965560
public paginateEventTimeline(eventTimeline: EventTimeline, opts: IPaginateOpts): Promise<boolean> {
54975561
const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet);
5562+
const room = this.getRoom(eventTimeline.getRoomId());
5563+
const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline;
54985564

54995565
// TODO: we should implement a backoff (as per scrollback()) to deal more
55005566
// nicely with HTTP errors.
@@ -5565,8 +5631,43 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
55655631
eventTimeline.paginationRequests[dir] = null;
55665632
});
55675633
eventTimeline.paginationRequests[dir] = promise;
5634+
} else if (isThreadTimeline) {
5635+
if (!room) {
5636+
throw new Error("Unknown room " + eventTimeline.getRoomId());
5637+
}
5638+
5639+
promise = this.createThreadMessagesRequest(
5640+
eventTimeline.getRoomId(),
5641+
token,
5642+
opts.limit,
5643+
dir,
5644+
eventTimeline.getFilter(),
5645+
).then((res) => {
5646+
if (res.state) {
5647+
const roomState = eventTimeline.getState(dir);
5648+
const stateEvents = res.state.map(this.getEventMapper());
5649+
roomState.setUnknownStateEvents(stateEvents);
5650+
}
5651+
const token = res.end;
5652+
const matrixEvents = res.chunk.map(this.getEventMapper());
5653+
5654+
const timelineSet = eventTimeline.getTimelineSet();
5655+
timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, token);
5656+
this.processBeaconEvents(timelineSet.room, matrixEvents);
5657+
this.processThreadRoots(timelineSet.room, matrixEvents, backwards);
5658+
5659+
// if we've hit the end of the timeline, we need to stop trying to
5660+
// paginate. We need to keep the 'forwards' token though, to make sure
5661+
// we can recover from gappy syncs.
5662+
if (backwards && res.end == res.start) {
5663+
eventTimeline.setPaginationToken(null, dir);
5664+
}
5665+
return res.end != res.start;
5666+
}).finally(() => {
5667+
eventTimeline.paginationRequests[dir] = null;
5668+
});
5669+
eventTimeline.paginationRequests[dir] = promise;
55685670
} else {
5569-
const room = this.getRoom(eventTimeline.getRoomId());
55705671
if (!room) {
55715672
throw new Error("Unknown room " + eventTimeline.getRoomId());
55725673
}
@@ -6649,6 +6750,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
66496750
* @return {Promise<boolean>} true if the feature is supported
66506751
*/
66516752
public async doesServerSupportUnstableFeature(feature: string): Promise<boolean> {
6753+
// FIXME: WORKAROUND FOR NOW
6754+
if (feature === "org.matrix.msc3856") {
6755+
return this.http.opts.baseUrl === "https://threads-dev.lab.element.dev";
6756+
}
66526757
const response = await this.getVersions();
66536758
if (!response) return false;
66546759
const unstableFeatures = response["unstable_features"];
@@ -6678,16 +6783,21 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
66786783
public async doesServerSupportThread(): Promise<{
66796784
serverSupport: boolean;
66806785
stable: boolean;
6786+
listThreads: boolean;
66816787
} | null> {
66826788
try {
6683-
const hasUnstableSupport = await this.doesServerSupportUnstableFeature("org.matrix.msc3440");
6684-
const hasStableSupport = await this.doesServerSupportUnstableFeature("org.matrix.msc3440.stable");
6789+
const [hasUnstableSupport, hasStableSupport, hasListThreadsSupport] = await Promise.all([
6790+
this.doesServerSupportUnstableFeature("org.matrix.msc3440"),
6791+
this.doesServerSupportUnstableFeature("org.matrix.msc3440.stable"),
6792+
this.doesServerSupportUnstableFeature("org.matrix.msc3856"),
6793+
]);
66856794

66866795
// TODO: Use `this.isVersionSupported("v1.3")` for whatever spec version includes MSC3440 formally.
66876796

66886797
return {
66896798
serverSupport: hasUnstableSupport || hasStableSupport,
66906799
stable: hasStableSupport,
6800+
listThreads: hasListThreadsSupport,
66916801
};
66926802
} catch (e) {
66936803
// Assume server support and stability aren't available: null/no data return.
@@ -9060,6 +9170,13 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
90609170
room.processThreadedEvents(threadedEvents, toStartOfTimeline);
90619171
}
90629172

9173+
/**
9174+
* @experimental
9175+
*/
9176+
public processThreadRoots(room: Room, threadedEvents: MatrixEvent[], toStartOfTimeline: boolean): void {
9177+
room.processThreadRoots(threadedEvents, toStartOfTimeline);
9178+
}
9179+
90639180
public processBeaconEvents(
90649181
room?: Room,
90659182
events?: MatrixEvent[],

src/models/event-timeline-set.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,15 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
123123
* @param {MatrixClient=} client the Matrix client which owns this EventTimelineSet,
124124
* can be omitted if room is specified.
125125
* @param {Thread=} thread the thread to which this timeline set relates.
126+
* @param {boolean} isThreadTimeline Whether this timeline set relates to a thread list timeline
127+
* (e.g., All threads or My threads)
126128
*/
127129
constructor(
128130
public readonly room: Room | undefined,
129131
opts: IOpts = {},
130132
client?: MatrixClient,
131133
public readonly thread?: Thread,
134+
public readonly isThreadTimeline: boolean = false,
132135
) {
133136
super();
134137

src/models/room.ts

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1583,9 +1583,37 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
15831583
return filter;
15841584
}
15851585

1586+
/**
1587+
* Add a timelineSet for this room with the given filter
1588+
* @param {ThreadFilterType?} filterType Thread list type (e.g., All threads or My threads)
1589+
* @param {Object=} opts Configuration options
1590+
* @return {EventTimelineSet} The timelineSet
1591+
*/
1592+
public getOrCreateThreadTimelineSet(
1593+
filterType?: ThreadFilterType,
1594+
{
1595+
pendingEvents = true,
1596+
}: ICreateFilterOpts = {},
1597+
): EventTimelineSet {
1598+
if (this.threadsTimelineSets[filterType]) {
1599+
return this.filteredTimelineSets[filterType];
1600+
}
1601+
const opts = Object.assign({ pendingEvents }, this.opts);
1602+
const timelineSet =
1603+
new EventTimelineSet(this, opts, undefined, undefined, Thread.hasServerSideSupportForThreadList);
1604+
this.reEmitter.reEmit(timelineSet, [
1605+
RoomEvent.Timeline,
1606+
RoomEvent.TimelineReset,
1607+
]);
1608+
1609+
return timelineSet;
1610+
}
1611+
15861612
private async createThreadTimelineSet(filterType?: ThreadFilterType): Promise<EventTimelineSet> {
15871613
let timelineSet: EventTimelineSet;
1588-
if (Thread.hasServerSideSupport) {
1614+
if (Thread.hasServerSideSupportForThreadList) {
1615+
timelineSet = this.getOrCreateThreadTimelineSet(filterType);
1616+
} else if (Thread.hasServerSideSupport) {
15891617
const filter = await this.getThreadListFilter(filterType);
15901618

15911619
timelineSet = this.getOrCreateFilteredTimelineSet(
@@ -1620,11 +1648,38 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
16201648

16211649
public threadsReady = false;
16221650

1651+
public processThreadRoots(events: MatrixEvent[], toStartOfTimeline: boolean): void {
1652+
for (const rootEvent of events) {
1653+
EventTimeline.setEventMetadata(
1654+
rootEvent,
1655+
this.currentState,
1656+
toStartOfTimeline,
1657+
);
1658+
if (!this.getThread(rootEvent.getId())) {
1659+
this.createThread(rootEvent.getId(), rootEvent, [], toStartOfTimeline);
1660+
}
1661+
}
1662+
}
1663+
16231664
public async fetchRoomThreads(): Promise<void> {
16241665
if (this.threadsReady || !this.client.supportsExperimentalThreads()) {
16251666
return;
16261667
}
16271668

1669+
if (Thread.hasServerSideSupportForThreadList) {
1670+
await Promise.all([
1671+
this.fetchRoomThreadList(ThreadFilterType.All),
1672+
this.fetchRoomThreadList(ThreadFilterType.My),
1673+
]);
1674+
} else {
1675+
await this.fetchOldThreadList();
1676+
}
1677+
1678+
this.on(ThreadEvent.NewReply, this.onThreadNewReply);
1679+
this.threadsReady = true;
1680+
}
1681+
1682+
private async fetchOldThreadList(): Promise<void> {
16281683
const allThreadsFilter = await this.getThreadListFilter();
16291684

16301685
const { chunk: events } = await this.client.createMessagesRequest(
@@ -1673,20 +1728,43 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
16731728
});
16741729
latestMyThreadsRootEvent = rootEvent;
16751730
}
1676-
1677-
if (!this.getThread(rootEvent.getId())) {
1678-
this.createThread(rootEvent.getId(), rootEvent, [], true);
1679-
}
16801731
}
16811732

1733+
this.processThreadRoots(threadRoots, true);
1734+
16821735
this.client.decryptEventIfNeeded(threadRoots[threadRoots.length -1]);
16831736
if (latestMyThreadsRootEvent) {
16841737
this.client.decryptEventIfNeeded(latestMyThreadsRootEvent);
16851738
}
1739+
}
16861740

1687-
this.threadsReady = true;
1741+
private async fetchRoomThreadList(filter?: ThreadFilterType): Promise<void> {
1742+
const timelineSet = filter === ThreadFilterType.My
1743+
? this.threadsTimelineSets[1]
1744+
: this.threadsTimelineSets[0];
16881745

1689-
this.on(ThreadEvent.NewReply, this.onThreadNewReply);
1746+
const { chunk: events, end } = await this.client.createThreadMessagesRequest(
1747+
this.roomId,
1748+
null,
1749+
undefined,
1750+
Direction.Backward,
1751+
timelineSet.getFilter(),
1752+
);
1753+
1754+
timelineSet.getLiveTimeline().setPaginationToken(end, Direction.Backward);
1755+
1756+
if (!events.length) return;
1757+
1758+
const matrixEvents = events.map(this.client.getEventMapper());
1759+
this.processThreadRoots(matrixEvents, true);
1760+
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
1761+
for (const rootEvent of matrixEvents) {
1762+
timelineSet.addLiveEvent(rootEvent, {
1763+
duplicateStrategy: DuplicateStrategy.Ignore,
1764+
fromCache: false,
1765+
roomState,
1766+
});
1767+
}
16901768
}
16911769

16921770
private onThreadNewReply(thread: Thread): void {

src/models/thread.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ interface IThreadOpts {
5656
*/
5757
export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
5858
public static hasServerSideSupport: boolean;
59+
public static hasServerSideSupportForThreadList: boolean;
5960

6061
/**
6162
* A reference to all the events ID at the bottom of the threads
@@ -134,8 +135,13 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
134135
this.emit(ThreadEvent.Update, this);
135136
}
136137

137-
public static setServerSideSupport(hasServerSideSupport: boolean, useStable: boolean): void {
138+
public static setServerSideSupport(
139+
hasServerSideSupport: boolean,
140+
useStable: boolean,
141+
hasListThreads: boolean,
142+
): void {
138143
Thread.hasServerSideSupport = hasServerSideSupport;
144+
Thread.hasServerSideSupportForThreadList = hasListThreads;
139145
if (!useStable) {
140146
FILTER_RELATED_BY_SENDERS.setPreferUnstable(true);
141147
FILTER_RELATED_BY_REL_TYPES.setPreferUnstable(true);

0 commit comments

Comments
 (0)