
Commit 275cbb1

jsonl decoder

1 parent: e20629c

File tree

10 files changed: +254 −194 lines

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
 # DO NOT EDIT THIS FILE DIRECTLY

-__build__ = 1654
+__build__ = 1656
 __author__ = "@joocer"
-__version__ = "0.26.0-beta.1654"
+__version__ = "0.26.0-beta.1656"

 # Store the version here so:
 # 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/structures/jsonl_decoder.pyx

Lines changed: 185 additions & 124 deletions
Large diffs are not rendered by default.
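
The Cython module itself is collapsed above, but its calling contract is visible in the opteryx/utils/file_decoders.py hunk further down: fast_jsonl_decode_columnar(buffer, columns_to_extract, column_types, sample_size) returns (num_rows, num_cols, column_data), where column_data maps each column name to a list that pyarrow.array can consume. Below is a rough pure-Python stand-in for that contract, for orientation only; it is not the Cython implementation, and fast_jsonl_decode_columnar_sketch is a made-up name.

import json

def fast_jsonl_decode_columnar_sketch(buffer: bytes, columns, column_types, sample_size):
    # Decode each JSONL line and gather values column by column.
    # The real Cython decoder works directly on the byte buffer and uses
    # column_types (inferred from the first sample_size lines) to build typed
    # output; this sketch just collects plain Python objects and ignores sample_size.
    column_data = {name: [] for name in columns}
    num_rows = 0
    for line in buffer.split(b"\n"):
        if not line.strip():
            continue
        record = json.loads(line)
        for name in columns:
            column_data[name].append(record.get(name))  # missing keys become None
        num_rows += 1
    return num_rows, len(columns), column_data

# Example:
#   buffer = b'{"id": 1, "name": "a"}\n{"id": 2}\n'
#   fast_jsonl_decode_columnar_sketch(buffer, ["id", "name"], {"id": "int", "name": "str"}, 100)
#   -> (2, 2, {"id": [1, 2], "name": ["a", None]})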

opteryx/connectors/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -270,7 +270,7 @@ def connector_factory(dataset, statistics, **config):
             break
     else:
         # Check if dataset is a file or contains wildcards
-        has_wildcards = any(char in dataset for char in ['*', '?', '['])
+        has_wildcards = any(char in dataset for char in ["*", "?", "["])
         if os.path.isfile(dataset) or has_wildcards:
             from opteryx.connectors import file_connector

@@ -286,7 +286,7 @@ def connector_factory(dataset, statistics, **config):
     remove_prefix = connector_entry.pop("remove_prefix", False)
     if prefix and remove_prefix and dataset.startswith(prefix):
         # Remove the prefix. If there's a separator (. or //) after the prefix, skip it too
-        dataset = dataset[len(prefix):]
+        dataset = dataset[len(prefix) :]
         if dataset.startswith(".") or dataset.startswith("//"):
             dataset = dataset[1:] if dataset.startswith(".") else dataset[2:]
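
The prefix-removal change above is only a slice-spacing tweak, but for orientation this is what the branch does, using a hypothetical registered prefix "sqlite" with remove_prefix set:

prefix = "sqlite"            # hypothetical connector prefix with remove_prefix=True
dataset = "sqlite.planets"

dataset = dataset[len(prefix) :]       # ".planets"
if dataset.startswith(".") or dataset.startswith("//"):
    dataset = dataset[1:] if dataset.startswith(".") else dataset[2:]
# dataset is now "planets"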

opteryx/connectors/aws_s3_connector.py

Lines changed: 8 additions & 8 deletions
@@ -86,13 +86,13 @@ def __init__(self, credentials=None, **kwargs):
         )

         self.minio = Minio(end_point, access_key, secret_key, secure=secure)
-
+
         # Only convert dots to path separators if the dataset doesn't already contain slashes
         # Dataset references like "my.dataset.table" use dots as separators
         # File paths like "bucket/path/file.parquet" already have slashes and should not be converted
         if OS_SEP not in self.dataset and "/" not in self.dataset:
             self.dataset = self.dataset.replace(".", OS_SEP)
-
+
         # Check if dataset contains wildcards
         self.has_wildcards = paths.has_wildcards(self.dataset)
         if self.has_wildcards:

@@ -111,28 +111,28 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
         else:
             list_prefix = prefix
             filter_pattern = None
-
+
         bucket, object_path, _, _ = paths.get_parts(list_prefix)
         blobs = self.minio.list_objects(bucket_name=bucket, prefix=object_path, recursive=True)
-
+
         blob_list = []
         for blob in blobs:
             if blob.object_name.endswith("/"):
                 continue
-
+
             full_path = bucket + "/" + blob.object_name
-
+
             # Check if blob has valid extension
             if ("." + full_path.split(".")[-1].lower()) not in VALID_EXTENSIONS:
                 continue
-
+
             # If we have a wildcard pattern, filter by it
             if filter_pattern:
                 if paths.match_wildcard(filter_pattern, full_path):
                     blob_list.append(full_path)
             else:
                 blob_list.append(full_path)
-
+
         return sorted(blob_list)

     def read_dataset(
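
Both the S3 and GCS connectors handle wildcard datasets the same way: paths.split_wildcard_path yields a non-wildcard prefix for the listing call and the full pattern for filtering, and each listed blob is kept only if paths.match_wildcard accepts it. A short sketch of that flow, assuming a POSIX-style separator and an in-memory list standing in for minio.list_objects:

from opteryx.utils import paths

# Hypothetical listing result standing in for the object-store API call.
all_blobs = [
    "bucket/path/file1.parquet",
    "bucket/path/file2.jsonl",
    "bucket/path/nested/file3.parquet",
]

prefix, pattern = paths.split_wildcard_path("bucket/path/*.parquet")
# prefix == "bucket/path/"              (used as the listing prefix)
# pattern == "bucket/path/*.parquet"    (used to filter the listing)

matches = sorted(
    blob
    for blob in all_blobs
    if blob.startswith(prefix) and paths.match_wildcard(pattern, blob)
)
# ["bucket/path/file1.parquet"] -- the .jsonl file fails the pattern and the
# nested file fails because '*' does not cross directory boundaries.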

opteryx/connectors/file_connector.py

Lines changed: 13 additions & 13 deletions
@@ -136,10 +136,10 @@ def __init__(self, *args, **kwargs):
         if ".." in self.dataset or self.dataset[0] in ("\\", "/", "~"):
             # Don't find any datasets which look like path traversal
             raise DatasetNotFoundError(dataset=self.dataset)
-
+
         # Check if dataset contains wildcards
-        self.has_wildcards = any(char in self.dataset for char in ['*', '?', '['])
-
+        self.has_wildcards = any(char in self.dataset for char in ["*", "?", "["])
+
         if self.has_wildcards:
             # Expand wildcards to get list of files
             self.files = self._expand_wildcards(self.dataset)

@@ -150,43 +150,43 @@ def __init__(self, *args, **kwargs):
         else:
             self.files = [self.dataset]
         self.decoder = get_decoder(self.dataset)
-
+
     def _expand_wildcards(self, pattern: str) -> List[str]:
         """
         Expand wildcard patterns in file paths while preventing path traversal.
-
+
         Supports wildcards:
         - * matches any number of characters
-        - ? matches a single character
+        - ? matches a single character
         - [range] matches a range of characters (e.g., [0-9], [a-z])
-
+
         Args:
             pattern: File path pattern with wildcards
-
+
         Returns:
             List of matching file paths
         """
         # Additional path traversal check after expansion
         if ".." in pattern:
             raise DatasetNotFoundError(dataset=pattern)
-
+
         # Use glob to expand the pattern
         matched_files = glob.glob(pattern, recursive=False)
-
+
         # Filter out any results that might have path traversal
         # This is an extra safety check
         safe_files = []
         for file_path in matched_files:
             if ".." not in file_path and os.path.isfile(file_path):
                 safe_files.append(file_path)
-
+
         return sorted(safe_files)

     def read_dataset(
         self, columns: list = None, predicates: list = None, limit: int = None, **kwargs
     ) -> pyarrow.Table:
         rows_read = 0
-
+
         # Iterate over all matched files
         for file_path in self.files:
             morsel = read_blob(

@@ -221,7 +221,7 @@ def get_dataset_schema(self) -> RelationSchema:

         # Use the first file to get the schema
         first_file = self.files[0]
-
+
         try:
             file_descriptor = os.open(first_file, os.O_RDONLY | os.O_BINARY)
             size = os.path.getsize(first_file)

opteryx/connectors/gcp_cloudstorage_connector.py

Lines changed: 3 additions & 3 deletions
@@ -97,7 +97,7 @@ def __init__(self, credentials=None, **kwargs):
         if OS_SEP not in self.dataset and "/" not in self.dataset:
             self.dataset = self.dataset.replace(".", OS_SEP)
         self.credentials = credentials
-
+
         # Check if dataset contains wildcards
         self.has_wildcards = paths.has_wildcards(self.dataset)
         if self.has_wildcards:

@@ -231,9 +231,9 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
             name = blob["name"]
             if not name.endswith(TUPLE_OF_VALID_EXTENSIONS):
                 continue
-
+
             full_path = f"{bucket}/{name}"
-
+
             # If we have a wildcard pattern, filter by it
             if filter_pattern:
                 if paths.match_wildcard(filter_pattern, full_path):

opteryx/utils/file_decoders.py

Lines changed: 17 additions & 16 deletions
@@ -461,13 +461,13 @@ def jsonl_decoder(
     if use_fast_decoder and not just_schema and not selection and len(buffer) > 10000:
         try:
             from opteryx.compiled.structures import jsonl_decoder as cython_decoder
-
+
             # Sample first 100 lines to infer schema
             parser = simdjson.Parser()
             sample_size = min(100, buffer.count(b"\n"))
             sample_records = []
             keys_union = set()
-
+
             start = 0
             for _ in range(sample_size):
                 newline = buffer.find(b"\n", start)

@@ -483,57 +483,58 @@
                     keys_union.update(row.keys())
                 except Exception:
                     continue
-
+
             if sample_records:
                 # Infer column types from sample
                 column_types = {}
                 columns_to_extract = list(keys_union)
-
+
                 if projection:
                     # If projection specified, only extract those columns
                     columns_to_extract = [c.value for c in projection if c.value in keys_union]
-
+
                 for key in columns_to_extract:
                     for record in sample_records:
                         if key in record and record[key] is not None:
                             val = record[key]
                             if isinstance(val, bool):
-                                column_types[key] = 'bool'
+                                column_types[key] = "bool"
                             elif isinstance(val, int):
-                                column_types[key] = 'int'
+                                column_types[key] = "int"
                             elif isinstance(val, float):
-                                column_types[key] = 'float'
+                                column_types[key] = "float"
                             elif isinstance(val, str):
-                                column_types[key] = 'str'
+                                column_types[key] = "str"
                             elif isinstance(val, list):
-                                column_types[key] = 'list'
+                                column_types[key] = "list"
                             elif isinstance(val, dict):
-                                column_types[key] = 'dict'
+                                column_types[key] = "dict"
                             break
                     if key not in column_types:
-                        column_types[key] = 'str'  # Default to string
-
+                        column_types[key] = "str"  # Default to string
+
                 # Use Cython decoder
                 num_rows, num_cols, column_data = cython_decoder.fast_jsonl_decode_columnar(
                     buffer, columns_to_extract, column_types, sample_size
                 )
-
+
                 # Convert to PyArrow table
                 arrays = []
                 names = []
                 for key in sorted(columns_to_extract):
                     arrays.append(pyarrow.array(column_data[key]))
                     names.append(key)
-
+
                 if arrays:
                     table = pyarrow.Table.from_arrays(arrays, names=names)
                     if projection:
                         table = post_read_projector(table, projection)
                     return num_rows, num_cols, 0, table
-
+
         except (ImportError, Exception) as e:
             # Fall back to standard decoder if Cython version fails
             import warnings
+
             warnings.warn(f"Fast JSONL decoder failed, using standard decoder: {e}")

     parser = simdjson.Parser()
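
The inference loop above takes the first non-null value seen for each key in the sampled rows and maps its Python type to a type name, falling back to "str" when every sampled value is null or the key is missing. A tiny standalone restatement of that rule (infer_column_types is an illustrative helper, not part of the diff):

def infer_column_types(sample_records, columns):
    # First non-null value wins; anything unrecognised or always-null defaults to "str".
    column_types = {}
    for key in columns:
        for record in sample_records:
            val = record.get(key)
            if val is None:
                continue
            if isinstance(val, bool):
                column_types[key] = "bool"
            elif isinstance(val, int):
                column_types[key] = "int"
            elif isinstance(val, float):
                column_types[key] = "float"
            elif isinstance(val, str):
                column_types[key] = "str"
            elif isinstance(val, list):
                column_types[key] = "list"
            elif isinstance(val, dict):
                column_types[key] = "dict"
            break
        if key not in column_types:
            column_types[key] = "str"
    return column_types

# infer_column_types([{"id": 1, "name": "a"}, {"id": 2, "score": 1.5}], ["id", "name", "score"])
# -> {"id": "int", "name": "str", "score": "float"}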

opteryx/utils/paths.py

Lines changed: 23 additions & 23 deletions
@@ -45,83 +45,83 @@ def get_parts(path_string: str):
 def has_wildcards(path: str) -> bool:
     """
     Check if a path contains wildcard characters.
-
+
     Args:
         path: Path string to check
-
+
     Returns:
         True if path contains wildcards (*, ?, [])
     """
-    return any(char in path for char in ['*', '?', '['])
+    return any(char in path for char in ["*", "?", "["])


 def split_wildcard_path(path: str):
     """
     Split a path with wildcards into a non-wildcard prefix and wildcard pattern.
-
+
     For cloud storage, we need to list blobs with a prefix, then filter by pattern.
     This function finds the longest non-wildcard prefix for listing.
-
+
     Args:
         path: Path with potential wildcards (e.g., "bucket/path/subdir/*.parquet")
-
+
     Returns:
         tuple: (prefix, pattern) where:
             - prefix: Non-wildcard prefix for listing (e.g., "bucket/path/subdir/")
             - pattern: Full path with wildcards for matching (e.g., "bucket/path/subdir/*.parquet")
-
+
     Examples:
         >>> split_wildcard_path("bucket/path/*.parquet")
         ('bucket/path/', 'bucket/path/*.parquet')
-
+
         >>> split_wildcard_path("bucket/path/file[0-9].parquet")
         ('bucket/path/', 'bucket/path/file[0-9].parquet')
-
+
        >>> split_wildcard_path("bucket/*/data.parquet")
        ('bucket/', 'bucket/*/data.parquet')
     """
     if not has_wildcards(path):
         return path, path
-
+
     # Find the first wildcard character
     wildcard_pos = len(path)
-    for char in ['*', '?', '[']:
+    for char in ["*", "?", "["]:
         pos = path.find(char)
         if pos != -1 and pos < wildcard_pos:
             wildcard_pos = pos
-
+
     # Find the last path separator before the wildcard
     prefix = path[:wildcard_pos]
     last_sep = prefix.rfind(OS_SEP)
-
+
     if last_sep != -1:
         # Include the separator in the prefix
-        prefix = path[:last_sep + 1]
+        prefix = path[: last_sep + 1]
     else:
         # No separator before wildcard, prefix is empty or bucket name
         prefix = ""
-
+
     return prefix, path


 def match_wildcard(pattern: str, path: str) -> bool:
     """
     Match a path against a wildcard pattern using glob-like semantics.
-
+
     Unlike fnmatch, this function treats path separators specially:
     - '*' matches any characters EXCEPT path separators
     - '?' matches any single character EXCEPT path separators
     - Use '**' to match across directory boundaries (not yet supported)
-
+
     This ensures consistent behavior with glob.glob() used for local files.
-
+
     Args:
         pattern: Pattern with wildcards (e.g., "bucket/path/*.parquet")
         path: Path to match (e.g., "bucket/path/file1.parquet")
-
+
     Returns:
         True if path matches pattern
-
+
     Examples:
         >>> match_wildcard("bucket/path/*.parquet", "bucket/path/file.parquet")
         True

@@ -131,14 +131,14 @@ def match_wildcard(pattern: str, path: str) -> bool:
     # Split pattern and path into parts using OS path separator for cross-platform compatibility
     pattern_parts = pattern.split(OS_SEP)
     path_parts = path.split(OS_SEP)
-
+
     # Must have same number of path parts for a match (wildcards don't cross directory boundaries)
     if len(pattern_parts) != len(path_parts):
         return False
-
+
     # Match each part using fnmatch
     for pattern_part, path_part in zip(pattern_parts, path_parts):
         if not fnmatch.fnmatch(path_part, pattern_part):
             return False
-
+
     return True
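
Because match_wildcard compares the pattern and path segment by segment with fnmatch, a '*' never spans a path separator, which keeps cloud listings consistent with glob.glob on local files. A short usage sketch (assuming a POSIX-style OS_SEP of '/'):

from opteryx.utils.paths import has_wildcards, match_wildcard, split_wildcard_path

has_wildcards("bucket/path/file.parquet")                                    # False
has_wildcards("bucket/path/*.parquet")                                       # True

split_wildcard_path("bucket/path/*.parquet")                                 # ('bucket/path/', 'bucket/path/*.parquet')

match_wildcard("bucket/path/*.parquet", "bucket/path/file.parquet")          # True
match_wildcard("bucket/path/*.parquet", "bucket/path/nested/file.parquet")   # False: '*' stays within one segment
match_wildcard("bucket/*/data.parquet", "bucket/2024/data.parquet")          # True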

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "opteryx"
-version = "0.26.0-beta.1654"
+version = "0.26.0-beta.1656"
 description = "Query your data, where it lives"
 requires-python = '>=3.11'
 readme = {file = "README.md", content-type = "text/markdown"}
