Commit 93dacb0

Merge pull request #3829 from asgerf/asgerf/jsonl-parser2
Faster streaming 'jsonl' parser
2 parents (a5fb820 + 53a6f00), commit 93dacb0

File tree: 2 files changed (+109, -48 lines)
Lines changed: 36 additions & 48 deletions
@@ -1,6 +1,7 @@
-import { statSync } from "fs";
+import { stat } from "fs/promises";
 import { createReadStream } from "fs-extra";
-import { createInterface } from "readline";
+
+const doubleLineBreakRegexp = /\n\r?\n/;
 
 /**
  * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
@@ -14,53 +15,40 @@ export async function readJsonlFile<T>(
   handler: (value: T) => Promise<void>,
   logger?: { log: (message: string) => void },
 ): Promise<void> {
-  function parseJsonFromCurrentLines() {
-    try {
-      return JSON.parse(currentLineSequence.join("\n")) as T;
-    } catch (e) {
-      void logger?.log(
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        `Error: Failed to parse at line ${lineCount} of ${path} as JSON: ${(e as any)?.message ?? "UNKNOWN REASON"}. Problematic line below:\n${JSON.stringify(currentLineSequence, null, 2)}`,
-      );
-      throw e;
-    }
-  }
-
-  function logProgress() {
-    void logger?.log(
-      `Processed ${lineCount} lines with ${parseCounts} parses...`,
-    );
-  }
-
   void logger?.log(
-    `Parsing ${path} (${statSync(path).size / 1024 / 1024} MB)...`,
+    `Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
   );
-  const fileStream = createReadStream(path, "utf8");
-  const rl = createInterface({
-    input: fileStream,
-    crlfDelay: Infinity,
+  return new Promise((resolve, reject) => {
+    const stream = createReadStream(path, { encoding: "utf8" });
+    let buffer = "";
+    stream.on("data", async (chunk: string) => {
+      const parts = (buffer + chunk).split(doubleLineBreakRegexp);
+      buffer = parts.pop()!;
+      if (parts.length > 0) {
+        try {
+          stream.pause();
+          for (const part of parts) {
+            await handler(JSON.parse(part));
+          }
+          stream.resume();
+        } catch (e) {
+          stream.destroy();
+          reject(e);
+        }
+      }
+    });
+    stream.on("end", async () => {
+      if (buffer.trim().length > 0) {
+        try {
+          await handler(JSON.parse(buffer));
+        } catch (e) {
+          reject(e);
+          return;
+        }
+      }
+      void logger?.log(`Finishing parsing ${path}`);
+      resolve();
+    });
+    stream.on("error", reject);
   });
-
-  let lineCount = 0;
-  let parseCounts = 0;
-  let currentLineSequence: string[] = [];
-  for await (const line of rl) {
-    if (line === "") {
-      // as mentioned above: a double newline sequence indicates the end of the current JSON object, so we parse it and pass it to the handler
-      await handler(parseJsonFromCurrentLines());
-      parseCounts++;
-      currentLineSequence = [];
-    } else {
-      currentLineSequence.push(line);
-    }
-    lineCount++;
-    if (lineCount % 1000000 === 0) {
-      logProgress();
-    }
-  }
-  // in case the file is not newline-terminated, we need to handle the last JSON object
-  if (currentLineSequence.length > 0) {
-    await handler(parseJsonFromCurrentLines());
-  }
-  logProgress();
 }
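
Note: the exported readJsonlFile keeps its signature (a path, an async per-object handler, and an optional logger), so existing callers are unaffected by the switch from readline to raw stream chunks. A minimal caller sketch follows; it is not part of this commit, and the relative import path and the SummaryEvent shape are assumptions for illustration only.

// Hypothetical caller, for illustration only: the import path and the
// SummaryEvent shape are assumptions, not part of this commit.
import { readJsonlFile } from "./jsonl-reader";

interface SummaryEvent {
  predicateName?: string;
}

async function countSummaryEvents(summaryPath: string): Promise<number> {
  let count = 0;
  // Each blank-line-separated JSON object in the file is parsed and passed to
  // the handler in order; the returned promise settles when the stream ends.
  await readJsonlFile<SummaryEvent>(summaryPath, async () => {
    count++;
  });
  return count;
}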
Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+import { readFile } from "fs-extra";
+import { readJsonlFile } from "../../src/common/jsonl-reader";
+import { performance } from "perf_hooks";
+import { join } from "path";
+
+/** An "obviously correct" implementation to test against. */
+async function readJsonlReferenceImpl<T>(
+  path: string,
+  handler: (value: T) => Promise<void>,
+): Promise<void> {
+  const logSummary = await readFile(path, "utf-8");
+
+  // Remove newline delimiters because summary is in .jsonl format.
+  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
+
+  for (const obj of jsonSummaryObjects) {
+    const jsonObj = JSON.parse(obj) as T;
+    await handler(jsonObj);
+  }
+}
+
+type ParserFn = (
+  text: string,
+  callback: (v: unknown) => Promise<void>,
+) => Promise<void>;
+
+const parsers: Record<string, ParserFn> = {
+  readJsonlReferenceImpl,
+  readJsonlFile,
+};
+
+async function main() {
+  const args = process.argv.slice(2);
+  const file =
+    args.length > 0
+      ? args[0]
+      : join(
+          __dirname,
+          "../unit-tests/data/evaluator-log-summaries/bad-join-order.jsonl",
+        );
+  const numTrials = args.length > 1 ? Number(args[1]) : 100;
+  const referenceValues: any[] = [];
+  await readJsonlReferenceImpl(file, async (event) => {
+    referenceValues.push(event);
+  });
+  const referenceValueString = JSON.stringify(referenceValues);
+  // Do warm-up runs and check against reference implementation
+  for (const [name, parser] of Object.entries(parsers)) {
+    const values: unknown[] = [];
+    await parser(file, async (event) => {
+      values.push(event);
+    });
+    if (JSON.stringify(values) !== referenceValueString) {
+      console.error(`${name}: failed to match reference implementation`);
+    }
+  }
+  for (const [name, parser] of Object.entries(parsers)) {
+    const startTime = performance.now();
+    for (let i = 0; i < numTrials; ++i) {
+      await Promise.all([
+        parser(file, async () => {}),
+        parser(file, async () => {}),
+      ]);
+    }
+    const duration = performance.now() - startTime;
+    const durationPerTrial = duration / numTrials;
+    console.log(`${name}: ${durationPerTrial.toFixed(1)} ms`);
+  }
+}
+
+main().catch((err: unknown) => {
+  console.error(err);
+});
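
Note: the benchmark above parses the same file with both implementations, first checking the new parser's output against the reference implementation and then reporting the average milliseconds per trial (each trial runs two parses concurrently). Any input works as long as it is a sequence of JSON objects separated by blank lines; the sketch below, which is illustrative and not part of the commit, writes such a file.

// Illustrative only: writes a small double-newline-delimited .jsonl file of the
// kind both parsers accept. The path and object contents are arbitrary examples.
import { writeFile } from "fs/promises";

async function writeSampleJsonl(path: string): Promise<void> {
  const events = [{ n: 1 }, { n: 2 }, { n: 3 }];
  // Objects may be pretty-printed across multiple lines; a blank line ends each one.
  const text = events.map((e) => JSON.stringify(e, null, 2)).join("\n\n");
  await writeFile(path, text, "utf8");
}

void writeSampleJsonl("sample.jsonl");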
