Skip to content

Commit 8326d7f

Browse files
authored
Add TimelineRedactionQueue. (#949)
This just batches the redactions going on within a single room to the same `/messages` request. And blocks the next request for that room from happening until the current request has completed. This is necessary because `/messages` pagination is really heavy on homeservers as it can force them to backfill. There is also some really strange bug in Synapse that is triggered by Draupnir's behaviour element-hq/synapse#18829.
1 parent f1dad52 commit 8326d7f

File tree

6 files changed

+146
-36
lines changed

6 files changed

+146
-36
lines changed

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
"@sinclair/typebox": "0.34.13",
5454
"@the-draupnir-project/interface-manager": "4.2.5",
5555
"@the-draupnir-project/matrix-basic-types": "1.4.0",
56-
"@the-draupnir-project/mps-interface-adaptor": "^0.4.1",
56+
"@the-draupnir-project/mps-interface-adaptor": "0.5.0",
5757
"better-sqlite3": "^9.4.3",
5858
"body-parser": "^1.20.2",
5959
"config": "^3.3.9",
@@ -63,8 +63,8 @@
6363
"jsdom": "^24.0.0",
6464
"matrix-appservice-bridge": "^10.3.1",
6565
"matrix-bot-sdk": "npm:@vector-im/matrix-bot-sdk@^0.7.1-element.6",
66-
"matrix-protection-suite": "npm:@gnuxie/matrix-protection-suite@3.11.0",
67-
"matrix-protection-suite-for-matrix-bot-sdk": "npm:@gnuxie/matrix-protection-suite-for-matrix-bot-sdk@3.11.0",
66+
"matrix-protection-suite": "npm:@gnuxie/matrix-protection-suite@3.12.0",
67+
"matrix-protection-suite-for-matrix-bot-sdk": "npm:@gnuxie/matrix-protection-suite-for-matrix-bot-sdk@3.12.0",
6868
"pg": "^8.8.0",
6969
"yaml": "^2.3.2"
7070
},

src/Draupnir.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import {
8080
MatrixReactionHandler,
8181
sendMatrixEventsFromDeadDocument,
8282
} from "@the-draupnir-project/mps-interface-adaptor";
83+
import { TimelineRedactionQueue } from "./queues/TimelineRedactionQueue";
8384

8485
const log = new Logger("Draupnir");
8586

