
Commit 7837ae1

FindHao authored and facebook-github-bot committed
Refactor log data processing in dataLoader.ts for improved memory efficiency and error handling (#73)
Summary:
- Introduced a new function, `parseLogDataFromStream`, to handle line-by-line parsing of NDJSON from a stream, enhancing memory efficiency for large files.
- Updated `processArrayBuffer` to utilize the new streaming function for gzip files and improved error handling.
- Adjusted `loadLogDataFromFile` to use streaming for files exceeding 100 MB, ensuring better performance with large datasets.

These changes enhance the robustness and efficiency of log data processing in the application.

Pull Request resolved: #73
Reviewed By: davidberard98
Differential Revision: D80458467
Pulled By: FindHao
fbshipit-source-id: 009dd73c62d99af972930cda4afce0cceee46738
1 parent a8de824 · commit 7837ae1

File tree

1 file changed: +74 -30 lines changed


website/src/utils/dataLoader.ts

Lines changed: 74 additions & 30 deletions
@@ -293,48 +293,83 @@ function isGzipFile(buffer: ArrayBuffer): boolean {
 }
 
 /**
- * Decompresses a gzip file using CompressionStream API
- * @param buffer - ArrayBuffer containing the gzip data
- * @returns Promise resolving to decompressed text
+ * Parses log data from a stream, handling line-by-line NDJSON parsing.
+ * This is memory-efficient and suitable for very large files.
+ * @param stream - A ReadableStream of Uint8Array (e.g., from a decompressed file)
+ * @returns A promise that resolves to an array of LogEntry objects
  */
-async function decompressGzip(buffer: ArrayBuffer): Promise<string> {
-  try {
-    // Check if CompressionStream is supported
-    if (!('DecompressionStream' in window)) {
-      throw new Error('DecompressionStream API is not supported in this browser');
+async function parseLogDataFromStream(stream: ReadableStream<Uint8Array>): Promise<LogEntry[]> {
+  const reader = stream.pipeThrough(new TextDecoderStream()).getReader();
+  let buffer = '';
+  const entries: LogEntry[] = [];
+
+  while (true) {
+    const { done, value } = await reader.read();
+    if (done) {
+      if (buffer.trim()) {
+        try {
+          const parsedLine: LogEntry = JSON.parse(buffer);
+          if (parsedLine && typeof parsedLine === 'object') {
+            entries.push(parsedLine);
+          }
+        } catch (e) {
+          console.warn(`Failed to parse final line as JSON: ${buffer.substring(0, 100)}...`);
+        }
+      }
+      break;
     }
 
-    // Create a decompression stream
-    const ds = new DecompressionStream('gzip');
-    const decompressedStream = new Blob([buffer]).stream().pipeThrough(ds);
-    const decompressedBlob = await new Response(decompressedStream).blob();
-    return await decompressedBlob.text();
-  } catch (error) {
-    console.error('Error decompressing gzip file:', error);
-    throw new Error(`Failed to decompress gzip file: ${error instanceof Error ? error.message : String(error)}`);
+    buffer += value;
+    const lines = buffer.split('\n');
+    buffer = lines.pop() || '';
+
+    for (const line of lines) {
+      if (line.trim() === '') continue;
+      try {
+        const parsedLine: LogEntry = JSON.parse(line);
+        if (parsedLine && typeof parsedLine === 'object') {
+          entries.push(parsedLine);
+        }
+      } catch (e) {
+        console.warn(`Failed to parse line as JSON: ${line.substring(0, 100)}...`);
+      }
+    }
+  }
+
+  if (entries.length === 0) {
+    console.error("No valid JSON entries found in stream data");
+    throw new Error("No valid JSON entries found in stream data");
   }
+
+  return entries;
 }
 
+
 /**
- * Processes ArrayBuffer data, handling gzip decompression if needed
+ * Processes ArrayBuffer data, handling gzip decompression and parsing if needed
  * @param buffer - ArrayBuffer containing the data
- * @returns Promise resolving to text string
+ * @returns Promise resolving to an array of LogEntry objects
  */
-async function processArrayBuffer(buffer: ArrayBuffer): Promise<string> {
+async function processArrayBuffer(buffer: ArrayBuffer): Promise<LogEntry[]> {
   // Check if file is gzip compressed
   if (isGzipFile(buffer)) {
     try {
-      const textData = await decompressGzip(buffer);
-      return textData;
+      if (!('DecompressionStream' in window)) {
+        throw new Error('DecompressionStream API is not supported in this browser');
+      }
+      const ds = new DecompressionStream('gzip');
+      const decompressedStream = new Blob([buffer]).stream().pipeThrough(ds);
+      return await parseLogDataFromStream(decompressedStream);
     } catch (error) {
-      console.error("Error decompressing gzip data:", error);
-      throw new Error(`Failed to decompress gzip data: ${error instanceof Error ? error.message : String(error)}`);
+      console.error('Error decompressing or parsing gzip stream:', error);
+      const message = error instanceof Error ? error.message : String(error);
+      throw new Error(`Failed to process gzip stream: ${message}`);
     }
   } else {
-    // Convert ArrayBuffer to string if it's not compressed
+    // For non-gzipped files that are small enough to fit in memory
     const decoder = new TextDecoder();
     const textData = decoder.decode(buffer);
-    return textData;
+    return parseLogData(textData);
   }
 }
 
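The new `processArrayBuffer` keeps relying on `isGzipFile`, whose body lies outside this hunk (only its closing brace is visible above). Gzip data always starts with the magic bytes 0x1f 0x8b, so a minimal sketch of such a check could look like the following; this is an illustrative assumption, not the file's actual implementation:

```typescript
// Assumed sketch of a gzip magic-number check; the real isGzipFile in
// dataLoader.ts may differ in details.
function isGzipFile(buffer: ArrayBuffer): boolean {
  if (buffer.byteLength < 2) return false;
  const bytes = new Uint8Array(buffer, 0, 2);
  // Every gzip stream begins with the two bytes 0x1f 0x8b.
  return bytes[0] === 0x1f && bytes[1] === 0x8b;
}
```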
@@ -351,9 +386,7 @@ export async function loadLogData(url: string): Promise<LogEntry[]> {
     }
 
     const buffer = await response.arrayBuffer();
-    const textData = await processArrayBuffer(buffer);
-
-    return parseLogData(textData);
+    return await processArrayBuffer(buffer);
   } catch (error) {
     console.error("Error loading log data:", error);
     throw error;
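Callers of `loadLogData` are unaffected by this change; the function still resolves to `LogEntry[]`, it just delegates parsing to `processArrayBuffer`. A hypothetical usage sketch, where the import path and URL are illustrative and not taken from the repository:

```typescript
import { loadLogData } from './utils/dataLoader'; // assumed path

async function showEntryCount(): Promise<void> {
  // processArrayBuffer sniffs the gzip header and, for compressed data,
  // streams the decompressed bytes through parseLogDataFromStream.
  const entries = await loadLogData('/logs/trace.ndjson.gz');
  console.log(`Loaded ${entries.length} log entries`);
}
```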
@@ -367,6 +400,18 @@ export async function loadLogData(url: string): Promise<LogEntry[]> {
  * @returns Promise resolving to an array of LogEntry objects
  */
 export function loadLogDataFromFile(file: File): Promise<LogEntry[]> {
+  // For large files, we should use streaming to avoid memory issues
+  const LARGE_FILE_THRESHOLD = 100 * 1024 * 1024; // 100 MB
+  if (file.size > LARGE_FILE_THRESHOLD) {
+    console.log(`File size (${file.size} bytes) exceeds threshold, using streaming.`);
+    // Note: This does not handle gzipped files selected locally, as we can't
+    // easily detect gzip from a stream without reading parts of it first.
+    // The assumption is that very large local files are not gzipped or
+    // have already been decompressed.
+    return parseLogDataFromStream(file.stream() as ReadableStream<Uint8Array>);
+  }
+
+  // For smaller files, reading into memory is faster and simpler.
   return new Promise((resolve, reject) => {
     const reader = new FileReader();
 
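A hypothetical caller of `loadLogDataFromFile`, wired to a file picker; the element id and import path are illustrative assumptions:

```typescript
import { loadLogDataFromFile } from './utils/dataLoader'; // assumed path

const input = document.querySelector<HTMLInputElement>('#log-file-input'); // assumed element
input?.addEventListener('change', async () => {
  const file = input?.files?.[0];
  if (!file) return;
  try {
    // Files above the 100 MB threshold take the streaming path;
    // smaller files are read whole via FileReader, as before.
    const entries = await loadLogDataFromFile(file);
    console.log(`Parsed ${entries.length} entries from ${file.name}`);
  } catch (err) {
    console.error('Failed to load log file:', err);
  }
});
```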
@@ -381,8 +426,7 @@ export function loadLogDataFromFile(file: File): Promise<LogEntry[]> {
           throw new Error("Expected ArrayBuffer from FileReader");
         }
 
-        const textData = await processArrayBuffer(result);
-        resolve(parseLogData(textData));
+        resolve(await processArrayBuffer(result));
       } catch (error) {
         console.error("Error parsing data from file:", error);
         reject(error);
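`parseLogData`, the in-memory parser that `processArrayBuffer` still uses for small non-gzipped buffers, is not shown in this diff. A plausible sketch consistent with the per-line behavior of `parseLogDataFromStream`; an assumption for illustration, not the file's actual code:

```typescript
// Assumed sketch of the in-memory NDJSON parser; LogEntry is the type
// defined in dataLoader.ts. The real parseLogData may differ.
function parseLogData(textData: string): LogEntry[] {
  const entries: LogEntry[] = [];
  for (const line of textData.split('\n')) {
    if (line.trim() === '') continue;
    try {
      const parsed: LogEntry = JSON.parse(line);
      if (parsed && typeof parsed === 'object') {
        entries.push(parsed);
      }
    } catch {
      console.warn(`Failed to parse line as JSON: ${line.substring(0, 100)}...`);
    }
  }
  if (entries.length === 0) {
    throw new Error('No valid JSON entries found');
  }
  return entries;
}
```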
