@@ -429,205 +429,13 @@ def orc_decoder(
429429 return * full_shape , 0 , table
430430
431431
def _infer_jsonl_column_types(sample_records: list, columns: set) -> dict:
    """Map each column name to a coarse type tag based on the first non-null sampled value."""
    column_types = {}
    for key in columns:
        tag = 'null'
        for record in sample_records:
            value = record.get(key)
            if value is None:
                continue
            # bool must be tested before int: bool is a subclass of int in Python
            if isinstance(value, bool):
                tag = 'bool'
            elif isinstance(value, int):
                tag = 'int'
            elif isinstance(value, float):
                tag = 'float'
            elif isinstance(value, str):
                tag = 'str'
            elif isinstance(value, list):
                tag = 'list'
            elif isinstance(value, dict):
                tag = 'dict'
            break  # first non-null value decides the column type
        column_types[key] = tag
    return column_types


def _build_jsonl_patterns(columns: set, column_types: dict) -> dict:
    """Compile one value-capturing regex per column, shaped by the inferred type tag."""
    import re

    patterns = {}
    for key in columns:
        # Escape the key so regex metacharacters in field names match literally
        prefix = rb'"' + re.escape(key).encode() + rb'":\s*'
        col_type = column_types.get(key, 'null')

        if col_type == 'bool':
            pattern = prefix + rb'(true|false)'
        elif col_type in ('int', 'float'):
            # Numbers: optional sign, decimals, scientific notation - or null
            pattern = prefix + rb'(-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?|null)'
        elif col_type == 'str':
            # Quoted string honouring escaped characters (group 1 = content), or null
            pattern = prefix + rb'(?:"((?:[^"\\]|\\.)*)"|null)'
        elif col_type == 'null':
            pattern = prefix + rb'null'
        elif col_type == 'list':
            # Simple and single-level nested arrays, or null; deeper nesting
            # is not matched and falls out as None
            pattern = prefix + rb'(\[(?:[^\[\]]|\[.*?\])*?\]|null)'
        elif col_type == 'dict':
            # Single-level objects only, or null; nested objects fall out as None
            pattern = prefix + rb'(\{[^{}]*\}|null)'
        else:
            continue

        patterns[key] = (re.compile(pattern), col_type)
    return patterns


def _jsonl_value_from_match(match, col_type: str):
    """Convert one regex match into a Python value; returns None when conversion fails."""
    import json

    if col_type == 'bool':
        return match.group(1) == b'true'

    if col_type in ('int', 'float'):
        token = match.group(1)
        if token == b'null':
            return None
        try:
            # Promote to float when the token carries a fraction or exponent so
            # mixed int/float columns don't silently lose values to int() failures
            if col_type == 'int' and not any(c in token for c in (b'.', b'e', b'E')):
                return int(token)
            return float(token)
        except ValueError:
            return None

    if col_type == 'str':
        token = match.group(1)
        if token is None:  # the `null` alternative matched
            return None
        try:
            # Delegate unescaping to the JSON parser so every escape sequence
            # (\n, \t, \", \\, \/, \b, \f, \r, \uXXXX) is decoded correctly
            return json.loads(b'"' + token + b'"')
        except (ValueError, UnicodeDecodeError):
            return token.decode('utf-8', errors='replace')

    if col_type in ('list', 'dict'):
        token = match.group(1)
        if token == b'null':
            return None
        try:
            value = json.loads(token.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None
        if col_type == 'dict' and isinstance(value, dict):
            # Dicts are carried as JSON strings (mirrors record[key].mini)
            return json.dumps(value, ensure_ascii=False)
        return value

    # 'null' columns (and anything unrecognised) always yield None
    return None


def fast_jsonl_decoder(
    buffer: bytes,
    *,
    projection: Optional[list] = None,
    selection: Optional[list] = None,
    sample_size: int = 100,
) -> Tuple[int, int, pyarrow.Table]:
    """
    Fast JSONL decoder that fully parses a sample of lines to infer a schema,
    then extracts column values from every line using compiled regex patterns,
    avoiding a full JSON parse per record.

    This is optimized for datasets with a consistent schema across all records.

    Parameters:
        buffer: bytes
            Raw JSONL payload (newline-separated JSON documents).
        projection: list, optional
            Columns to extract; each entry exposes the column name via its
            `.value` attribute. When omitted, every key seen in the sample
            is extracted.
        selection: list, optional
            Accepted for interface compatibility; not applied by this decoder.
        sample_size: int
            Number of leading lines fully parsed to infer the schema.

    Returns:
        Tuple of (row_count, column_count, pyarrow.Table).
    """
    from opteryx.third_party.tktech import csimdjson as simdjson

    lines = [line for line in buffer.split(b'\n') if line.strip()]
    if not lines:
        return 0, 0, pyarrow.Table.from_pylist([])

    # Fully parse a leading sample to discover the key set; malformed sample
    # lines are skipped rather than failing the whole decode.
    parser = simdjson.Parser()
    sample_records = []
    keys_union = set()
    for raw in lines[:sample_size]:
        try:
            row = parser.parse(raw).as_dict()
        except Exception:
            continue
        sample_records.append(row)
        keys_union.update(row.keys())

    if not sample_records:
        return 0, 0, pyarrow.Table.from_pylist([])

    # Honour the projection when one is supplied; otherwise take all sampled keys.
    columns = {c.value for c in projection} if projection else keys_union

    column_types = _infer_jsonl_column_types(sample_records, columns)
    column_patterns = _build_jsonl_patterns(columns, column_types)

    # Extract values from every line with the compiled per-column patterns.
    column_data = {key: [] for key in columns}
    for line in lines:
        for key in columns:
            compiled = column_patterns.get(key)
            if compiled is None:
                column_data[key].append(None)
                continue
            pattern, col_type = compiled
            match = pattern.search(line)
            column_data[key].append(
                _jsonl_value_from_match(match, col_type) if match else None
            )

    if not columns:
        return 0, 0, pyarrow.Table.from_pylist([])

    names = sorted(columns)
    arrays = [pyarrow.array(column_data[name]) for name in names]
    table = pyarrow.Table.from_arrays(arrays, names=names)
    return len(lines), len(columns), table
622-
623432def jsonl_decoder (
624433 buffer : Union [memoryview , bytes , BinaryIO ],
625434 * ,
626435 projection : Optional [list ] = None ,
627436 selection : Optional [list ] = None ,
628437 just_schema : bool = False ,
629438 just_statistics : bool = False ,
630- use_fast_decoder : bool = True ,
631439 ** kwargs ,
632440) -> Tuple [int , int , pyarrow .Table ]:
633441 if just_statistics :
@@ -648,26 +456,6 @@ def jsonl_decoder(
648456 table = pyarrow .Table .from_arrays ([[num_rows ]], names = ["$COUNT(*)" ])
649457 return (num_rows , 0 , 0 , table )
650458
651- # Use fast decoder if enabled and no complex filtering is needed
652- # Fast decoder is most effective for large files with consistent schema
653- if use_fast_decoder and not just_schema and not selection :
654- try :
655- num_rows , num_cols , table = fast_jsonl_decoder (
656- buffer , projection = projection , selection = selection
657- )
658-
659- if projection :
660- table = post_read_projector (table , projection )
661-
662- return num_rows , num_cols , 0 , table
663- except Exception as e :
664- # Fall back to traditional decoder if fast decoder fails
665- # This ensures robustness even with unexpected data
666- import warnings
667- warnings .warn (f"Fast JSONL decoder failed, falling back to standard decoder: { e } " )
668-
669- parser = simdjson .Parser ()
670-
671459 parser = simdjson .Parser ()
672460
673461 # preallocate and reuse dicts
0 commit comments