@@ -135,6 +136,12 @@ export class Draupnir implements Client, MatrixAdaptorContext {
135136

136137
private readonly JSInterfaceDispatcher: JSInterfaceCommandDispatcher<BasicInvocationInformation> =
137138
makeDraupnirJSCommandDispatcher(this);
139+
140+
public readonly timelineRedactionQueue = new TimelineRedactionQueue(
141+
this.clientPlatform.toRoomMessages(),
142+
this.clientPlatform.toRoomEventRedacter()
143+
);
144+
138145
private constructor(
139146
public readonly client: MatrixSendClient,
140147
public readonly clientUserID: StringUserID,

src/protections/RedactionSynchronisation.ts

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
import {
66
AbstractProtection,
7-
ActionException,
8-
ActionExceptionKind,
97
ActionResult,
108
Capability,
119
CapabilityMethodSchema,
@@ -41,10 +39,9 @@ import {
4139
StringRoomID,
4240
StringUserID,
4341
} from "@the-draupnir-project/matrix-basic-types";
44-
import { Result } from "@gnuxie/typescript-result";
42+
import { isError, Result } from "@gnuxie/typescript-result";
4543
import { Type } from "@sinclair/typebox";
4644
import { revisionRulesMatchingUser } from "../commands/unban/UnbanUsers";
47-
import { redactUserMessagesIn } from "../utils";
4845

4946
export type RedactionSynchronisationProtectionCapabilitiesSet = {
5047
consequences: RedactionSynchronisationConsequences;
@@ -106,23 +103,20 @@ describeCapabilityProvider({
106103
requiredStatePermissions: [],
107104
requiredEventPermissions: [],
108105
async redactMessagesIn(userIDOrGlob, reason, roomIDs) {
109-
const redactionResult = await redactUserMessagesIn(
110-
draupnir.client,
111-
draupnir.managementRoomOutput,
112-
userIDOrGlob,
113-
roomIDs
114-
).then(
115-
(_) => Ok(undefined),
116-
(error) =>
117-
ActionException.Result(
118-
`Error redacting messages for ${userIDOrGlob}`,
119-
{
120-
exception: error,
121-
exceptionKind: ActionExceptionKind.Unknown,
122-
}
106+
const redactionResults = await Promise.all(
107+
roomIDs.map((roomID) =>
108+
draupnir.timelineRedactionQueue.enqueueRedaction(
109+
userIDOrGlob,
110+
roomID
123111
)
112+
)
124113
);
125-
return redactionResult;
114+
const firstError = redactionResults.find((result) => isError(result));
115+
if (firstError) {
116+
return firstError;
117+
} else {
118+
return Ok(undefined);
119+
}
126120
},
127121
async rejectInvite(roomID, _sender, target, reason) {
128122
return await draupnir.clientPlatform
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// SPDX-FileCopyrightText: 2025 Gnuxie <[email protected]>
2+
//
3+
// SPDX-License-Identifier: AFL-3.0
4+
5+
import { isError, Ok, Result } from "@gnuxie/typescript-result";
6+
import {
7+
MatrixGlob,
8+
StringEventID,
9+
StringRoomID,
10+
StringUserID,
11+
} from "@the-draupnir-project/matrix-basic-types";
12+
import {
13+
KeyedBatchQueue,
14+
Logger,
15+
RoomEventRedacter,
16+
RoomMessages,
17+
} from "matrix-protection-suite";
18+
19+
const log = new Logger("TimelineRedactionQueue");
20+
21+
export class TimelineRedactionQueue {
22+
private readonly batchProcessor = this.processBatch.bind(this);
23+
private readonly queue = new KeyedBatchQueue<StringRoomID, StringUserID>(
24+
this.batchProcessor
25+
);
26+
27+
public constructor(
28+
private readonly roomMessages: RoomMessages,
29+
private readonly roomEventRedacter: RoomEventRedacter,
30+
// FIXME: We really should have a way of adding a limit to the batch enqueue operation...
31+
// It doesn't really seem possible though. Unless the maximum limit anyone enqueues is chosen.
32+
// Yeah that sounds like how it would have to work. Meanwhile, it doesn't really matter since
33+
// the current behaviour is constrained to 1000.
34+
private readonly limit = 1000
35+
) {
36+
// nothing to do.
37+
}
38+
private async processBatch(
39+
roomID: StringRoomID,
40+
userIDs: StringUserID[]
41+
): Promise<Result<void>> {
42+
const globUserIDs = userIDs.filter(
43+
(userID) => userID.includes("*") || userID.includes("?")
44+
);
45+
const globsToTest = globUserIDs.map((userID) => new MatrixGlob(userID));
46+
const isGlobInUsers = globsToTest.length !== 0;
47+
const usersToTest = userIDs.filter(
48+
(userID) => !globUserIDs.includes(userID)
49+
);
50+
const paginator = this.roomMessages.toRoomMessagesIterator(roomID, {
51+
direction: "backwards",
52+
limit: this.limit,
53+
...(isGlobInUsers ? {} : { filter: { senders: userIDs } }),
54+
});
55+
const eventsToRedact: StringEventID[] = [];
56+
const paginationResult = await paginator.forEachItem({
57+
forEachItemCB: (event) => {
58+
if (
59+
// Always add users when there are no globs, since events are filtered by sender.
60+
!isGlobInUsers ||
61+
usersToTest.includes(event.sender) ||
62+
globsToTest.some((glob) => glob.test(event.sender))
63+
) {
64+
eventsToRedact.push(event.event_id);
65+
}
66+
},
67+
totalItemLimit: this.limit,
68+
});
69+
if (isError(paginationResult)) {
70+
return paginationResult.elaborate(
71+
`Failed to paginate /messages in ${roomID} to begin redaction`
72+
);
73+
}
74+
// TODO: It would be good if we had a way of throttling these requests
75+
// per draupnir and in general but y'know.
76+
for (const eventID of eventsToRedact) {
77+
const redactResult = await this.roomEventRedacter.redactEvent(
78+
roomID,
79+
eventID
80+
);
81+
if (isError(redactResult)) {
82+
log.error(
83+
`Error while trying to redact messages for in ${roomID}:`,
84+
eventID,
85+
redactResult.error
86+
);
87+
}
88+
}
89+
return Ok(undefined);
90+
}
91+
92+
public async enqueueRedaction(
93+
userID: StringUserID,
94+
roomID: StringRoomID
95+
): Promise<Result<void>> {
96+
return await this.queue.enqueue(roomID, userID);
97+
}
98+
}

test/integration/commands/commandUtils.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import {
2222
TextMessageContent,
2323
Value,
2424
StringEventIDSchema,
25+
ReactionContent,
26+
hasOwn,
2527
} from "matrix-protection-suite";
2628
import { Type } from "@sinclair/typebox";
2729
import { Draupnir } from "../../../src/Draupnir";
@@ -145,14 +147,19 @@ export async function getFirstReaction(
145147
const addEvent = function (roomId: string, event: RoomEvent) {
146148
if (roomId !== targetRoom) return;
147149
if (!Value.Check(ReactionEvent, event)) return;
150+
if (Object.keys(event.content).length === 0) return;
148151
reactionEvents.push(event);
149152
};
150153
let targetCb;
151154
try {
152155
matrix.on("room.event", addEvent);
153156
const targetEventId = await targetEventThunk();
154157
for (const event of reactionEvents) {
155-
const relates_to = event.content?.["m.relates_to"];
158+
const relates_to = (
159+
hasOwn(event.content, "m.relates_to")
160+
? event.content["m.relates_to"]
161+
: undefined
162+
) as ReactionContent["m.relates_to"];
156163
if (
157164
relates_to?.event_id === targetEventId &&
158165
relates_to.key === reactionKey
@@ -164,7 +171,11 @@ export async function getFirstReaction(
164171
targetCb = function (roomId: string, event: RoomEvent) {
165172
if (roomId !== targetRoom) return;
166173
if (!Value.Check(ReactionEvent, event)) return;
167-
const relates_to = event.content["m.relates_to"];
174+
const relates_to = (
175+
hasOwn(event.content, "m.relates_to")
176+
? event.content["m.relates_to"]
177+
: undefined
178+
) as ReactionContent["m.relates_to"];
168179
if (
169180
relates_to?.event_id === targetEventId &&
170181
relates_to.key === reactionKey

yarn.lock

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -327,10 +327,10 @@
327327
"@gnuxie/typescript-result" "^1.0.0"
328328
glob-to-regexp "^0.4.1"
329329

330-
"@the-draupnir-project/mps-interface-adaptor@^0.4.1":
331-
version "0.4.1"
332-
resolved "https://registry.yarnpkg.com/@the-draupnir-project/mps-interface-adaptor/-/mps-interface-adaptor-0.4.1.tgz#673b20e3af916d3587f440fedfd6ea94b60d368e"
333-
integrity sha512-nHyQuC7nrJn7RtNpthEWL/6uNK5HPS2UDBT74Ml00FofzqSohAyNpi9BeCi9zkMjpdmVWvsWiEqn2Iv8fc/UjQ==
330+
"@the-draupnir-project/mps-interface-adaptor@0.5.0":
331+
version "0.5.0"
332+
resolved "https://registry.yarnpkg.com/@the-draupnir-project/mps-interface-adaptor/-/mps-interface-adaptor-0.5.0.tgz#de0bf229a51c906f4b53f72f6a8ce0be33399434"
333+
integrity sha512-7lSDeHGsf/UtXFxJ1OYBt8ScU36bSJAdxLxM4tBBDNe2EWDlyOd1oBaa7OfEcf+/4btBI0T+Np5eseCqVfLo0Q==
334334

335335
"@tsconfig/node10@^1.0.7":
336336
version "1.0.11"
@@ -2598,18 +2598,18 @@ matrix-appservice@^2.0.0:
25982598
request-promise "^4.2.6"
25992599
sanitize-html "^2.11.0"
26002600

2601-
"matrix-protection-suite-for-matrix-bot-sdk@npm:@gnuxie/matrix-protection-suite-for-matrix-bot-sdk@3.11.0":
2602-
version "3.11.0"
2603-
resolved "https://registry.yarnpkg.com/@gnuxie/matrix-protection-suite-for-matrix-bot-sdk/-/matrix-protection-suite-for-matrix-bot-sdk-3.11.0.tgz#0f972cd69a918422aca5b05f0f917766b2899e13"
2604-
integrity sha512-fNT0rdGXN8VLZtkqS5s4wySYZPJ3LaqsU/CP3iS4WK7XQ6EXT1n3UM4chxmwL5YhQITRW34PehTx7ScdKpL+4w==
2601+
"matrix-protection-suite-for-matrix-bot-sdk@npm:@gnuxie/matrix-protection-suite-for-matrix-bot-sdk@3.12.0":
2602+
version "3.12.0"
2603+
resolved "https://registry.yarnpkg.com/@gnuxie/matrix-protection-suite-for-matrix-bot-sdk/-/matrix-protection-suite-for-matrix-bot-sdk-3.12.0.tgz#74394ad099fb0d60709d817065c8221154b2132f"
2604+
integrity sha512-hdTLuwvhFg+qcZeOJCrKzhg5Tpn2X6VMRAj2uKQkHdx6KF+2MwiZoOrWCZ1BlssFXOyZwIvTLULeDU7Wa1JNPQ==
26052605
dependencies:
26062606
"@gnuxie/typescript-result" "^1.0.0"
26072607
await-lock "^2.2.2"
26082608

2609-
"matrix-protection-suite@npm:@gnuxie/matrix-protection-suite@3.11.0":
2610-
version "3.11.0"
2611-
resolved "https://registry.yarnpkg.com/@gnuxie/matrix-protection-suite/-/matrix-protection-suite-3.11.0.tgz#2ed31e8bd0c68b89927117090f049b5905c9ed82"
2612-
integrity sha512-Y/on4KzPN8413dNv1zLBP1LaD9ijgZSR7qXPJUZRZHqezEruQp1bg8q2YIOdzsrI5dk/TK2ftTnEC/lbz2aWYA==
2609+
"matrix-protection-suite@npm:@gnuxie/matrix-protection-suite@3.12.0":
2610+
version "3.12.0"
2611+
resolved "https://registry.yarnpkg.com/@gnuxie/matrix-protection-suite/-/matrix-protection-suite-3.12.0.tgz#d89d73b8ff90d0eb7b9c8f9d1771acf042e97bc7"
2612+
integrity sha512-f/nuzHYaCB/uVwN7UZy4BatcIvmQL/pTzRE4O2TGMrdv2fDQPsSvc3HZyJTJI/t8iU5g+k/58tk2TyJmLzMS6A==
26132613
dependencies:
26142614
"@gnuxie/typescript-result" "^1.0.0"
26152615
await-lock "^2.2.2"

0 commit comments

Comments
 (0)