
Commit b840c38

Merge pull request #3831 from asgerf/asgerf/streaming-jsonl
Add streaming 'jsonl' parser
2 parents 46c284d + 57e2b51, commit b840c38

File tree: 2 files changed, +130 −13 lines

File 1: src/common/jsonl-reader.ts — 43 additions & 13 deletions
@@ -1,26 +1,56 @@
-import { readFile } from "fs-extra";
+import { stat } from "fs/promises";
+import { createReadStream } from "fs-extra";
+import type { BaseLogger } from "./logging";
+
+const doubleLineBreakRegexp = /\n\r?\n/;
 
 /**
  * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
  * by a double newline sequence. This is basically a more human-readable form of JSONL.
  *
- * The current implementation reads the entire text of the document into memory, but in the future
- * it will stream the document to improve the performance with large documents.
- *
  * @param path The path to the file.
  * @param handler Callback to be invoked for each top-level JSON object in order.
  */
 export async function readJsonlFile<T>(
   path: string,
   handler: (value: T) => Promise<void>,
+  logger?: BaseLogger,
 ): Promise<void> {
-  const logSummary = await readFile(path, "utf-8");
-
-  // Remove newline delimiters because summary is in .jsonl format.
-  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
-
-  for (const obj of jsonSummaryObjects) {
-    const jsonObj = JSON.parse(obj) as T;
-    await handler(jsonObj);
-  }
+  // Stream the data as large evaluator logs won't fit in memory.
+  // Also avoid using 'readline' as it is slower than our manual line splitting.
+  void logger?.log(
+    `Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
+  );
+  return new Promise((resolve, reject) => {
+    const stream = createReadStream(path, { encoding: "utf8" });
+    let buffer = "";
+    stream.on("data", async (chunk: string) => {
+      const parts = (buffer + chunk).split(doubleLineBreakRegexp);
+      buffer = parts.pop()!;
+      if (parts.length > 0) {
+        try {
+          stream.pause();
+          for (const part of parts) {
+            await handler(JSON.parse(part));
+          }
+          stream.resume();
+        } catch (e) {
+          stream.destroy();
+          reject(e);
+        }
+      }
+    });
+    stream.on("end", async () => {
+      try {
+        if (buffer.trim().length > 0) {
+          await handler(JSON.parse(buffer));
+        }
+        void logger?.log(`Finished parsing ${path}`);
+        resolve();
+      } catch (e) {
+        reject(e);
+      }
+    });
+    stream.on("error", reject);
+  });
 }
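For context, here is a minimal usage sketch of the new streaming API (not part of the commit; the file name, event shape, and import path are hypothetical):

// Minimal usage sketch (illustrative only): stream a large evaluator log
// without loading it all into memory. SummaryEvent is a hypothetical shape.
import { readJsonlFile } from "./jsonl-reader";

interface SummaryEvent {
  completionType?: string; // hypothetical field
}

async function printEvents(path: string): Promise<void> {
  await readJsonlFile<SummaryEvent>(path, async (event) => {
    // Invoked once per double-newline-separated JSON object, in order.
    console.log(event.completionType);
  });
}

Note that the parser pauses the stream while handlers run and resumes it afterwards, so a slow handler applies backpressure rather than letting chunks pile up in memory.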
File 2: json-reader.bench.ts (new file) — 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+/**
+ * Benchmarks the jsonl-parser against a reference implementation and checks that it generates
+ * the same output.
+ *
+ * Usage:
+ *
+ *     ts-node json-reader.bench.ts [evaluator-log.summary.jsonl] [count]
+ *
+ * The log file defaults to a small checked-in log and count defaults to 100
+ * (and should be lowered significantly for large files).
+ *
+ * At the time of writing it is about as fast as the synchronous reference implementation,
+ * but doesn't run out of memory for large files.
+ */
+import { readFile } from "fs-extra";
+import { readJsonlFile } from "../../src/common/jsonl-reader";
+import { performance } from "perf_hooks";
+import { join } from "path";
+
+/** An "obviously correct" implementation to test against. */
+async function readJsonlReferenceImpl<T>(
+  path: string,
+  handler: (value: T) => Promise<void>,
+): Promise<void> {
+  const logSummary = await readFile(path, "utf-8");
+
+  // Remove newline delimiters because summary is in .jsonl format.
+  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
+
+  for (const obj of jsonSummaryObjects) {
+    const jsonObj = JSON.parse(obj) as T;
+    await handler(jsonObj);
+  }
+}
+
+type ParserFn = (
+  text: string,
+  callback: (v: unknown) => Promise<void>,
+) => Promise<void>;
+
+const parsers: Record<string, ParserFn> = {
+  readJsonlReferenceImpl,
+  readJsonlFile,
+};
+
+async function main() {
+  const args = process.argv.slice(2);
+  const file =
+    args.length > 0
+      ? args[0]
+      : join(
+          __dirname,
+          "../unit-tests/data/evaluator-log-summaries/bad-join-order.jsonl",
+        );
+  const numTrials = args.length > 1 ? Number(args[1]) : 100;
+  const referenceValues: any[] = [];
+  await readJsonlReferenceImpl(file, async (event) => {
+    referenceValues.push(event);
+  });
+  const referenceValueString = JSON.stringify(referenceValues);
+  // Do warm-up runs and check against reference implementation
+  for (const [name, parser] of Object.entries(parsers)) {
+    const values: unknown[] = [];
+    await parser(file, async (event) => {
+      values.push(event);
+    });
+    if (JSON.stringify(values) !== referenceValueString) {
+      console.error(`${name}: failed to match reference implementation`);
+    }
+  }
+  for (const [name, parser] of Object.entries(parsers)) {
+    const startTime = performance.now();
+    for (let i = 0; i < numTrials; ++i) {
+      await Promise.all([
+        parser(file, async () => {}),
+        parser(file, async () => {}),
+      ]);
+    }
+    const duration = performance.now() - startTime;
+    const durationPerTrial = duration / numTrials;
+    console.log(`${name}: ${durationPerTrial.toFixed(1)} ms`);
+  }
+}
+
+main().catch((err: unknown) => {
+  console.error(err);
+});
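To see the chunk-boundary handling in action, here is a small self-contained sketch (again not part of the commit) that writes a double-newline-separated file and reads it back through the streaming parser; the temp-file path is illustrative, and the import path assumes the sketch sits next to the benchmark:

// Round-trip sketch (illustrative only): write records separated by blank
// lines, then stream them back with readJsonlFile.
import { tmpdir } from "os";
import { join } from "path";
import { writeFile } from "fs/promises";
import { readJsonlFile } from "../../src/common/jsonl-reader";

async function roundTrip(): Promise<void> {
  const path = join(tmpdir(), "demo.jsonl");
  // Records are separated by blank lines, the format readJsonlFile expects.
  await writeFile(path, '{"a":1}\n\n{"b":2}\n\n{"c":3}\n');
  const seen: unknown[] = [];
  await readJsonlFile(path, async (value) => {
    seen.push(value);
  });
  console.log(seen); // [ { a: 1 }, { b: 2 }, { c: 3 } ]
}

roundTrip().catch(console.error);

The last record has no trailing double newline, which is why the parser's end handler parses whatever remains in its buffer.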
