Skip to content

Commit 32e08e0

Browse files
committed
New jsonl parser implementation and benchmark
1 parent 70bd215 commit 32e08e0

File tree

2 files changed

+121
-2
lines changed

2 files changed

+121
-2
lines changed

extensions/ql-vscode/src/common/jsonl-reader.ts

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { statSync } from "fs";
1+
import { stat } from "fs/promises";
22
import { createReadStream } from "fs-extra";
33
import { createInterface } from "readline";
44

@@ -33,7 +33,7 @@ export async function readJsonlFile<T>(
3333
}
3434

3535
void logger?.log(
36-
`Parsing ${path} (${statSync(path).size / 1024 / 1024} MB)...`,
36+
`Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
3737
);
3838
const fileStream = createReadStream(path, "utf8");
3939
const rl = createInterface({
@@ -64,3 +64,48 @@ export async function readJsonlFile<T>(
6464
}
6565
logProgress();
6666
}
67+
68+
const doubleLineBreakRegexp = /\n\r?\n/;
69+
70+
export async function readJsonlFile2<T>(
71+
path: string,
72+
handler: (value: T) => Promise<void>,
73+
logger?: { log: (message: string) => void },
74+
): Promise<void> {
75+
void logger?.log(
76+
`Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
77+
);
78+
return new Promise((resolve, reject) => {
79+
const stream = createReadStream(path, { encoding: "utf8" });
80+
let buffer = "";
81+
stream.on("data", async (chunk: string) => {
82+
const parts = (buffer + chunk).split(doubleLineBreakRegexp);
83+
buffer = parts.pop()!;
84+
if (parts.length > 0) {
85+
try {
86+
stream.pause();
87+
for (const part of parts) {
88+
await handler(JSON.parse(part));
89+
}
90+
stream.resume();
91+
} catch (e) {
92+
stream.destroy();
93+
reject(e);
94+
}
95+
}
96+
});
97+
stream.on("end", async () => {
98+
if (buffer.trim().length > 0) {
99+
try {
100+
await handler(JSON.parse(buffer));
101+
} catch (e) {
102+
reject(e);
103+
return;
104+
}
105+
}
106+
void logger?.log(`Finishing parsing ${path}`);
107+
resolve();
108+
});
109+
stream.on("error", reject);
110+
});
111+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import { readFile } from "fs-extra";
2+
import { readJsonlFile, readJsonlFile2 } from "../../src/common/jsonl-reader";
3+
import { performance } from "perf_hooks";
4+
import { join } from "path";
5+
6+
/** An "obviously correct" implementation to test against. */
7+
async function readJsonlReferenceImpl<T>(
8+
path: string,
9+
handler: (value: T) => Promise<void>,
10+
): Promise<void> {
11+
const logSummary = await readFile(path, "utf-8");
12+
13+
// Remove newline delimiters because summary is in .jsonl format.
14+
const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
15+
16+
for (const obj of jsonSummaryObjects) {
17+
const jsonObj = JSON.parse(obj) as T;
18+
await handler(jsonObj);
19+
}
20+
}
21+
22+
type ParserFn = (
23+
text: string,
24+
callback: (v: unknown) => Promise<void>,
25+
) => Promise<void>;
26+
27+
const parsers: Record<string, ParserFn> = {
28+
readJsonlReferenceImpl,
29+
readJsonlFile,
30+
readJsonlFile2,
31+
};
32+
33+
async function main() {
34+
const args = process.argv.slice(2);
35+
const file =
36+
args.length > 0
37+
? args[0]
38+
: join(
39+
__dirname,
40+
"../unit-tests/data/evaluator-log-summaries/bad-join-order.jsonl",
41+
);
42+
const numTrials = args.length > 1 ? Number(args[1]) : 100;
43+
const referenceValues: any[] = [];
44+
await readJsonlReferenceImpl(file, async (event) => {
45+
referenceValues.push(event);
46+
});
47+
const referenceValueString = JSON.stringify(referenceValues);
48+
// Do warm-up runs and check against reference implementation
49+
for (const [name, parser] of Object.entries(parsers)) {
50+
const values: unknown[] = [];
51+
await parser(file, async (event) => {
52+
values.push(event);
53+
});
54+
if (JSON.stringify(values) !== referenceValueString) {
55+
console.error(`${name}: failed to match reference implementation`);
56+
}
57+
}
58+
for (const [name, parser] of Object.entries(parsers)) {
59+
const startTime = performance.now();
60+
for (let i = 0; i < numTrials; ++i) {
61+
await Promise.all([
62+
parser(file, async () => {}),
63+
parser(file, async () => {}),
64+
]);
65+
}
66+
const duration = performance.now() - startTime;
67+
const durationPerTrial = duration / numTrials;
68+
console.log(`${name}: ${durationPerTrial.toFixed(1)} ms`);
69+
}
70+
}
71+
72+
main().catch((err: unknown) => {
73+
console.error(err);
74+
});

0 commit comments

Comments
 (0)