diff --git a/src/BridgedRoom.ts b/src/BridgedRoom.ts index 126d6815..59d8ec5b 100644 --- a/src/BridgedRoom.ts +++ b/src/BridgedRoom.ts @@ -159,11 +159,17 @@ export class BridgedRoom { public MatrixRoomActive: boolean; private recentSlackMessages: string[] = []; - private slackSendLock: Promise = Promise.resolve(); + private bridgingQueue: Promise = Promise.resolve(); private waitingForJoin?: Promise; private waitingForJoinResolve?: () => void; + private async pushToBridgingQueue(fn: () => Promise): Promise { + return new Promise((resolve) => { + this.bridgingQueue = this.bridgingQueue.finally(() => fn().then(resolve)); + }); + } + /** * True if this instance has changed from the version last read/written to the RoomStore. */ @@ -528,7 +534,6 @@ export class BridgedRoom { body.as_user = true; delete body.username; } - let res: ChatPostMessageResponse; const chatPostMessageArgs = { ...body, // Ensure that text is defined, even for attachments. @@ -537,21 +542,25 @@ export class BridgedRoom { unfurl_links: true, }; - try { - res = await slackClient.chat.postMessage(chatPostMessageArgs) as ChatPostMessageResponse; - } catch (ex) { - const platformError = ex as WebAPIPlatformError; - if (platformError.data?.error === "not_in_channel") { - await slackClient.conversations.join({ - channel: chatPostMessageArgs.channel, - }); + const res = await this.pushToBridgingQueue(async () => { + let res: ChatPostMessageResponse; + try { res = await slackClient.chat.postMessage(chatPostMessageArgs) as ChatPostMessageResponse; - } else { - throw ex; + } catch (ex) { + const platformError = ex as WebAPIPlatformError; + if (platformError.data?.error === "not_in_channel") { + await slackClient.conversations.join({ + channel: chatPostMessageArgs.channel, + }); + res = await slackClient.chat.postMessage(chatPostMessageArgs) as ChatPostMessageResponse; + } else { + throw ex; + } } - } - this.addRecentSlackMessage(res.ts); + this.addRecentSlackMessage(res.ts); + return res; + }); this.main.incCounter(METRIC_SENT_MESSAGES, {side: "remote"}); // Log activity, but don't await the answer or throw errors @@ -710,7 +719,7 @@ export class BridgedRoom { if (ghostChanged) { await this.main.fixDMMetadata(this, ghost); } - this.slackSendLock = this.slackSendLock.then(() => { + await this.pushToBridgingQueue(async () => { // Check again if (this.recentSlackMessages.includes(message.ts)) { // We sent this, ignore @@ -720,7 +729,6 @@ export class BridgedRoom { log.warn(`Failed to handle slack message ${message.ts} for ${this.MatrixRoomId} ${this.slackChannelId}`, ex); }); }); - await this.slackSendLock; } catch (err) { log.error("Failed to process event"); log.error(err);