1- import { PassThrough } from 'stream' ;
1+ import { PassThrough , Transform } from 'stream' ;
22import type { Readable } from 'stream' ;
33import Papa from 'papaparse' ;
44import StreamJSON from 'stream-json' ;
@@ -11,34 +11,50 @@ import { Utf8Validator } from '../utils/utf8-validator';
1111const debug = createDebug ( 'import-guess-filetype' ) ;
1212
1313function detectJSON ( input : Readable ) : Promise < 'json' | 'jsonl' | null > {
14- let jsonVariant : 'json' | 'jsonl' | null = null ;
15-
1614 return new Promise ( function ( resolve ) {
1715 const parser = StreamJSON . parser ( ) ;
1816
17+ let found = false ;
18+
1919 parser . once ( 'data' , ( data ) => {
2020 debug ( 'detectJSON:data' , data ) ;
21+
22+ let jsonVariant : 'json' | 'jsonl' | null = null ;
2123 if ( data . name === 'startObject' ) {
2224 jsonVariant = 'jsonl' ;
2325 } else if ( data . name === 'startArray' ) {
2426 jsonVariant = 'json' ;
2527 }
26- parser . destroy ( ) ;
28+
29+ found = true ;
30+ resolve ( jsonVariant ) ;
2731 } ) ;
2832
2933 parser . on ( 'end' , ( ) => {
30- debug ( 'detectJSON:end' , jsonVariant ) ;
31- resolve ( jsonVariant ) ;
34+ debug ( 'detectJSON:end' ) ;
35+ if ( ! found ) {
36+ found = true ;
37+ // reached the end before a full doc
38+ resolve ( null ) ;
39+ }
3240 } ) ;
3341
3442 parser . on ( 'close' , ( err : Error ) => {
35- debug ( 'detectJSON:close' , err , jsonVariant ) ;
36- resolve ( jsonVariant ) ;
43+ debug ( 'detectJSON:close' , err ) ;
44+ if ( ! found ) {
45+ found = true ;
46+ // stream closed before a full doc
47+ resolve ( null ) ;
48+ }
3749 } ) ;
3850
3951 parser . on ( 'error' , ( err : Error ) => {
4052 debug ( 'detectJSON:error' , err ) ;
41- resolve ( null ) ;
53+ if ( ! found ) {
54+ found = true ;
55+ // got an error before a full doc
56+ resolve ( null ) ;
57+ }
4258 } ) ;
4359
4460 input . pipe ( parser ) ;
@@ -73,40 +89,71 @@ function redetectDelimiter({ data }: { data: string[] }): string {
7389 return ',' ;
7490}
7591
76- function detectCSV ( input : Readable ) : Promise < Delimiter | null > {
92+ function detectCSV (
93+ input : Readable ,
94+ jsonPromise : Promise < 'json' | 'jsonl' | null >
95+ ) : Promise < Delimiter | null > {
7796 let csvDelimiter : Delimiter | null = null ;
7897 let lines = 0 ;
79-
80- return new Promise ( function ( resolve ) {
81- Papa . parse ( input , {
82- // NOTE: parsing without header: true otherwise the delimiter detection
83- // can't fail and will always detect ,
84- delimitersToGuess : supportedDelimiters ,
85- step : function ( results : Papa . ParseStepResult < string [ ] > , parser ) {
86- ++ lines ;
87- debug ( 'detectCSV:step' , lines , results ) ;
88- if ( lines === 1 ) {
89- if ( hasDelimiterError ( results ) ) {
90- csvDelimiter = redetectDelimiter ( results ) ;
91- } else {
92- csvDelimiter = results . meta . delimiter ;
93- }
98+ let found = false ;
99+
100+ // stop processing CSV as soon as we detect JSON
101+ const jsonDetected = new Promise < null > ( function ( resolve ) {
102+ jsonPromise
103+ . then ( ( jsonType ) => {
104+ if ( jsonType ) {
105+ resolve ( null ) ;
94106 }
95- // must be at least two lines for header row and data
96- if ( lines === 2 ) {
97- parser . abort ( ) ;
98- }
99- } ,
100- complete : function ( ) {
101- debug ( 'detectCSV:complete' ) ;
102- resolve ( lines === 2 ? csvDelimiter : null ) ;
103- } ,
104- error : function ( ) {
105- debug ( 'detectCSV:error' ) ;
106- resolve ( null ) ;
107- } ,
108- } ) ;
107+ } )
108+ . catch ( ( ) => {
109+ // if the file was not valid JSON, then ignore this because either CSV
110+ // detection will eventually succeed or FileSizeEnforcer will error
111+ } ) ;
109112 } ) ;
113+
114+ return Promise . race ( [
115+ jsonDetected ,
116+ new Promise < Delimiter | null > ( function ( resolve ) {
117+ Papa . parse ( input , {
118+ // NOTE: parsing without header: true otherwise the delimiter detection
119+ // can't fail and will always detect ,
120+ delimitersToGuess : supportedDelimiters ,
121+ step : function ( results : Papa . ParseStepResult < string [ ] > ) {
122+ ++ lines ;
123+ debug ( 'detectCSV:step' , lines , results ) ;
124+ if ( lines === 1 ) {
125+ if ( hasDelimiterError ( results ) ) {
126+ csvDelimiter = redetectDelimiter ( results ) ;
127+ } else {
128+ csvDelimiter = results . meta . delimiter ;
129+ }
130+ }
131+ // must be at least two lines for header row and data
132+ if ( lines === 2 ) {
133+ found = true ;
134+ debug ( 'detectCSV:complete' ) ;
135+ resolve ( lines === 2 ? csvDelimiter : null ) ;
136+ }
137+ } ,
138+ complete : function ( ) {
139+ debug ( 'detectCSV:complete' ) ;
140+ if ( ! found ) {
141+ found = true ;
142+ // we reached the end before two lines
143+ resolve ( null ) ;
144+ }
145+ } ,
146+ error : function ( err ) {
147+ debug ( 'detectCSV:error' , err ) ;
148+ if ( ! found ) {
149+ found = true ;
150+ // something failed before we got to the end of two lines
151+ resolve ( null ) ;
152+ }
153+ } ,
154+ } ) ;
155+ } ) ,
156+ ] ) ;
110157}
111158
112159type GuessFileTypeOptions = {
@@ -122,6 +169,25 @@ type GuessFileTypeResult =
122169 csvDelimiter : Delimiter ;
123170 } ;
124171
172+ const MAX_LENGTH = 1000000 ;
173+
174+ class FileSizeEnforcer extends Transform {
175+ length = 0 ;
176+
177+ _transform (
178+ chunk : Buffer ,
179+ enc : unknown ,
180+ cb : ( err : null | Error , chunk ?: Buffer ) => void
181+ ) {
182+ this . length += chunk . length ;
183+ if ( this . length > MAX_LENGTH ) {
184+ cb ( new Error ( `CSV still not detected after ${ MAX_LENGTH } bytes` ) ) ;
185+ } else {
186+ cb ( null , chunk ) ;
187+ }
188+ }
189+ }
190+
125191export function guessFileType ( {
126192 input,
127193} : GuessFileTypeOptions ) : Promise < GuessFileTypeResult > {
@@ -134,13 +200,21 @@ export function guessFileType({
134200 } ) ;
135201
136202 const jsStream = input . pipe ( new PassThrough ( ) ) ;
137- const csvStream = input . pipe ( new PassThrough ( ) ) ;
203+ const csvStream = input
204+ . pipe ( new PassThrough ( ) )
205+ . pipe ( new FileSizeEnforcer ( ) ) ;
206+
207+ const jsonPromise = detectJSON ( jsStream ) ;
138208
139209 const [ jsonVariant , csvDelimiter ] = await Promise . all ( [
140- detectJSON ( jsStream ) ,
141- detectCSV ( csvStream ) ,
210+ jsonPromise ,
211+ detectCSV ( csvStream , jsonPromise ) ,
142212 ] ) ;
143213
214+ // keep streaming until both promises resolved, then destroy the input
215+ // stream to stop further processing
216+ input . destroy ( ) ;
217+
144218 debug ( 'guessFileType' , jsonVariant , csvDelimiter ) ;
145219
146220 // check JSON first because practically anything will parse as CSV
0 commit comments