Skip to content

Commit cf29df5

Browse files
authored
Merge pull request #3817 from github/esbena/fetch-logs-through-dca
PoC: compare-performance of external logs
2 parents 27db76e + 87e4d1e commit cf29df5

File tree

3 files changed

+825
-307
lines changed

3 files changed

+825
-307
lines changed
Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,66 @@
1-
import { readFile } from "fs-extra";
1+
import { statSync } from "fs";
2+
import { createReadStream } from "fs-extra";
3+
import { createInterface } from "readline";
24

35
/**
 * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
 * by a double newline sequence. This is basically a more human-readable form of JSONL.
 *
 * The file is streamed line by line: only the lines of the object currently being assembled are
 * buffered, and each object is parsed and handed to `handler` as soon as its terminating blank
 * line (or the end of the file) is reached.
 *
 * Blank lines act purely as separators, so leading blank lines or runs of more than one blank
 * line between objects are tolerated and never produce an (invalid) empty object.
 *
 * @param path The path to the file.
 * @param handler Callback to be invoked for each top-level JSON object in order. It is awaited
 * before any further lines are consumed.
 * @param logger Optional sink that receives the file size up front, a progress message every
 * 1,000,000 lines and at the end, and a diagnostic message when an object fails to parse.
 * @throws The error raised by `JSON.parse` when an object is not valid JSON (after it has been
 * reported to `logger`), or whatever `handler` throws.
 */
export async function readJsonlFile<T>(
  path: string,
  handler: (value: T) => Promise<void>,
  logger?: { log: (message: string) => void },
): Promise<void> {
  let lineCount = 0;
  let parseCounts = 0;
  let currentLineSequence: string[] = [];

  // Parses the buffered lines as a single JSON document, reporting failures before rethrowing.
  function parseJsonFromCurrentLines(): T {
    try {
      return JSON.parse(currentLineSequence.join("\n")) as T;
    } catch (e) {
      const reason = e instanceof Error ? e.message : "UNKNOWN REASON";
      logger?.log(
        `Error: Failed to parse at line ${lineCount} of ${path} as JSON: ${reason}. Problematic line below:\n${JSON.stringify(currentLineSequence, null, 2)}`,
      );
      throw e;
    }
  }

  // Hands the buffered object (if any) to the handler and resets the buffer.
  async function flushCurrentLines(): Promise<void> {
    if (currentLineSequence.length === 0) {
      // Nothing buffered: this was an extra blank line, not the end of an object.
      return;
    }
    await handler(parseJsonFromCurrentLines());
    parseCounts++;
    currentLineSequence = [];
  }

  function logProgress(): void {
    logger?.log(`Processed ${lineCount} lines with ${parseCounts} parses...`);
  }

  // The optional call short-circuits, so the file is only stat'ed when a logger is supplied.
  logger?.log(`Parsing ${path} (${statSync(path).size / 1024 / 1024} MB)...`);

  const fileStream = createReadStream(path, "utf8");
  const rl = createInterface({
    input: fileStream,
    // Always treat "\r\n" as a single line break, regardless of timing between the two bytes.
    crlfDelay: Infinity,
  });

  try {
    for await (const line of rl) {
      if (line === "") {
        // A double newline sequence indicates the end of the current JSON object.
        await flushCurrentLines();
      } else {
        currentLineSequence.push(line);
      }
      lineCount++;
      if (lineCount % 1000000 === 0) {
        logProgress();
      }
    }
    // In case the file does not end with a blank line, handle the last JSON object.
    await flushCurrentLines();
  } finally {
    // Release the file handle even when parsing or the handler throws part-way through.
    rl.close();
    fileStream.destroy();
  }
  logProgress();
}

0 commit comments

Comments
 (0)