Skip to content

Commit f7c907e

Browse files
authored
Add simple client discovery phase for pool configurations (#1737)
* Add simple client discovery phase * fix * Speed up client connections on startup if connection exists * Drop bluebird import * Use native allSettled now that we're on a recent Node version * Better iteration * While * Cunning plan to parallelize room loads with proxy loads. * Cleanup delay code * Refactors to parallelize connections * Check for undefined * Skip checking if we're not syncing membership * Add debug comment * Drop extra awaits * Preserve ordering in test * Cleanup * Cleaner
1 parent eccbb75 commit f7c907e

File tree

8 files changed

+125
-90
lines changed

8 files changed

+125
-90
lines changed

changelog.d/1737.bugfix

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix a case where a proxied client connection may get missed until they use the bridge on startup, leading to missed
2+
messages and logspam.

spec/unit/promiseutil.spec.js

Lines changed: 0 additions & 42 deletions
This file was deleted.

spec/util/e2e-test.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { MatrixClient } from "matrix-bot-sdk";
88
import { TestIrcServer } from "matrix-org-irc";
99
import { IrcConnectionPool } from "../../src/pool-service/IrcConnectionPool";
1010
import dns from 'node:dns';
11-
1211
// Needed to make tests work on GitHub actions. Node 17+ defaults
1312
// to IPv6, and the homerunner domain resolves to IPv6, but the
1413
// runtime doesn't actually support IPv6 🤦
@@ -51,8 +50,12 @@ export class E2ETestMatrixClient extends MatrixClient {
5150

5251
// Check only the keys we care about
5352
for (const [key, value] of Object.entries(expected)) {
54-
console.log(key, value, "---", eventData.content[key]);
55-
if (JSON.stringify(eventData.content[key], undefined, 0) !== JSON.stringify(value, undefined, 0)) {
53+
const evValue = eventData.content[key] ?? undefined;
54+
const sortOrder = value !== null && typeof value === "object" ? Object.keys(value).sort() : undefined;
55+
const jsonLeft = JSON.stringify(evValue, sortOrder);
56+
const jsonRight = JSON.stringify(value, sortOrder);
57+
console.log(jsonLeft, "---", jsonRight);
58+
if (jsonLeft !== jsonRight) {
5659
return undefined;
5760
}
5861
}

src/bridge/IrcBridge.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,9 @@ export class IrcBridge {
681681

682682
this.clientPool = new ClientPool(this, this.dataStore, this.ircPoolClient);
683683

684+
// We can begin discovering clients from the pool immediately.
685+
const discoveringClientsPromise = this.clientPool.discoverPoolConnectedClients();
686+
684687
if (this.config.ircService.debugApi.enabled) {
685688
this.debugApi = new DebugApi(
686689
this,
@@ -756,6 +759,9 @@ export class IrcBridge {
756759
log.info("Syncing relevant membership lists...");
757760
const memberlistPromises: Promise<void>[] = [];
758761

762+
// Note in the following section we will be waiting for discoveringClientsPromise
763+
// to complete before we execute our first join, this is by design so we don't
764+
// acidentally connect the same user twice by doing two mass client create loops.
759765
this.ircServers.forEach((server) => {
760766
// If memberlist-syncing 100s of connections, the scheduler will cause massive
761767
// waiting times for connections to be created.
@@ -765,16 +771,17 @@ export class IrcBridge {
765771

766772
// TODO reduce deps required to make MemberListSyncers.
767773
// TODO Remove injectJoinFn bodge
768-
this.memberListSyncers[server.domain] = new MemberListSyncer(
774+
const syncer = this.memberListSyncers[server.domain] = new MemberListSyncer(
769775
this, this.membershipQueue, this.bridge.getBot(), server, this.appServiceUserId,
770-
(roomId: string, joiningUserId: string, displayName: string, isFrontier: boolean) => {
776+
async (roomId: string, joiningUserId: string, displayName: string, isFrontier: boolean) => {
771777
const req = new BridgeRequest(
772778
this.bridge.getRequestFactory().newRequest()
773779
);
780+
const isFresh = !this.clientPool.getBridgedClientByUserId(server, joiningUserId);
774781
const target = new MatrixUser(joiningUserId);
775782
// inject a fake join event which will do M->I connections and
776783
// therefore sync the member list
777-
return this.matrixHandler.onJoin(req, {
784+
await this.matrixHandler.onJoin(req, {
778785
room_id: roomId,
779786
content: {
780787
displayname: displayName,
@@ -786,10 +793,17 @@ export class IrcBridge {
786793
event_id: "!injected",
787794
_frontier: isFrontier
788795
}, target);
796+
return isFresh;
789797
}
790798
);
791799
memberlistPromises.push(
792-
this.memberListSyncers[server.domain].sync()
800+
// Before we can actually join Matrix users to channels, we need to ensure we've discovered
801+
// all the clients already connected to avoid races.
802+
syncer.sync().then(() =>
803+
discoveringClientsPromise
804+
).finally(() =>
805+
syncer.joinMatrixUsersToChannels()
806+
)
793807
);
794808
});
795809

@@ -813,14 +827,13 @@ export class IrcBridge {
813827
log.info("Connecting to IRC networks...");
814828
await this.connectToIrcNetworks();
815829

816-
await promiseutil.allSettled(this.ircServers.map((server) => {
830+
await Promise.allSettled(this.ircServers.map((server) => {
817831
// Call MODE on all known channels to get modes of all channels
818832
return Bluebird.cast(this.publicitySyncer.initModes(server));
819833
})).catch((err) => {
820834
log.error('Could not init modes for publicity syncer');
821835
log.error(err.stack);
822836
});
823-
824837
await Promise.all(memberlistPromises);
825838

826839
// Reset reconnectIntervals
@@ -962,7 +975,7 @@ export class IrcBridge {
962975
}
963976
await this.bridge.getIntent().join(roomId);
964977
}).map(Bluebird.cast);
965-
await promiseutil.allSettled(promises);
978+
await Promise.allSettled(promises);
966979
}
967980

968981
public async sendMatrixAction(room: MatrixRoom, from: MatrixUser|undefined, action: MatrixAction): Promise<void> {

src/bridge/MemberListSyncer.ts

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Controls the logic for determining which membership lists should be synced and
22
// handles the sequence of events until the lists are in sync.
33

4-
import Bluebird from "bluebird";
54
import { IrcBridge } from "./IrcBridge";
65
import { AppServiceBot, MembershipQueue } from "matrix-appservice-bridge";
76
import { IrcServer } from "../irc/IrcServer";
@@ -35,8 +34,15 @@ interface LeaveQueueItem {
3534
userIds: string[];
3635
}
3736

37+
interface MemberJoinEntry {
38+
roomId: string;
39+
displayName: string;
40+
userId: string;
41+
frontier: boolean
42+
}
43+
3844
type InjectJoinFn = (roomId: string, joiningUserId: string,
39-
displayName: string, isFrontier: boolean) => PromiseLike<unknown>;
45+
displayName: string, isFrontier: boolean) => PromiseLike<boolean>;
4046

4147
export class MemberListSyncer {
4248
private syncableRoomsPromise: Promise<RoomInfo[]>|null = null;
@@ -46,6 +52,7 @@ export class MemberListSyncer {
4652
irc: {[channel: string]: string[]};
4753
matrix: {[roomId: string]: RoomInfo};
4854
} = { irc: {}, matrix: {} };
55+
private memberEntriesToSync?: MemberJoinEntry[];
4956

5057
constructor(private ircBridge: IrcBridge, private memberQueue: MembershipQueue,
5158
private appServiceBot: AppServiceBot, private server: IrcServer,
@@ -64,9 +71,9 @@ export class MemberListSyncer {
6471
log.info("Found %s syncable rooms (%sms)", rooms.length, Date.now() - start);
6572
this.leaveIrcUsersFromRooms(rooms);
6673
start = Date.now();
67-
log.info("Joining Matrix users to IRC channels...");
68-
await this.joinMatrixUsersToChannels(rooms, this.injectJoinFn);
69-
log.info("Joined Matrix users to IRC channels. (%sms)", Date.now() - start);
74+
log.info("Collecting all Matrix users in all channel rooms...");
75+
await this.collectMatrixUsersToJoinToChannels(rooms);
76+
log.info("Collected all Matrix users in all channel rooms. (%sms)", Date.now() - start);
7077
// NB: We do not need to explicitly join IRC users to Matrix rooms
7178
// because we get all of the NAMEs/JOINs as events when we connect to
7279
// the IRC server. This effectively "injects" the list for us.
@@ -234,8 +241,7 @@ export class MemberListSyncer {
234241
return this.syncableRoomsPromise;
235242
}
236243

237-
private async joinMatrixUsersToChannels(rooms: RoomInfo[], injectJoinFn: InjectJoinFn) {
238-
244+
private async collectMatrixUsersToJoinToChannels(rooms: RoomInfo[]) {
239245
// filter out rooms listed in the rules
240246
const filteredRooms: RoomInfo[] = [];
241247
rooms.forEach((roomInfo) => {
@@ -260,7 +266,7 @@ export class MemberListSyncer {
260266

261267
// map the filtered rooms to a list of users to join
262268
// [Room:{reals:[uid,uid]}, ...] => [{uid,roomid}, ...]
263-
const entries: { roomId: string; displayName: string; userId: string; frontier: boolean}[] = [];
269+
const entries: MemberJoinEntry[] = [];
264270
const idleRegex = this.server.ignoreIdleUsersOnStartupExcludeRegex;
265271
for (const roomInfo of filteredRooms) {
266272
for (const uid of roomInfo.realJoinedUsers) {
@@ -300,37 +306,52 @@ export class MemberListSyncer {
300306
});
301307

302308
log.debug("Got %s matrix join events to inject.", entries.length);
309+
this.memberEntriesToSync = entries;
310+
}
311+
312+
public async joinMatrixUsersToChannels() {
313+
const start = Date.now();
314+
log.info("Joining all Matrix users in all channel rooms");
315+
const entries = this.memberEntriesToSync;
316+
if (entries === undefined) {
317+
// Can be expected if syncing is off.
318+
log.info(`joinMatrixUsersToChannels: No entries collected for joining`);
319+
return;
320+
}
303321
this.usersToJoin = entries.length;
304-
const d = promiseutil.defer();
305-
// take the first entry and inject a join event
306-
const joinNextUser = () => {
307-
const entry = entries.shift();
308-
if (!entry) {
309-
d.resolve();
310-
return;
311-
}
322+
323+
let entry: MemberJoinEntry|undefined;
324+
const floodDelayMs = this.server.getMemberListFloodDelayMs();
325+
// eslint-disable-next-line no-cond-assign
326+
while (entry = entries.shift()) {
312327
this.usersToJoin--;
313328
if (entry.userId.startsWith("@-")) {
314-
joinNextUser();
315-
return;
329+
// Ignore guest users.
330+
continue;
316331
}
317332
log.debug(
318333
"Injecting join event for %s in %s (%s left) is_frontier=%s",
319334
entry.userId, entry.roomId, entries.length, entry.frontier
320335
);
321-
Bluebird.cast(injectJoinFn(entry.roomId, entry.userId, entry.displayName, entry.frontier)).timeout(
322-
this.server.getMemberListFloodDelayMs()
323-
).then(() => {
324-
joinNextUser();
325-
}).catch(() => {
326-
// discard error, this will be due to timeouts which we don't want to log
327-
joinNextUser();
328-
})
336+
try {
337+
// Inject a join to connect the user. We wait up til the delay time,
338+
// and then just connect the next user.
339+
// If the user connects *faster* than the delay time, then we need
340+
// to delay for the remainder.
341+
const delayPromise = promiseutil.delay(floodDelayMs);
342+
await Promise.race([
343+
delayPromise,
344+
this.injectJoinFn(entry.roomId, entry.userId, entry.displayName, entry.frontier),
345+
]);
346+
await delayPromise;
347+
}
348+
catch (ex) {
349+
// injectJoinFn may fail due to failure to get a client (user may be banned)
350+
// or any other reason, we should continue to iterate regardless
351+
log.debug(`Failed to inject join for ${entry.userId} ${entry.roomId}`, ex);
352+
}
329353
}
330-
331-
joinNextUser();
332-
333-
return d.promise;
354+
log.info("Joining all Matrix users in all channel rooms. (%sms)", Date.now() - start);
334355
}
335356

336357
public leaveIrcUsersFromRooms(rooms: RoomInfo[]) {

src/irc/ClientPool.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,33 @@ export class ClientPool {
128128
return this.botClients.get(server.domain);
129129
}
130130

131+
/**
132+
* Discover any clients connected from a previous session when
133+
* using the pool
134+
*/
135+
public async discoverPoolConnectedClients() {
136+
if (!this.redisPool) {
137+
return;
138+
}
139+
140+
for await (const connection of this.redisPool.getPreviouslyConnectedClients()) {
141+
if (connection.clientId === 'bot') {
142+
// The bot will be connected via the usual process.
143+
continue;
144+
}
145+
// XXX: This is a safe assumption *for now* but when the proxy supports multiple
146+
// servers this will break!
147+
const server = this.ircBridge.getServers()[0];
148+
try {
149+
await this.getBridgedClient(server, connection.clientId);
150+
log.info(`Connected previously connected user ${connection.clientId}`);
151+
}
152+
catch (ex) {
153+
log.warn(`Failed to connect ${connection.clientId}, who is connected through the proxy`, ex);
154+
}
155+
}
156+
}
157+
131158
public async loginToServer(server: IrcServer): Promise<BridgedClient> {
132159
let bridgedClient = this.getBot(server);
133160
if (!bridgedClient) {

src/pool-service/IrcPoolClient.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,25 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm
5353
log.debug(`Sent command in ${type}: ${payload}`);
5454
}
5555

56+
57+
public async *getPreviouslyConnectedClients(): AsyncGenerator<RedisIrcConnection> {
58+
let count = 0;
59+
for (const [clientId, clientAddressPair] of
60+
Object.entries(await this.redis.hgetall(REDIS_IRC_POOL_CONNECTIONS))) {
61+
const [, address, portStr] = /(.+):(\d+)/.exec(clientAddressPair) || [];
62+
63+
// Doing this here allows us to frontload the work that would be done in createOrGetIrcSocket
64+
const state = await IrcClientRedisState.create(this.redis, clientId, false);
65+
const connection = new RedisIrcConnection(this, clientId, state);
66+
const port = parseInt(portStr);
67+
connection.setConnectionInfo({ localPort: port, localIp: address, clientId });
68+
this.connections.set(clientId, Promise.resolve(connection));
69+
yield connection;
70+
count++;
71+
}
72+
log.info(`Found ${count} previously connected clients`);
73+
}
74+
5675
public async createOrGetIrcSocket(clientId: string, netOpts: ConnectionCreateArgs): Promise<RedisIrcConnection> {
5776
const existingConnection = this.connections.get(clientId);
5877
if (existingConnection) {

src/promiseutil.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,3 @@ export function defer<T>(): Defer<T> {
4444
export function delay(ms: number): Promise<void> {
4545
return new Promise(resolve => setTimeout(resolve, ms));
4646
}
47-
48-
/**
49-
* Waits for all Promises to be "settled". A Promise is settled when
50-
* it resolves or rejects.
51-
*/
52-
export function allSettled<T>(promises: Bluebird<T>[]): Promise<Bluebird.Inspection<T>[]> {
53-
return Promise.all(promises.map(p => p.reflect()));
54-
}

0 commit comments

Comments
 (0)