diff --git a/package-lock.json b/package-lock.json index 2b46771..bc1393c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.1.0", "license": "AGPL-3.0-or-later", "dependencies": { + "@supercharge/promise-pool": "^3.2.0", "@types/showdown": "^2.0.6", "axios": "^1.7.2", "dotenv": "^16.4.5", @@ -1340,6 +1341,15 @@ "version": "1.2.5", "license": "MIT" }, + "node_modules/@supercharge/promise-pool": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/@supercharge/promise-pool/-/promise-pool-3.2.0.tgz", + "integrity": "sha512-pj0cAALblTZBPtMltWOlZTQSLT07jIaFNeM8TWoJD1cQMgDB9mcMlVMoetiB35OzNJpqQ2b+QEtwiR9f20mADg==", + "license": "MIT", + "engines": { + "node": ">=8" + } + }, "node_modules/@types/babel__core": { "version": "7.20.5", "dev": true, diff --git a/package.json b/package.json index 7e40ec8..020cb40 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "typescript": "^5.5.4" }, "dependencies": { + "@supercharge/promise-pool": "^3.2.0", "@types/showdown": "^2.0.6", "axios": "^1.7.2", "dotenv": "^16.4.5", diff --git a/src/app.ts b/src/app.ts index 3e6611e..b7c5ab0 100644 --- a/src/app.ts +++ b/src/app.ts @@ -3,11 +3,12 @@ dotenv.config() import { AxiosError } from 'axios' import lineByLine from 'n-readlines' import { exit } from 'node:process' +import { PromisePool } from '@supercharge/promise-pool' import 'reflect-metadata' import { Entity, entities } from './Entities' import { handleDirectChats } from './handlers/directChats' import { handleRoomMemberships } from './handlers/handleRoomMemberships' -import { handle as handleMessage } from './handlers/messages' +import { handle as handleMessage, RcMessage } from './handlers/messages' import { handlePinnedMessages } from './handlers/pinnedMessages' import { handle as handleRoom } from './handlers/rooms' import { handle as handleUser } from './handlers/users' @@ -17,32 +18,74 @@ import { whoami } from './helpers/synapse' log.info('rocketchat2matrix starts.') +/** + * Reads a file line by line, parses them to JSON and yields the JSON object + * This is needed because lineByLine isn't implemented as an iterator + * @param path The path of the file + */ +async function* jsonIterator(path: string) { + const rl = new lineByLine(path) + let line: false | Buffer + while ((line = rl.next())) { + yield JSON.parse(line.toString()) + } +} + /** * Reads a file line by line and handles the lines parsed to JSON according to the expected type * @param entity The Entity with it's file name and type definitions */ async function loadRcExport(entity: Entity) { - const rl = new lineByLine(`./inputs/${entities[entity].filename}`) + const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50') + let messagesPerRoom: Map = new Map() + const messageBatchSize = parseInt(process.env.MESSAGE_BATCH_SIZE || '1000000') + const jsonItems = jsonIterator(`./inputs/${entities[entity].filename}`) - let line: false | Buffer - while ((line = rl.next())) { - const item = JSON.parse(line.toString()) - switch (entity) { - case Entity.Users: - await handleUser(item) - break + switch (entity) { + case Entity.Users: + await PromisePool.withConcurrency(concurrency) + .for(jsonItems) + .process((item) => handleUser(item)) + break - case Entity.Rooms: - await handleRoom(item) - break + case Entity.Rooms: + await PromisePool.withConcurrency(concurrency) + .for(jsonItems) + .process((item) => handleRoom(item)) + break - case Entity.Messages: - await handleMessage(item) - break + case Entity.Messages: + let i = 0 + for await (const item of jsonItems) { + if (messagesPerRoom.has(item.rid)) { + messagesPerRoom.get(item.rid)?.push(item) + } else { + messagesPerRoom.set(item.rid, [item]) + } + if (i % messageBatchSize === 0) { + await PromisePool.withConcurrency(concurrency) + .for(messagesPerRoom.values()) + .process(async (room) => { + for (const item of room) { + await handleMessage(item) + } + }) + messagesPerRoom = new Map() + } + i++ + } + // handle messages again for the last (incomplete) batch + await PromisePool.withConcurrency(concurrency) + .for(messagesPerRoom.values()) + .process(async (room) => { + for (const item of room) { + await handleMessage(item) + } + }) + break - default: - throw new Error(`Unhandled Entity: ${entity}`) - } + default: + throw new Error(`Unhandled Entity: ${entity}`) } }