diff --git a/spec/integ/matrix-client-event-timeline.spec.ts b/spec/integ/matrix-client-event-timeline.spec.ts index c656f6447e2..8640e82be6c 100644 --- a/spec/integ/matrix-client-event-timeline.spec.ts +++ b/spec/integ/matrix-client-event-timeline.spec.ts @@ -15,10 +15,21 @@ limitations under the License. */ import * as utils from "../test-utils/test-utils"; -import { ClientEvent, EventTimeline, Filter, IEvent, MatrixClient, MatrixEvent, Room } from "../../src/matrix"; +import { + ClientEvent, + Direction, + EventTimeline, + EventTimelineSet, + Filter, + IEvent, + MatrixClient, + MatrixEvent, + Room, +} from "../../src/matrix"; import { logger } from "../../src/logger"; +import { encodeUri } from "../../src/utils"; import { TestClient } from "../TestClient"; -import { Thread, THREAD_RELATION_TYPE } from "../../src/models/thread"; +import { FeatureSupport, Thread, THREAD_RELATION_TYPE } from "../../src/models/thread"; const userId = "@alice:localhost"; const userName = "Alice"; @@ -145,8 +156,11 @@ SYNC_THREAD_ROOT.unsigned = { }, }; +type HttpBackend = TestClient["httpBackend"]; +type ExpectedHttpRequest = ReturnType; + // start the client, and wait for it to initialise -function startClient(httpBackend: TestClient["httpBackend"], client: MatrixClient) { +function startClient(httpBackend: HttpBackend, client: MatrixClient) { httpBackend.when("GET", "/versions").respond(200, {}); httpBackend.when("GET", "/pushrules").respond(200, {}); httpBackend.when("POST", "/filter").respond(200, { filter_id: "fid" }); @@ -172,7 +186,7 @@ function startClient(httpBackend: TestClient["httpBackend"], client: MatrixClien } describe("getEventTimeline support", function() { - let httpBackend: TestClient["httpBackend"]; + let httpBackend: HttpBackend; let client: MatrixClient; beforeEach(function() { @@ -189,6 +203,16 @@ describe("getEventTimeline support", function() { }); it("timeline support must be enabled to work", function() { + const testClient = new TestClient( + userId, + "DEVICE", + accessToken, + undefined, + { timelineSupport: false }, + ); + client = testClient.client; + httpBackend = testClient.httpBackend; + return startClient(httpBackend, client).then(function() { const room = client.getRoom(roomId); const timelineSet = room.getTimelineSets()[0]; @@ -214,6 +238,23 @@ describe("getEventTimeline support", function() { }); }); + it("only works with room timelines", function() { + const testClient = new TestClient( + userId, + "DEVICE", + accessToken, + undefined, + { timelineSupport: true }, + ); + client = testClient.client; + httpBackend = testClient.httpBackend; + + return startClient(httpBackend, client).then(function() { + const timelineSet = new EventTimelineSet(undefined); + expect(client.getEventTimeline(timelineSet, "event")).rejects.toBeTruthy(); + }); + }); + it("scrollback should be able to scroll back to before a gappy /sync", function() { // need a client with timelineSupport disabled to make this work let room: Room; @@ -280,7 +321,7 @@ describe("getEventTimeline support", function() { describe("MatrixClient event timelines", function() { let client: MatrixClient; - let httpBackend: TestClient["httpBackend"]; + let httpBackend: HttpBackend; beforeEach(function() { const testClient = new TestClient( @@ -299,7 +340,7 @@ describe("MatrixClient event timelines", function() { afterEach(function() { httpBackend.verifyNoOutstandingExpectation(); client.stopClient(); - Thread.setServerSideSupport(false, false); + Thread.setServerSideSupport(FeatureSupport.None); }); describe("getEventTimeline", function() { @@ -552,7 +593,7 @@ describe("MatrixClient event timelines", function() { it("should handle thread replies with server support by fetching a contiguous thread timeline", async () => { // @ts-ignore client.clientOpts.experimentalThreadSupport = true; - Thread.setServerSideSupport(true, false); + Thread.setServerSideSupport(FeatureSupport.Experimental); client.stopClient(); // we don't need the client to be syncing at this time const room = client.getRoom(roomId); const thread = room.createThread(THREAD_ROOT.event_id, undefined, [], false); @@ -598,7 +639,7 @@ describe("MatrixClient event timelines", function() { it("should return relevant timeline from non-thread timelineSet when asking for the thread root", async () => { // @ts-ignore client.clientOpts.experimentalThreadSupport = true; - Thread.setServerSideSupport(true, false); + Thread.setServerSideSupport(FeatureSupport.Experimental); client.stopClient(); // we don't need the client to be syncing at this time const room = client.getRoom(roomId); const threadRoot = new MatrixEvent(THREAD_ROOT); @@ -630,7 +671,7 @@ describe("MatrixClient event timelines", function() { it("should return undefined when event is not in the thread that the given timelineSet is representing", () => { // @ts-ignore client.clientOpts.experimentalThreadSupport = true; - Thread.setServerSideSupport(true, false); + Thread.setServerSideSupport(FeatureSupport.Experimental); client.stopClient(); // we don't need the client to be syncing at this time const room = client.getRoom(roomId); const threadRoot = new MatrixEvent(THREAD_ROOT); @@ -658,7 +699,7 @@ describe("MatrixClient event timelines", function() { it("should return undefined when event is within a thread but timelineSet is not", () => { // @ts-ignore client.clientOpts.experimentalThreadSupport = true; - Thread.setServerSideSupport(true, false); + Thread.setServerSideSupport(FeatureSupport.Experimental); client.stopClient(); // we don't need the client to be syncing at this time const room = client.getRoom(roomId); const timelineSet = room.getTimelineSets()[0]; @@ -711,6 +752,63 @@ describe("MatrixClient event timelines", function() { }); describe("getLatestTimeline", function() { + it("timeline support must be enabled to work", async function() { + await client.stopClient(); + + const testClient = new TestClient( + userId, + "DEVICE", + accessToken, + undefined, + { timelineSupport: false }, + ); + client = testClient.client; + httpBackend = testClient.httpBackend; + await startClient(httpBackend, client); + + const room = client.getRoom(roomId); + const timelineSet = room.getTimelineSets()[0]; + await expect(client.getLatestTimeline(timelineSet)).rejects.toBeTruthy(); + }); + + it("timeline support works when enabled", async function() { + await client.stopClient(); + + const testClient = new TestClient( + userId, + "DEVICE", + accessToken, + undefined, + { timelineSupport: true }, + ); + client = testClient.client; + httpBackend = testClient.httpBackend; + + return startClient(httpBackend, client).then(() => { + const room = client.getRoom(roomId); + const timelineSet = room.getTimelineSets()[0]; + expect(client.getLatestTimeline(timelineSet)).rejects.toBeFalsy(); + }); + }); + + it("only works with room timelines", async function() { + await client.stopClient(); + + const testClient = new TestClient( + userId, + "DEVICE", + accessToken, + undefined, + { timelineSupport: true }, + ); + client = testClient.client; + httpBackend = testClient.httpBackend; + await startClient(httpBackend, client); + + const timelineSet = new EventTimelineSet(undefined); + await expect(client.getLatestTimeline(timelineSet)).rejects.toBeTruthy(); + }); + it("should create a new timeline for new events", function() { const room = client.getRoom(roomId); const timelineSet = room.getTimelineSets()[0]; @@ -925,6 +1023,236 @@ describe("MatrixClient event timelines", function() { }); }); + describe("paginateEventTimeline for thread list timeline", function() { + async function flushHttp(promise: Promise): Promise { + return Promise.all([promise, httpBackend.flushAllExpected()]).then(([result]) => result); + } + + const RANDOM_TOKEN = "7280349c7bee430f91defe2a38a0a08c"; + + function respondToFilter(): ExpectedHttpRequest { + const request = httpBackend.when("POST", "/filter"); + request.respond(200, { filter_id: "fid" }); + return request; + } + + function respondToSync(): ExpectedHttpRequest { + const request = httpBackend.when("GET", "/sync"); + request.respond(200, INITIAL_SYNC_DATA); + return request; + } + + function respondToThreads( + response = { + chunk: [THREAD_ROOT], + state: [], + next_batch: RANDOM_TOKEN, + }, + ): ExpectedHttpRequest { + const request = httpBackend.when("GET", encodeUri("/_matrix/client/r0/rooms/$roomId/threads", { + $roomId: roomId, + })); + request.respond(200, response); + return request; + } + + function respondToContext(): ExpectedHttpRequest { + const request = httpBackend.when("GET", encodeUri("/_matrix/client/r0/rooms/$roomId/context/$eventId", { + $roomId: roomId, + $eventId: THREAD_ROOT.event_id!, + })); + request.respond(200, { + end: `${Direction.Forward}${RANDOM_TOKEN}1`, + start: `${Direction.Backward}${RANDOM_TOKEN}1`, + state: [], + events_before: [], + events_after: [], + event: THREAD_ROOT, + }); + return request; + } + function respondToMessagesRequest(): ExpectedHttpRequest { + const request = httpBackend.when("GET", encodeUri("/_matrix/client/r0/rooms/$roomId/messages", { + $roomId: roomId, + })); + request.respond(200, { + chunk: [THREAD_ROOT], + state: [], + start: `${Direction.Forward}${RANDOM_TOKEN}2`, + end: `${Direction.Backward}${RANDOM_TOKEN}2`, + }); + return request; + } + + describe("with server compatibility", function() { + beforeEach(() => { + // @ts-ignore + client.clientOpts.experimentalThreadSupport = true; + Thread.setServerSideSupport(FeatureSupport.Experimental); + Thread.setServerSideListSupport(FeatureSupport.Stable); + }); + + async function testPagination(timelineSet: EventTimelineSet, direction: Direction) { + respondToContext(); + await flushHttp(client.getEventTimeline(timelineSet, THREAD_ROOT.event_id!)); + respondToThreads(); + const timeline = await flushHttp(client.getLatestTimeline(timelineSet)); + expect(timeline).not.toBeNull(); + + respondToThreads(); + const success = await flushHttp(client.paginateEventTimeline(timeline!, { + backwards: direction === Direction.Backward, + })); + expect(success).toBeTruthy(); + expect(timeline!.getEvents().map(it => it.event)).toEqual([THREAD_ROOT]); + expect(timeline!.getPaginationToken(direction)).toEqual(RANDOM_TOKEN); + } + + it("should allow you to paginate all threads backwards", async function() { + const room = client.getRoom(roomId); + const timelineSets = await (room?.createThreadsTimelineSets()); + expect(timelineSets).not.toBeNull(); + const [allThreads, myThreads] = timelineSets!; + await testPagination(allThreads, Direction.Backward); + await testPagination(myThreads, Direction.Backward); + }); + + it("should allow you to paginate all threads forwards", async function() { + const room = client.getRoom(roomId); + const timelineSets = await (room?.createThreadsTimelineSets()); + expect(timelineSets).not.toBeNull(); + const [allThreads, myThreads] = timelineSets!; + + await testPagination(allThreads, Direction.Forward); + await testPagination(myThreads, Direction.Forward); + }); + + it("should allow fetching all threads", async function() { + const room = client.getRoom(roomId); + const timelineSets = await room?.createThreadsTimelineSets(); + expect(timelineSets).not.toBeNull(); + respondToThreads(); + respondToThreads(); + httpBackend.when("GET", "/sync").respond(200, INITIAL_SYNC_DATA); + await flushHttp(room.fetchRoomThreads()); + }); + }); + + describe("without server compatibility", function() { + beforeEach(() => { + // @ts-ignore + client.clientOpts.experimentalThreadSupport = true; + Thread.setServerSideSupport(FeatureSupport.Experimental); + Thread.setServerSideListSupport(FeatureSupport.None); + }); + + async function testPagination(timelineSet: EventTimelineSet, direction: Direction) { + respondToContext(); + respondToSync(); + await flushHttp(client.getEventTimeline(timelineSet, THREAD_ROOT.event_id!)); + + respondToMessagesRequest(); + const timeline = await flushHttp(client.getLatestTimeline(timelineSet)); + expect(timeline).not.toBeNull(); + + respondToMessagesRequest(); + const success = await flushHttp(client.paginateEventTimeline(timeline!, { + backwards: direction === Direction.Backward, + })); + + expect(success).toBeTruthy(); + expect(timeline!.getEvents().map(it => it.event)).toEqual([THREAD_ROOT]); + expect(timeline!.getPaginationToken(direction)).toEqual(`${direction}${RANDOM_TOKEN}2`); + } + + it("should allow you to paginate all threads", async function() { + const room = client.getRoom(roomId); + + respondToFilter(); + respondToSync(); + respondToFilter(); + respondToSync(); + + const timelineSetsPromise = room?.createThreadsTimelineSets(); + expect(timelineSetsPromise).not.toBeNull(); + const timelineSets = await flushHttp(timelineSetsPromise!); + expect(timelineSets).not.toBeNull(); + const [allThreads, myThreads] = timelineSets!; + + await testPagination(allThreads, Direction.Backward); + await testPagination(myThreads, Direction.Backward); + }); + + it("should allow fetching all threads", async function() { + const room = client.getRoom(roomId); + + respondToFilter(); + respondToSync(); + respondToFilter(); + respondToSync(); + + const timelineSetsPromise = room?.createThreadsTimelineSets(); + expect(timelineSetsPromise).not.toBeNull(); + await flushHttp(timelineSetsPromise!); + respondToFilter(); + respondToSync(); + respondToSync(); + respondToSync(); + respondToMessagesRequest(); + await flushHttp(room.fetchRoomThreads()); + }); + }); + + it("should add lazy loading filter", async () => { + // @ts-ignore + client.clientOpts.experimentalThreadSupport = true; + Thread.setServerSideSupport(FeatureSupport.Experimental); + Thread.setServerSideListSupport(FeatureSupport.Stable); + // @ts-ignore + client.clientOpts.lazyLoadMembers = true; + + const room = client.getRoom(roomId); + const timelineSets = await room?.createThreadsTimelineSets(); + expect(timelineSets).not.toBeNull(); + const [allThreads] = timelineSets!; + + respondToThreads().check((request) => { + expect(request.queryParams.filter).toEqual(JSON.stringify({ + "lazy_load_members": true, + })); + }); + + await flushHttp(client.paginateEventTimeline(allThreads.getLiveTimeline(), { + backwards: true, + })); + }); + + it("should correctly pass pagination token", async () => { + // @ts-ignore + client.clientOpts.experimentalThreadSupport = true; + Thread.setServerSideSupport(FeatureSupport.Experimental); + Thread.setServerSideListSupport(FeatureSupport.Stable); + + const room = client.getRoom(roomId); + const timelineSets = await room?.createThreadsTimelineSets(); + expect(timelineSets).not.toBeNull(); + const [allThreads] = timelineSets!; + + respondToThreads({ + chunk: [THREAD_ROOT], + state: [], + next_batch: null, + }).check((request) => { + expect(request.queryParams.from).toEqual(RANDOM_TOKEN); + }); + + allThreads.getLiveTimeline().setPaginationToken(RANDOM_TOKEN, Direction.Backward); + await flushHttp(client.paginateEventTimeline(allThreads.getLiveTimeline(), { + backwards: true, + })); + }); + }); + describe("event timeline for sent events", function() { const TXN_ID = "txn1"; const event = utils.mkMessage({ @@ -1092,7 +1420,7 @@ describe("MatrixClient event timelines", function() { it("should re-insert room IDs for bundled thread relation events", async () => { // @ts-ignore client.clientOpts.experimentalThreadSupport = true; - Thread.setServerSideSupport(true, false); + Thread.setServerSideSupport(FeatureSupport.Experimental); httpBackend.when("GET", "/sync").respond(200, { next_batch: "s_5_4", diff --git a/spec/unit/room.spec.ts b/spec/unit/room.spec.ts index d6891f369e3..de9f0c5f9d1 100644 --- a/spec/unit/room.spec.ts +++ b/spec/unit/room.spec.ts @@ -38,7 +38,7 @@ import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event"; import { TestClient } from "../TestClient"; import { emitPromise } from "../test-utils/test-utils"; import { ReceiptType } from "../../src/@types/read_receipts"; -import { Thread, ThreadEvent } from "../../src/models/thread"; +import { FeatureSupport, Thread, ThreadEvent } from "../../src/models/thread"; import { WrappedReceipt } from "../../src/models/read-receipt"; describe("Room", function() { @@ -2408,7 +2408,7 @@ describe("Room", function() { }); it("should aggregate relations in thread event timeline set", () => { - Thread.setServerSideSupport(true, true); + Thread.setServerSideSupport(FeatureSupport.Stable); const threadRoot = mkMessage(); const rootReaction = mkReaction(threadRoot); const threadResponse = mkThreadResponse(threadRoot); diff --git a/src/client.ts b/src/client.ts index ed3441f17c1..80296c472ab 100644 --- a/src/client.ts +++ b/src/client.ts @@ -19,7 +19,7 @@ limitations under the License. * @module client */ -import { EmoteEvent, IPartialEvent, MessageEvent, NoticeEvent } from "matrix-events-sdk"; +import { EmoteEvent, IPartialEvent, MessageEvent, NoticeEvent, Optional } from "matrix-events-sdk"; import { ISyncStateData, SyncApi, SyncState } from "./sync"; import { @@ -33,7 +33,7 @@ import { } from "./models/event"; import { StubStore } from "./store/stub"; import { CallEvent, CallEventHandlerMap, createNewMatrixCall, MatrixCall, supportsMatrixCall } from "./webrtc/call"; -import { Filter, IFilterDefinition } from "./filter"; +import { Filter, IFilterDefinition, IRoomEventFilter } from "./filter"; import { CallEventHandlerEvent, CallEventHandler, CallEventHandlerEventHandlerMap } from './webrtc/callEventHandler'; import * as utils from './utils'; import { sleep } from './utils'; @@ -195,7 +195,7 @@ import { TypedEventEmitter } from "./models/typed-event-emitter"; import { ReceiptType } from "./@types/read_receipts"; import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } from "./sliding-sync"; import { SlidingSyncSdk } from "./sliding-sync-sdk"; -import { Thread, THREAD_RELATION_TYPE } from "./models/thread"; +import { FeatureSupport, Thread, THREAD_RELATION_TYPE, determineFeatureSupport } from "./models/thread"; import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon"; import { UnstableValue } from "./NamespacedValue"; import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue"; @@ -596,6 +596,13 @@ interface IMessagesResponse { state: IStateEvent[]; } +interface IThreadedMessagesResponse { + prev_batch: string; + next_batch: string; + chunk: IRoomEvent[]; + state: IStateEvent[]; +} + export interface IRequestTokenResponse { sid: string; submit_url?: string; @@ -1190,14 +1197,9 @@ export class MatrixClient extends TypedEventEmitter { + public async getEventTimeline(timelineSet: EventTimelineSet, eventId: string): Promise> { // don't allow any timeline support unless it's been enabled. if (!this.timelineSupport) { throw new Error("timeline support is disabled. Set the 'timelineSupport'" + " parameter to true when creating MatrixClient to enable it."); } + if (!timelineSet?.room) { + throw new Error("getEventTimeline only supports room timelines"); + } + if (timelineSet.getTimelineForEvent(eventId)) { return timelineSet.getTimelineForEvent(eventId); } @@ -5345,7 +5351,7 @@ export class MatrixClient extends TypedEventEmitter = undefined; + let params: Record | undefined = undefined; if (this.clientOpts.lazyLoadMembers) { params = { filter: JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER) }; } @@ -5438,27 +5444,36 @@ export class MatrixClient extends TypedEventEmitter { + public async getLatestTimeline(timelineSet: EventTimelineSet): Promise> { // don't allow any timeline support unless it's been enabled. if (!this.timelineSupport) { throw new Error("timeline support is disabled. Set the 'timelineSupport'" + " parameter to true when creating MatrixClient to enable it."); } - const messagesPath = utils.encodeUri( - "/rooms/$roomId/messages", { - $roomId: timelineSet.room.roomId, - }, - ); - - const params: Record = { - dir: 'b', - }; - if (this.clientOpts.lazyLoadMembers) { - params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER); + if (!timelineSet.room) { + throw new Error("getLatestTimeline only supports room timelines"); } - const res = await this.http.authedRequest(undefined, Method.Get, messagesPath, params); + let res: IMessagesResponse; + const roomId = timelineSet.room.roomId; + if (timelineSet.isThreadTimeline) { + res = await this.createThreadListMessagesRequest( + roomId, + null, + 1, + Direction.Backward, + timelineSet.getFilter(), + ); + } else { + res = await this.createMessagesRequest( + roomId, + null, + 1, + Direction.Backward, + timelineSet.getFilter(), + ); + } const event = res.chunk?.[0]; if (!event) { throw new Error("No message returned from /messages when trying to construct getLatestTimeline"); @@ -5498,7 +5513,7 @@ export class MatrixClient extends TypedEventEmitter { + const path = utils.encodeUri("/rooms/$roomId/threads", { $roomId: roomId }); + + const params: Record = { + limit: limit.toString(), + dir: dir, + include: 'all', + }; + + if (fromToken) { + params.from = fromToken; + } + + let filter: IRoomEventFilter | null = null; + if (this.clientOpts.lazyLoadMembers) { + // create a shallow copy of LAZY_LOADING_MESSAGES_FILTER, + // so the timelineFilter doesn't get written into it below + filter = { + ...filter, + ...Filter.LAZY_LOADING_MESSAGES_FILTER, + }; + } + if (timelineFilter) { + // XXX: it's horrific that /messages' filter parameter doesn't match + // /sync's one - see https://matrix.org/jira/browse/SPEC-451 + filter = { + ...filter, + ...timelineFilter.getRoomTimelineFilterComponent()?.toJSON(), + }; + } + if (filter) { + params.filter = JSON.stringify(filter); + } + + const opts: { prefix?: string } = {}; + if (Thread.hasServerSideListSupport === FeatureSupport.Experimental) { + opts.prefix = "/_matrix/client/unstable/org.matrix.msc3856"; + } + + return this.http.authedRequest(undefined, Method.Get, path, params, undefined, opts) + .then(res => ({ + ...res, + start: res.prev_batch, + end: res.next_batch, + })); + } + /** * Take an EventTimeline, and back/forward-fill results. * @@ -5531,6 +5612,8 @@ export class MatrixClient extends TypedEventEmitter { const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet); + const room = this.getRoom(eventTimeline.getRoomId()); + const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline; // TODO: we should implement a backoff (as per scrollback()) to deal more // nicely with HTTP errors. @@ -5564,7 +5647,7 @@ export class MatrixClient extends TypedEventEmitter { const token = res.next_token; - const matrixEvents = []; + const matrixEvents: MatrixEvent[] = []; for (let i = 0; i < res.notifications.length; i++) { const notification = res.notifications[i]; @@ -5596,13 +5679,48 @@ export class MatrixClient extends TypedEventEmitter { + eventTimeline.paginationRequests[dir] = null; + }); + eventTimeline.paginationRequests[dir] = promise; + } else if (isThreadTimeline) { + if (!room) { + throw new Error("Unknown room " + eventTimeline.getRoomId()); + } + + promise = this.createThreadListMessagesRequest( + eventTimeline.getRoomId(), + token, + opts.limit, + dir, + eventTimeline.getFilter(), + ).then((res) => { + if (res.state) { + const roomState = eventTimeline.getState(dir); + const stateEvents = res.state.map(this.getEventMapper()); + roomState.setUnknownStateEvents(stateEvents); + } + const token = res.end; + const matrixEvents = res.chunk.map(this.getEventMapper()); + + const timelineSet = eventTimeline.getTimelineSet(); + timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, token); + this.processBeaconEvents(room, matrixEvents); + this.processThreadRoots(room, matrixEvents, backwards); + + // if we've hit the end of the timeline, we need to stop trying to + // paginate. We need to keep the 'forwards' token though, to make sure + // we can recover from gappy syncs. + if (backwards && res.end == res.start) { + eventTimeline.setPaginationToken(null, dir); + } + return res.end !== res.start; }).finally(() => { eventTimeline.paginationRequests[dir] = null; }); eventTimeline.paginationRequests[dir] = promise; } else { - const room = this.getRoom(eventTimeline.getRoomId()); if (!room) { throw new Error("Unknown room " + eventTimeline.getRoomId()); } @@ -5623,9 +5741,9 @@ export class MatrixClient extends TypedEventEmitter { + threads: FeatureSupport; + list: FeatureSupport; + }> { try { - const hasUnstableSupport = await this.doesServerSupportUnstableFeature("org.matrix.msc3440"); - const hasStableSupport = await this.doesServerSupportUnstableFeature("org.matrix.msc3440.stable"); + const [threadUnstable, threadStable, listUnstable, listStable] = await Promise.all([ + this.doesServerSupportUnstableFeature("org.matrix.msc3440"), + this.doesServerSupportUnstableFeature("org.matrix.msc3440.stable"), + this.doesServerSupportUnstableFeature("org.matrix.msc3856"), + this.doesServerSupportUnstableFeature("org.matrix.msc3856.stable"), + ]); // TODO: Use `this.isVersionSupported("v1.3")` for whatever spec version includes MSC3440 formally. return { - serverSupport: hasUnstableSupport || hasStableSupport, - stable: hasStableSupport, + threads: determineFeatureSupport(threadStable, threadUnstable), + list: determineFeatureSupport(listStable, listUnstable), }; } catch (e) { - // Assume server support and stability aren't available: null/no data return. - // XXX: This should just return an object with `false` booleans instead. - return null; + return { + threads: FeatureSupport.None, + list: FeatureSupport.None, + }; } } @@ -9160,6 +9283,13 @@ export class MatrixClient extends TypedEventEmitter> = { + public paginationRequests: Record | null> = { [Direction.Backward]: null, [Direction.Forward]: null, }; @@ -311,7 +311,7 @@ export class EventTimeline { * token for going backwards in time; EventTimeline.FORWARDS to set the * pagination token for going forwards in time. */ - public setPaginationToken(token: string, direction: Direction): void { + public setPaginationToken(token: string | null, direction: Direction): void { this.getState(direction).paginationToken = token; } diff --git a/src/models/room-state.ts b/src/models/room-state.ts index b0104cf707c..3cca4a7b86b 100644 --- a/src/models/room-state.ts +++ b/src/models/room-state.ts @@ -97,7 +97,7 @@ export class RoomState extends TypedEventEmitter // XXX: Should be read-only public members: Record = {}; // userId: RoomMember public events = new Map>(); // Map> - public paginationToken: string = null; + public paginationToken: string | null = null; public readonly beacons = new Map(); private _liveBeaconIds: BeaconIdentifier[] = []; diff --git a/src/models/room.ts b/src/models/room.ts index c8a31f00da5..aa1ffdd743f 100644 --- a/src/models/room.ts +++ b/src/models/room.ts @@ -359,7 +359,7 @@ export class Room extends ReadReceipt { } private threadTimelineSetsPromise: Promise<[EventTimelineSet, EventTimelineSet]> | null = null; - public async createThreadsTimelineSets(): Promise<[EventTimelineSet, EventTimelineSet]> { + public async createThreadsTimelineSets(): Promise<[EventTimelineSet, EventTimelineSet] | null> { if (this.threadTimelineSetsPromise) { return this.threadTimelineSetsPromise; } @@ -372,10 +372,13 @@ export class Room extends ReadReceipt { ]); const timelineSets = await this.threadTimelineSetsPromise; this.threadsTimelineSets.push(...timelineSets); + return timelineSets; } catch (e) { this.threadTimelineSetsPromise = null; + return null; } } + return null; } /** @@ -1612,7 +1615,14 @@ export class Room extends ReadReceipt { private async createThreadTimelineSet(filterType?: ThreadFilterType): Promise { let timelineSet: EventTimelineSet; - if (Thread.hasServerSideSupport) { + if (Thread.hasServerSideListSupport) { + timelineSet = + new EventTimelineSet(this, this.opts, undefined, undefined, Boolean(Thread.hasServerSideListSupport)); + this.reEmitter.reEmit(timelineSet, [ + RoomEvent.Timeline, + RoomEvent.TimelineReset, + ]); + } else if (Thread.hasServerSideSupport) { const filter = await this.getThreadListFilter(filterType); timelineSet = this.getOrCreateFilteredTimelineSet( @@ -1645,81 +1655,148 @@ export class Room extends ReadReceipt { return timelineSet; } - public threadsReady = false; + private threadsReady = false; + + /** + * Takes the given thread root events and creates threads for them. + * @param events + * @param toStartOfTimeline + */ + public processThreadRoots(events: MatrixEvent[], toStartOfTimeline: boolean): void { + for (const rootEvent of events) { + EventTimeline.setEventMetadata( + rootEvent, + this.currentState, + toStartOfTimeline, + ); + if (!this.getThread(rootEvent.getId())) { + this.createThread(rootEvent.getId(), rootEvent, [], toStartOfTimeline); + } + } + } + /** + * Fetch the bare minimum of room threads required for the thread list to work reliably. + * With server support that means fetching one page. + * Without server support that means fetching as much at once as the server allows us to. + */ public async fetchRoomThreads(): Promise { if (this.threadsReady || !this.client.supportsExperimentalThreads()) { return; } - const allThreadsFilter = await this.getThreadListFilter(); - - const { chunk: events } = await this.client.createMessagesRequest( - this.roomId, - "", - Number.MAX_SAFE_INTEGER, - Direction.Backward, - allThreadsFilter, - ); - - if (!events.length) return; - - // Sorted by last_reply origin_server_ts - const threadRoots = events - .map(this.client.getEventMapper()) - .sort((eventA, eventB) => { - /** - * `origin_server_ts` in a decentralised world is far from ideal - * but for lack of any better, we will have to use this - * Long term the sorting should be handled by homeservers and this - * is only meant as a short term patch - */ - const threadAMetadata = eventA - .getServerAggregatedRelation(RelationType.Thread); - const threadBMetadata = eventB - .getServerAggregatedRelation(RelationType.Thread); - return threadAMetadata.latest_event.origin_server_ts - threadBMetadata.latest_event.origin_server_ts; - }); + if (Thread.hasServerSideListSupport) { + await Promise.all([ + this.fetchRoomThreadList(ThreadFilterType.All), + this.fetchRoomThreadList(ThreadFilterType.My), + ]); + } else { + const allThreadsFilter = await this.getThreadListFilter(); + + const { chunk: events } = await this.client.createMessagesRequest( + this.roomId, + "", + Number.MAX_SAFE_INTEGER, + Direction.Backward, + allThreadsFilter, + ); - let latestMyThreadsRootEvent: MatrixEvent; - const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); - for (const rootEvent of threadRoots) { - this.threadsTimelineSets[0].addLiveEvent(rootEvent, { - duplicateStrategy: DuplicateStrategy.Ignore, - fromCache: false, - roomState, - }); + if (!events.length) return; + + // Sorted by last_reply origin_server_ts + const threadRoots = events + .map(this.client.getEventMapper()) + .sort((eventA, eventB) => { + /** + * `origin_server_ts` in a decentralised world is far from ideal + * but for lack of any better, we will have to use this + * Long term the sorting should be handled by homeservers and this + * is only meant as a short term patch + */ + const threadAMetadata = eventA + .getServerAggregatedRelation(THREAD_RELATION_TYPE.name); + const threadBMetadata = eventB + .getServerAggregatedRelation(THREAD_RELATION_TYPE.name); + return threadAMetadata.latest_event.origin_server_ts - + threadBMetadata.latest_event.origin_server_ts; + }); - const threadRelationship = rootEvent - .getServerAggregatedRelation(RelationType.Thread); - if (threadRelationship.current_user_participated) { - this.threadsTimelineSets[1].addLiveEvent(rootEvent, { + let latestMyThreadsRootEvent: MatrixEvent; + const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); + for (const rootEvent of threadRoots) { + this.threadsTimelineSets[0].addLiveEvent(rootEvent, { duplicateStrategy: DuplicateStrategy.Ignore, fromCache: false, roomState, }); - latestMyThreadsRootEvent = rootEvent; - } - if (!this.getThread(rootEvent.getId())) { - this.createThread(rootEvent.getId(), rootEvent, [], true); + const threadRelationship = rootEvent + .getServerAggregatedRelation(THREAD_RELATION_TYPE.name); + if (threadRelationship.current_user_participated) { + this.threadsTimelineSets[1].addLiveEvent(rootEvent, { + duplicateStrategy: DuplicateStrategy.Ignore, + fromCache: false, + roomState, + }); + latestMyThreadsRootEvent = rootEvent; + } } - } - this.client.decryptEventIfNeeded(threadRoots[threadRoots.length -1]); - if (latestMyThreadsRootEvent) { - this.client.decryptEventIfNeeded(latestMyThreadsRootEvent); + this.processThreadRoots(threadRoots, true); + + this.client.decryptEventIfNeeded(threadRoots[threadRoots.length -1]); + if (latestMyThreadsRootEvent) { + this.client.decryptEventIfNeeded(latestMyThreadsRootEvent); + } } + this.on(ThreadEvent.NewReply, this.onThreadNewReply); this.threadsReady = true; + } - this.on(ThreadEvent.NewReply, this.onThreadNewReply); + /** + * Fetch a single page of threadlist messages for the specific thread filter + * @param filter + * @private + */ + private async fetchRoomThreadList(filter?: ThreadFilterType): Promise { + const timelineSet = filter === ThreadFilterType.My + ? this.threadsTimelineSets[1] + : this.threadsTimelineSets[0]; + + const { chunk: events, end } = await this.client.createThreadListMessagesRequest( + this.roomId, + null, + undefined, + Direction.Backward, + timelineSet.getFilter(), + ); + + timelineSet.getLiveTimeline().setPaginationToken(end, Direction.Backward); + + if (!events.length) return; + + const matrixEvents = events.map(this.client.getEventMapper()); + this.processThreadRoots(matrixEvents, true); + const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); + for (const rootEvent of matrixEvents) { + timelineSet.addLiveEvent(rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState, + }); + } } private onThreadNewReply(thread: Thread): void { + const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); for (const timelineSet of this.threadsTimelineSets) { timelineSet.removeEvent(thread.id); - timelineSet.addLiveEvent(thread.rootEvent); + timelineSet.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState, + }); } } @@ -1865,8 +1942,6 @@ export class Room extends ReadReceipt { this.lastThread = thread; } - this.emit(ThreadEvent.New, thread, toStartOfTimeline); - if (this.threadsReady) { this.threadsTimelineSets.forEach(timelineSet => { if (thread.rootEvent) { @@ -1883,6 +1958,8 @@ export class Room extends ReadReceipt { }); } + this.emit(ThreadEvent.New, thread, toStartOfTimeline); + return thread; } diff --git a/src/models/thread.ts b/src/models/thread.ts index 3c7add2f4d4..60757f9c316 100644 --- a/src/models/thread.ts +++ b/src/models/thread.ts @@ -51,11 +51,28 @@ interface IThreadOpts { client: MatrixClient; } +export enum FeatureSupport { + None = 0, + Experimental = 1, + Stable = 2 +} + +export function determineFeatureSupport(stable: boolean, unstable: boolean): FeatureSupport { + if (stable) { + return FeatureSupport.Stable; + } else if (unstable) { + return FeatureSupport.Experimental; + } else { + return FeatureSupport.None; + } +} + /** * @experimental */ export class Thread extends ReadReceipt { - public static hasServerSideSupport: boolean; + public static hasServerSideSupport = FeatureSupport.None; + public static hasServerSideListSupport = FeatureSupport.None; /** * A reference to all the events ID at the bottom of the threads @@ -134,15 +151,23 @@ export class Thread extends ReadReceipt { this.emit(ThreadEvent.Update, this); } - public static setServerSideSupport(hasServerSideSupport: boolean, useStable: boolean): void { - Thread.hasServerSideSupport = hasServerSideSupport; - if (!useStable) { + public static setServerSideSupport( + status: FeatureSupport, + ): void { + Thread.hasServerSideSupport = status; + if (status !== FeatureSupport.Stable) { FILTER_RELATED_BY_SENDERS.setPreferUnstable(true); FILTER_RELATED_BY_REL_TYPES.setPreferUnstable(true); THREAD_RELATION_TYPE.setPreferUnstable(true); } } + public static setServerSideListSupport( + status: FeatureSupport, + ): void { + Thread.hasServerSideListSupport = status; + } + private onBeforeRedaction = (event: MatrixEvent, redaction: MatrixEvent) => { if (event?.isRelation(THREAD_RELATION_TYPE.name) && this.room.eventShouldLiveIn(event).threadId === this.id &&