diff --git a/extensions/ql-vscode/src/common/jsonl-reader.ts b/extensions/ql-vscode/src/common/jsonl-reader.ts index 9151af7c563..b1d4932f8ce 100644 --- a/extensions/ql-vscode/src/common/jsonl-reader.ts +++ b/extensions/ql-vscode/src/common/jsonl-reader.ts @@ -1,6 +1,7 @@ -import { statSync } from "fs"; +import { stat } from "fs/promises"; import { createReadStream } from "fs-extra"; -import { createInterface } from "readline"; + +const doubleLineBreakRegexp = /\n\r?\n/; /** * Read a file consisting of multiple JSON objects. Each object is separated from the previous one @@ -14,53 +15,40 @@ export async function readJsonlFile( handler: (value: T) => Promise, logger?: { log: (message: string) => void }, ): Promise { - function parseJsonFromCurrentLines() { - try { - return JSON.parse(currentLineSequence.join("\n")) as T; - } catch (e) { - void logger?.log( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - `Error: Failed to parse at line ${lineCount} of ${path} as JSON: ${(e as any)?.message ?? "UNKNOWN REASON"}. Problematic line below:\n${JSON.stringify(currentLineSequence, null, 2)}`, - ); - throw e; - } - } - - function logProgress() { - void logger?.log( - `Processed ${lineCount} lines with ${parseCounts} parses...`, - ); - } - void logger?.log( - `Parsing ${path} (${statSync(path).size / 1024 / 1024} MB)...`, + `Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`, ); - const fileStream = createReadStream(path, "utf8"); - const rl = createInterface({ - input: fileStream, - crlfDelay: Infinity, + return new Promise((resolve, reject) => { + const stream = createReadStream(path, { encoding: "utf8" }); + let buffer = ""; + stream.on("data", async (chunk: string) => { + const parts = (buffer + chunk).split(doubleLineBreakRegexp); + buffer = parts.pop()!; + if (parts.length > 0) { + try { + stream.pause(); + for (const part of parts) { + await handler(JSON.parse(part)); + } + stream.resume(); + } catch (e) { + stream.destroy(); + reject(e); + } + } + }); + stream.on("end", async () => { + if (buffer.trim().length > 0) { + try { + await handler(JSON.parse(buffer)); + } catch (e) { + reject(e); + return; + } + } + void logger?.log(`Finishing parsing ${path}`); + resolve(); + }); + stream.on("error", reject); }); - - let lineCount = 0; - let parseCounts = 0; - let currentLineSequence: string[] = []; - for await (const line of rl) { - if (line === "") { - // as mentioned above: a double newline sequence indicates the end of the current JSON object, so we parse it and pass it to the handler - await handler(parseJsonFromCurrentLines()); - parseCounts++; - currentLineSequence = []; - } else { - currentLineSequence.push(line); - } - lineCount++; - if (lineCount % 1000000 === 0) { - logProgress(); - } - } - // in case the file is not newline-terminated, we need to handle the last JSON object - if (currentLineSequence.length > 0) { - await handler(parseJsonFromCurrentLines()); - } - logProgress(); } diff --git a/extensions/ql-vscode/test/benchmarks/jsonl-reader.bench.ts b/extensions/ql-vscode/test/benchmarks/jsonl-reader.bench.ts new file mode 100644 index 00000000000..0d61e9d7197 --- /dev/null +++ b/extensions/ql-vscode/test/benchmarks/jsonl-reader.bench.ts @@ -0,0 +1,73 @@ +import { readFile } from "fs-extra"; +import { readJsonlFile } from "../../src/common/jsonl-reader"; +import { performance } from "perf_hooks"; +import { join } from "path"; + +/** An "obviously correct" implementation to test against. */ +async function readJsonlReferenceImpl( + path: string, + handler: (value: T) => Promise, +): Promise { + const logSummary = await readFile(path, "utf-8"); + + // Remove newline delimiters because summary is in .jsonl format. + const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g); + + for (const obj of jsonSummaryObjects) { + const jsonObj = JSON.parse(obj) as T; + await handler(jsonObj); + } +} + +type ParserFn = ( + text: string, + callback: (v: unknown) => Promise, +) => Promise; + +const parsers: Record = { + readJsonlReferenceImpl, + readJsonlFile, +}; + +async function main() { + const args = process.argv.slice(2); + const file = + args.length > 0 + ? args[0] + : join( + __dirname, + "../unit-tests/data/evaluator-log-summaries/bad-join-order.jsonl", + ); + const numTrials = args.length > 1 ? Number(args[1]) : 100; + const referenceValues: any[] = []; + await readJsonlReferenceImpl(file, async (event) => { + referenceValues.push(event); + }); + const referenceValueString = JSON.stringify(referenceValues); + // Do warm-up runs and check against reference implementation + for (const [name, parser] of Object.entries(parsers)) { + const values: unknown[] = []; + await parser(file, async (event) => { + values.push(event); + }); + if (JSON.stringify(values) !== referenceValueString) { + console.error(`${name}: failed to match reference implementation`); + } + } + for (const [name, parser] of Object.entries(parsers)) { + const startTime = performance.now(); + for (let i = 0; i < numTrials; ++i) { + await Promise.all([ + parser(file, async () => {}), + parser(file, async () => {}), + ]); + } + const duration = performance.now() - startTime; + const durationPerTrial = duration / numTrials; + console.log(`${name}: ${durationPerTrial.toFixed(1)} ms`); + } +} + +main().catch((err: unknown) => { + console.error(err); +});