From 2aa2626f5c5adf7594a7148894104727f26e8102 Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Fri, 26 Jul 2024 10:11:08 +0200 Subject: [PATCH 1/8] Use PromisePool to add concurrency to users and rooms --- package-lock.json | 10 ++++++++++ package.json | 1 + src/app.ts | 16 ++++++++++++++-- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2b46771..bc1393c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.1.0", "license": "AGPL-3.0-or-later", "dependencies": { + "@supercharge/promise-pool": "^3.2.0", "@types/showdown": "^2.0.6", "axios": "^1.7.2", "dotenv": "^16.4.5", @@ -1340,6 +1341,15 @@ "version": "1.2.5", "license": "MIT" }, + "node_modules/@supercharge/promise-pool": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/@supercharge/promise-pool/-/promise-pool-3.2.0.tgz", + "integrity": "sha512-pj0cAALblTZBPtMltWOlZTQSLT07jIaFNeM8TWoJD1cQMgDB9mcMlVMoetiB35OzNJpqQ2b+QEtwiR9f20mADg==", + "license": "MIT", + "engines": { + "node": ">=8" + } + }, "node_modules/@types/babel__core": { "version": "7.20.5", "dev": true, diff --git a/package.json b/package.json index 7e40ec8..020cb40 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "typescript": "^5.5.4" }, "dependencies": { + "@supercharge/promise-pool": "^3.2.0", "@types/showdown": "^2.0.6", "axios": "^1.7.2", "dotenv": "^16.4.5", diff --git a/src/app.ts b/src/app.ts index 3e6611e..f3931a3 100644 --- a/src/app.ts +++ b/src/app.ts @@ -3,6 +3,7 @@ dotenv.config() import { AxiosError } from 'axios' import lineByLine from 'n-readlines' import { exit } from 'node:process' +import { PromisePool } from '@supercharge/promise-pool' import 'reflect-metadata' import { Entity, entities } from './Entities' import { handleDirectChats } from './handlers/directChats' @@ -22,6 +23,9 @@ log.info('rocketchat2matrix starts.') * @param entity The Entity with it's file name and type definitions */ async function loadRcExport(entity: Entity) { + const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') + const user_queue = [] + const room_queue = [] const rl = new lineByLine(`./inputs/${entities[entity].filename}`) let line: false | Buffer @@ -29,11 +33,11 @@ async function loadRcExport(entity: Entity) { const item = JSON.parse(line.toString()) switch (entity) { case Entity.Users: - await handleUser(item) + user_queue.push(item) break case Entity.Rooms: - await handleRoom(item) + room_queue.push(item) break case Entity.Messages: @@ -44,6 +48,14 @@ async function loadRcExport(entity: Entity) { throw new Error(`Unhandled Entity: ${entity}`) } } + + await PromisePool.withConcurrency(concurrency) + .for(user_queue) + .process((item) => handleUser(item)) + + await PromisePool.withConcurrency(concurrency) + .for(room_queue) + .process((item) => handleRoom(item)) } async function main() { From c19a9ad42206cb3ddfb2264cf85f72c86cfd4e24 Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Fri, 26 Jul 2024 12:42:15 +0200 Subject: [PATCH 2/8] Use a Map to assign messages to their corresponding room for processing --- src/app.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/app.ts b/src/app.ts index f3931a3..bf10d33 100644 --- a/src/app.ts +++ b/src/app.ts @@ -8,7 +8,7 @@ import 'reflect-metadata' import { Entity, entities } from './Entities' import { handleDirectChats } from './handlers/directChats' import { handleRoomMemberships } from './handlers/handleRoomMemberships' -import { handle as handleMessage } from './handlers/messages' +import { handle as handleMessage, RcMessage } from './handlers/messages' import { handlePinnedMessages } from './handlers/pinnedMessages' import { handle as handleRoom } from './handlers/rooms' import { handle as handleUser } from './handlers/users' @@ -26,6 +26,8 @@ async function loadRcExport(entity: Entity) { const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') const user_queue = [] const room_queue = [] + const messages_per_room: Map = new Map() + const rl = new lineByLine(`./inputs/${entities[entity].filename}`) let line: false | Buffer @@ -41,7 +43,11 @@ async function loadRcExport(entity: Entity) { break case Entity.Messages: - await handleMessage(item) + if (messages_per_room.has(item.rid)) { + messages_per_room.get(item.rid)?.push(item) + } else { + messages_per_room.set(item.rid, [item]) + } break default: @@ -56,6 +62,14 @@ async function loadRcExport(entity: Entity) { await PromisePool.withConcurrency(concurrency) .for(room_queue) .process((item) => handleRoom(item)) + + await PromisePool.withConcurrency(concurrency) + .for(messages_per_room.values()) + .process(async (room) => { + for (const item of room) { + await handleMessage(item) + } + }) } async function main() { From 15cda97c5752211e962e531820e46a92f68f6d1a Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Fri, 26 Jul 2024 16:45:58 +0200 Subject: [PATCH 3/8] Add types for the processing queues --- src/app.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/app.ts b/src/app.ts index bf10d33..853a9da 100644 --- a/src/app.ts +++ b/src/app.ts @@ -10,8 +10,8 @@ import { handleDirectChats } from './handlers/directChats' import { handleRoomMemberships } from './handlers/handleRoomMemberships' import { handle as handleMessage, RcMessage } from './handlers/messages' import { handlePinnedMessages } from './handlers/pinnedMessages' -import { handle as handleRoom } from './handlers/rooms' -import { handle as handleUser } from './handlers/users' +import { handle as handleRoom, RcRoom } from './handlers/rooms' +import { handle as handleUser, RcUser } from './handlers/users' import log from './helpers/logger' import { initStorage } from './helpers/storage' import { whoami } from './helpers/synapse' @@ -24,8 +24,8 @@ log.info('rocketchat2matrix starts.') */ async function loadRcExport(entity: Entity) { const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') - const user_queue = [] - const room_queue = [] + const user_queue: RcUser[] = [] + const room_queue: RcRoom[] = [] const messages_per_room: Map = new Map() const rl = new lineByLine(`./inputs/${entities[entity].filename}`) From 78948bc0f7875f68e583d53de754ad6e84eea840 Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Mon, 29 Jul 2024 14:42:49 +0200 Subject: [PATCH 4/8] Use camelCase for new variables --- src/app.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/app.ts b/src/app.ts index 853a9da..3a1a53a 100644 --- a/src/app.ts +++ b/src/app.ts @@ -24,9 +24,9 @@ log.info('rocketchat2matrix starts.') */ async function loadRcExport(entity: Entity) { const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') - const user_queue: RcUser[] = [] - const room_queue: RcRoom[] = [] - const messages_per_room: Map = new Map() + const userQueue: RcUser[] = [] + const roomQueue: RcRoom[] = [] + const messagesPerRoom: Map = new Map() const rl = new lineByLine(`./inputs/${entities[entity].filename}`) @@ -35,18 +35,18 @@ async function loadRcExport(entity: Entity) { const item = JSON.parse(line.toString()) switch (entity) { case Entity.Users: - user_queue.push(item) + userQueue.push(item) break case Entity.Rooms: - room_queue.push(item) + roomQueue.push(item) break case Entity.Messages: - if (messages_per_room.has(item.rid)) { - messages_per_room.get(item.rid)?.push(item) + if (messagesPerRoom.has(item.rid)) { + messagesPerRoom.get(item.rid)?.push(item) } else { - messages_per_room.set(item.rid, [item]) + messagesPerRoom.set(item.rid, [item]) } break @@ -56,15 +56,15 @@ async function loadRcExport(entity: Entity) { } await PromisePool.withConcurrency(concurrency) - .for(user_queue) + .for(userQueue) .process((item) => handleUser(item)) await PromisePool.withConcurrency(concurrency) - .for(room_queue) + .for(roomQueue) .process((item) => handleRoom(item)) await PromisePool.withConcurrency(concurrency) - .for(messages_per_room.values()) + .for(messagesPerRoom.values()) .process(async (room) => { for (const item of room) { await handleMessage(item) From 9fa8e033ed0686507e4c901caa0aaadc18fd345c Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Tue, 30 Jul 2024 14:57:26 +0200 Subject: [PATCH 5/8] Move switch out of while loop, add loops for each case --- src/app.ts | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/app.ts b/src/app.ts index 3a1a53a..e09f04f 100644 --- a/src/app.ts +++ b/src/app.ts @@ -31,28 +31,34 @@ async function loadRcExport(entity: Entity) { const rl = new lineByLine(`./inputs/${entities[entity].filename}`) let line: false | Buffer - while ((line = rl.next())) { - const item = JSON.parse(line.toString()) - switch (entity) { - case Entity.Users: + switch (entity) { + case Entity.Users: + while ((line = rl.next())) { + const item = JSON.parse(line.toString()) userQueue.push(item) - break + } + break - case Entity.Rooms: + case Entity.Rooms: + while ((line = rl.next())) { + const item = JSON.parse(line.toString()) roomQueue.push(item) - break + } + break - case Entity.Messages: + case Entity.Messages: + while ((line = rl.next())) { + const item = JSON.parse(line.toString()) if (messagesPerRoom.has(item.rid)) { messagesPerRoom.get(item.rid)?.push(item) } else { messagesPerRoom.set(item.rid, [item]) } - break + } + break - default: - throw new Error(`Unhandled Entity: ${entity}`) - } + default: + throw new Error(`Unhandled Entity: ${entity}`) } await PromisePool.withConcurrency(concurrency) From 64935fd815b736a1b42f6dc1cb1f5d73c301722f Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Tue, 30 Jul 2024 15:13:42 +0200 Subject: [PATCH 6/8] Move PromisePools inside each case --- src/app.ts | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/app.ts b/src/app.ts index e09f04f..6a768e6 100644 --- a/src/app.ts +++ b/src/app.ts @@ -37,6 +37,9 @@ async function loadRcExport(entity: Entity) { const item = JSON.parse(line.toString()) userQueue.push(item) } + await PromisePool.withConcurrency(concurrency) + .for(userQueue) + .process((item) => handleUser(item)) break case Entity.Rooms: @@ -44,6 +47,9 @@ async function loadRcExport(entity: Entity) { const item = JSON.parse(line.toString()) roomQueue.push(item) } + await PromisePool.withConcurrency(concurrency) + .for(roomQueue) + .process((item) => handleRoom(item)) break case Entity.Messages: @@ -55,27 +61,18 @@ async function loadRcExport(entity: Entity) { messagesPerRoom.set(item.rid, [item]) } } + await PromisePool.withConcurrency(concurrency) + .for(messagesPerRoom.values()) + .process(async (room) => { + for (const item of room) { + await handleMessage(item) + } + }) break default: throw new Error(`Unhandled Entity: ${entity}`) } - - await PromisePool.withConcurrency(concurrency) - .for(userQueue) - .process((item) => handleUser(item)) - - await PromisePool.withConcurrency(concurrency) - .for(roomQueue) - .process((item) => handleRoom(item)) - - await PromisePool.withConcurrency(concurrency) - .for(messagesPerRoom.values()) - .process(async (room) => { - for (const item of room) { - await handleMessage(item) - } - }) } async function main() { From 7a7fdecb73c49f830c68785933f0962007162730 Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Tue, 30 Jul 2024 15:24:08 +0200 Subject: [PATCH 7/8] Wrap lineByLine in iterator to remove queues --- src/app.ts | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/app.ts b/src/app.ts index 6a768e6..75475f7 100644 --- a/src/app.ts +++ b/src/app.ts @@ -10,51 +10,51 @@ import { handleDirectChats } from './handlers/directChats' import { handleRoomMemberships } from './handlers/handleRoomMemberships' import { handle as handleMessage, RcMessage } from './handlers/messages' import { handlePinnedMessages } from './handlers/pinnedMessages' -import { handle as handleRoom, RcRoom } from './handlers/rooms' -import { handle as handleUser, RcUser } from './handlers/users' +import { handle as handleRoom } from './handlers/rooms' +import { handle as handleUser } from './handlers/users' import log from './helpers/logger' import { initStorage } from './helpers/storage' import { whoami } from './helpers/synapse' log.info('rocketchat2matrix starts.') +/** + * Reads a file line by line, parses them to JSON and yields the JSON object + * This is needed because lineByLine isn't implemented as an iterator + * @param path The path of the file + */ +async function* jsonIterator(path: string) { + const rl = new lineByLine(path) + let line: false | Buffer + while ((line = rl.next())) { + yield JSON.parse(line.toString()) + } +} + /** * Reads a file line by line and handles the lines parsed to JSON according to the expected type * @param entity The Entity with it's file name and type definitions */ async function loadRcExport(entity: Entity) { const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') - const userQueue: RcUser[] = [] - const roomQueue: RcRoom[] = [] const messagesPerRoom: Map = new Map() - const rl = new lineByLine(`./inputs/${entities[entity].filename}`) - - let line: false | Buffer + const jsonItems = jsonIterator(`./inputs/${entities[entity].filename}`) switch (entity) { case Entity.Users: - while ((line = rl.next())) { - const item = JSON.parse(line.toString()) - userQueue.push(item) - } await PromisePool.withConcurrency(concurrency) - .for(userQueue) + .for(jsonItems) .process((item) => handleUser(item)) break case Entity.Rooms: - while ((line = rl.next())) { - const item = JSON.parse(line.toString()) - roomQueue.push(item) - } await PromisePool.withConcurrency(concurrency) - .for(roomQueue) + .for(jsonItems) .process((item) => handleRoom(item)) break case Entity.Messages: - while ((line = rl.next())) { - const item = JSON.parse(line.toString()) + for await (const item of jsonItems) { if (messagesPerRoom.has(item.rid)) { messagesPerRoom.get(item.rid)?.push(item) } else { From bee1c2fc192d313cb5cefc6ff65f3d1e8246cfdb Mon Sep 17 00:00:00 2001 From: Tunui Franken Date: Thu, 15 Aug 2024 12:40:03 +0200 Subject: [PATCH 8/8] Handle messages in batches to avoid OOM kill --- src/app.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/app.ts b/src/app.ts index 75475f7..b7c5ab0 100644 --- a/src/app.ts +++ b/src/app.ts @@ -37,9 +37,10 @@ async function* jsonIterator(path: string) { */ async function loadRcExport(entity: Entity) { const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') - const messagesPerRoom: Map = new Map() - + let messagesPerRoom: Map = new Map() + const messageBatchSize = parseInt(process.env.MESSAGE_BATCH_SIZE || '1000000') const jsonItems = jsonIterator(`./inputs/${entities[entity].filename}`) + switch (entity) { case Entity.Users: await PromisePool.withConcurrency(concurrency) @@ -54,13 +55,26 @@ async function loadRcExport(entity: Entity) { break case Entity.Messages: + let i = 0 for await (const item of jsonItems) { if (messagesPerRoom.has(item.rid)) { messagesPerRoom.get(item.rid)?.push(item) } else { messagesPerRoom.set(item.rid, [item]) } + if (i % messageBatchSize === 0) { + await PromisePool.withConcurrency(concurrency) + .for(messagesPerRoom.values()) + .process(async (room) => { + for (const item of room) { + await handleMessage(item) + } + }) + messagesPerRoom = new Map() + } + i++ } + // handle messages again for the last (incomplete) batch await PromisePool.withConcurrency(concurrency) .for(messagesPerRoom.values()) .process(async (room) => {