11import { stat } from "fs/promises" ;
22import { createReadStream } from "fs-extra" ;
3- import { createInterface } from "readline" ;
3+
4+ const doubleLineBreakRegexp = / \n \r ? \n / ;
45
56/**
67 * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
@@ -13,64 +14,6 @@ export async function readJsonlFile<T>(
1314 path : string ,
1415 handler : ( value : T ) => Promise < void > ,
1516 logger ?: { log : ( message : string ) => void } ,
16- ) : Promise < void > {
17- function parseJsonFromCurrentLines ( ) {
18- try {
19- return JSON . parse ( currentLineSequence . join ( "\n" ) ) as T ;
20- } catch ( e ) {
21- void logger ?. log (
22- // eslint-disable-next-line @typescript-eslint/no-explicit-any
23- `Error: Failed to parse at line ${ lineCount } of ${ path } as JSON: ${ ( e as any ) ?. message ?? "UNKNOWN REASON" } . Problematic line below:\n${ JSON . stringify ( currentLineSequence , null , 2 ) } ` ,
24- ) ;
25- throw e ;
26- }
27- }
28-
29- function logProgress ( ) {
30- void logger ?. log (
31- `Processed ${ lineCount } lines with ${ parseCounts } parses...` ,
32- ) ;
33- }
34-
35- void logger ?. log (
36- `Parsing ${ path } (${ ( await stat ( path ) ) . size / 1024 / 1024 } MB)...` ,
37- ) ;
38- const fileStream = createReadStream ( path , "utf8" ) ;
39- const rl = createInterface ( {
40- input : fileStream ,
41- crlfDelay : Infinity ,
42- } ) ;
43-
44- let lineCount = 0 ;
45- let parseCounts = 0 ;
46- let currentLineSequence : string [ ] = [ ] ;
47- for await ( const line of rl ) {
48- if ( line === "" ) {
49- // as mentioned above: a double newline sequence indicates the end of the current JSON object, so we parse it and pass it to the handler
50- await handler ( parseJsonFromCurrentLines ( ) ) ;
51- parseCounts ++ ;
52- currentLineSequence = [ ] ;
53- } else {
54- currentLineSequence . push ( line ) ;
55- }
56- lineCount ++ ;
57- if ( lineCount % 1000000 === 0 ) {
58- logProgress ( ) ;
59- }
60- }
61- // in case the file is not newline-terminated, we need to handle the last JSON object
62- if ( currentLineSequence . length > 0 ) {
63- await handler ( parseJsonFromCurrentLines ( ) ) ;
64- }
65- logProgress ( ) ;
66- }
67-
68- const doubleLineBreakRegexp = / \n \r ? \n / ;
69-
70- export async function readJsonlFile2 < T > (
71- path : string ,
72- handler : ( value : T ) => Promise < void > ,
73- logger ?: { log : ( message : string ) => void } ,
7417) : Promise < void > {
7518 void logger ?. log (
7619 `Parsing ${ path } (${ ( await stat ( path ) ) . size / 1024 / 1024 } MB)...` ,
0 commit comments