
Commit 068b921

Copilot and joocer committed
Implement fast JSONL decoder with regex-based extraction
Co-authored-by: joocer <[email protected]>
1 parent df4418d commit 068b921
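
The core technique in miniature — a self-contained sketch, not the committed code (the keys, patterns, and sample lines are invented for illustration; the real implementation below infers keys and types from a parsed sample first):

import re

# Compile one pattern per known key, then scan the raw bytes of each line
# instead of fully parsing the JSON.
lines = [b'{"id": 1, "name": "alice"}', b'{"id": 2, "name": "bob"}']

id_pat = re.compile(rb'"id":\s*(-?\d+\.?\d*(?:[eE][+-]?\d+)?|null)')
name_pat = re.compile(rb'"name":\s*(?:"((?:[^"\\]|\\.)*)"|null)')

ids, names = [], []
for line in lines:
    m = id_pat.search(line)
    ids.append(int(m.group(1)) if m and m.group(1) != b'null' else None)
    m = name_pat.search(line)
    names.append(m.group(1).decode() if m and m.group(1) is not None else None)

print(ids)    # [1, 2]
print(names)  # ['alice', 'bob']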

2 files changed: +410 −0 lines changed

opteryx/utils/file_decoders.py

Lines changed: 204 additions & 0 deletions
@@ -429,13 +429,201 @@ def orc_decoder(
     return *full_shape, 0, table
 
 
+def fast_jsonl_decoder(
+    buffer: bytes,
+    *,
+    projection: Optional[list] = None,
+    selection: Optional[list] = None,
+    sample_size: int = 100,
+) -> Tuple[int, int, pyarrow.Table]:
+    """
+    Fast JSONL decoder that parses a sample to infer schema, then extracts values
+    using regex patterns without full JSON parsing for subsequent lines.
+
+    This is optimized for datasets with consistent schema across all records.
+    """
+    import re
+    from opteryx.third_party.tktech import csimdjson as simdjson
+
+    # Split buffer into non-empty lines
+    lines = buffer.split(b'\n')
+    lines = [line for line in lines if line.strip()]
+
+    if not lines:
+        return 0, 0, pyarrow.Table.from_pylist([])
+
+    # Parse sample lines to infer schema
+    parser = simdjson.Parser()
+    sample_records = []
+    sample_parsed_records = []
+    keys_union = set()
+
+    num_sample = min(sample_size, len(lines))
+
+    for i in range(num_sample):
+        try:
+            record = parser.parse(lines[i])
+            row = record.as_dict()
+            sample_records.append(row)
+            sample_parsed_records.append(record)
+            keys_union.update(row.keys())
+        except Exception:
+            continue
+
+    if not sample_records:
+        return 0, 0, pyarrow.Table.from_pylist([])
+
+    # If a projection is specified, only extract the projected columns
+    if projection:
+        columns_to_extract = {c.value for c in projection}
+    else:
+        columns_to_extract = keys_union
+
+    # Infer types from the sample
+    column_types = {}
+    for key in columns_to_extract:
+        for record in sample_records:
+            if key in record and record[key] is not None:
+                val = record[key]
+                if isinstance(val, bool):
+                    column_types[key] = 'bool'
+                elif isinstance(val, int):
+                    column_types[key] = 'int'
+                elif isinstance(val, float):
+                    column_types[key] = 'float'
+                elif isinstance(val, str):
+                    column_types[key] = 'str'
+                elif isinstance(val, list):
+                    column_types[key] = 'list'
+                elif isinstance(val, dict):
+                    column_types[key] = 'dict'
+                break
+        if key not in column_types:
+            column_types[key] = 'null'
+
+    # Build regex patterns for each column
+    # Pattern to match: "key": value
+    column_patterns = {}
+    for key in columns_to_extract:
+        # Escape special regex characters in the key name
+        escaped_key = re.escape(key)
+
+        # Create a pattern based on the expected type
+        col_type = column_types.get(key, 'null')
+
+        if col_type == 'bool':
+            # Match true/false
+            pattern = rb'"' + escaped_key.encode() + rb'":\s*(true|false)'
+        elif col_type in ('int', 'float'):
+            # Match numbers (including negatives, decimals, scientific notation) or null
+            pattern = rb'"' + escaped_key.encode() + rb'":\s*(-?\d+\.?\d*(?:[eE][+-]?\d+)?|null)'
+        elif col_type == 'str':
+            # Match quoted strings (handling escaped quotes) or null
+            pattern = rb'"' + escaped_key.encode() + rb'":\s*(?:"((?:[^"\\]|\\.)*)"|null)'
+        elif col_type == 'null':
+            # Match null
+            pattern = rb'"' + escaped_key.encode() + rb'":\s*null'
+        elif col_type == 'list':
+            # Match arrays (including empty arrays) or null
+            pattern = rb'"' + escaped_key.encode() + rb'":\s*(\[(?:[^\[\]]|\[.*?\])*?\]|null)'
+        elif col_type == 'dict':
+            # Match objects or null
+            # This is a simplified pattern that only works for non-nested dicts
+            pattern = rb'"' + escaped_key.encode() + rb'":\s*(\{[^{}]*\}|null)'
+        else:
+            pattern = None
+
+        if pattern:
+            column_patterns[key] = (re.compile(pattern), col_type)
+
+    # Extract values from all lines using the compiled regexes
+    column_data = {key: [] for key in columns_to_extract}
+
+    for line in lines:
+        if not line.strip():
+            continue
+
+        for key in columns_to_extract:
+            if key not in column_patterns:
+                column_data[key].append(None)
+                continue
+
+            pattern, col_type = column_patterns[key]
+            match = pattern.search(line)
+
+            if match:
+                if col_type == 'bool':
+                    value = match.group(1) == b'true'
+                elif col_type in ('int', 'float'):
+                    try:
+                        matched_val = match.group(1)
+                        if matched_val == b'null':
+                            value = None
+                        elif col_type == 'int':
+                            value = int(matched_val)
+                        else:
+                            value = float(matched_val)
+                    except (ValueError, IndexError):
+                        value = None
+                elif col_type == 'str':
+                    try:
+                        # Group 1 captures the string content (without quotes)
+                        matched_val = match.group(1)
+                        if matched_val is None:  # null was matched
+                            value = None
+                        else:
+                            # Decode and handle escaped characters
+                            raw_str = matched_val.decode('utf-8', errors='replace')
+                            # Simple unescape for common cases
+                            value = raw_str.replace('\\n', '\n').replace('\\t', '\t').replace('\\"', '"').replace('\\\\', '\\')
+                    except (UnicodeDecodeError, IndexError):
+                        value = None
+                elif col_type == 'null':
+                    value = None
+                elif col_type in ('list', 'dict'):
+                    # For complex types, fall back to JSON parsing
+                    try:
+                        matched_val = match.group(1)
+                        if matched_val == b'null':
+                            value = None
+                        else:
+                            import json
+                            value = json.loads(matched_val.decode('utf-8'))
+                            if col_type == 'dict' and isinstance(value, dict):
+                                # Convert the dict to a JSON string (similar to record[key].mini)
+                                value = json.dumps(value, ensure_ascii=False)
+                    except (json.JSONDecodeError, UnicodeDecodeError, IndexError):
+                        value = None
+                else:
+                    value = None
+            else:
+                value = None
+
+            column_data[key].append(value)
+
+    # Convert to a PyArrow table
+    arrays = []
+    names = []
+
+    for key in sorted(columns_to_extract):
+        arrays.append(pyarrow.array(column_data[key]))
+        names.append(key)
+
+    if not arrays:
+        return 0, 0, pyarrow.Table.from_pylist([])
+
+    table = pyarrow.Table.from_arrays(arrays, names=names)
+    return len(lines), len(columns_to_extract), table
+
+
 def jsonl_decoder(
     buffer: Union[memoryview, bytes, BinaryIO],
     *,
     projection: Optional[list] = None,
     selection: Optional[list] = None,
     just_schema: bool = False,
     just_statistics: bool = False,
+    use_fast_decoder: bool = True,
     **kwargs,
 ) -> Tuple[int, int, pyarrow.Table]:
     if just_statistics:
@@ -456,6 +644,22 @@ def jsonl_decoder(
         table = pyarrow.Table.from_arrays([[num_rows]], names=["$COUNT(*)"])
         return (num_rows, 0, 0, table)
 
+    # Use the fast decoder if enabled and no complex filtering is needed
+    # The fast decoder is most effective for large files with a consistent schema
+    if use_fast_decoder and not just_schema and not selection:
+        try:
+            num_rows, num_cols, table = fast_jsonl_decoder(
+                buffer, projection=projection, selection=selection
+            )
+
+            if projection:
+                table = post_read_projector(table, projection)
+
+            return num_rows, num_cols, 0, table
+        except Exception:
+            # Fall back to the traditional decoder if the fast decoder fails
+            pass
+
     parser = simdjson.Parser()
 
     # preallocate and reuse dicts
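Usage, as a hedged sketch: the call shape follows fast_jsonl_decoder as added above, but the sample data is invented. Note that the returned columns come back in sorted name order, and that jsonl_decoder only takes this path when use_fast_decoder is set, just_schema is false, and no selection is pushed down; any exception falls back to the simdjson path.

from opteryx.utils.file_decoders import fast_jsonl_decoder

buffer = b'{"id": 1, "name": "alice", "active": true}\n{"id": 2, "name": "bob", "active": false}\n'

# Schema is inferred from the sampled lines: id -> int, name -> str, active -> bool
num_rows, num_cols, table = fast_jsonl_decoder(buffer)
print(num_rows, num_cols)              # 2 3
print(table.column_names)              # ['active', 'id', 'name'] (sorted)
print(table.column('id').to_pylist())  # [1, 2]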

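One caveat worth keeping in mind with this approach: extraction is substring-based, so a matching "key": sequence inside a nested object is found first and mis-extracted, and because no exception is raised, the simdjson fallback never triggers. A small standalone illustration of the hazard (invented data):

import re

id_pat = re.compile(rb'"id":\s*(-?\d+\.?\d*(?:[eE][+-]?\d+)?|null)')

# The nested object's "id" appears first in the byte stream, so the pattern
# captures 99 rather than this record's top-level id of 7.
line = b'{"meta": {"id": 99}, "id": 7}'
print(id_pat.search(line).group(1))  # b'99'

This is consistent with the docstring's stated assumption of a consistent, flat schema across records; for data like the line above, the traditional decoder remains the safe choice.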