Skip to content

Commit 97c3b0b

Browse files
authored
Merge pull request #58 from mabel-dev/missing-dep
fix compression and limit pushdown
2 parents d0306c2 + 120a2ef commit 97c3b0b

File tree

11 files changed

+449
-302
lines changed

11 files changed

+449
-302
lines changed

dev/build_counter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class VersionStatus(Enum):
2626

2727
__major_version__ = 0
2828
__minor_version__ = 5
29-
__revision_version__ = 3
29+
__revision_version__ = 4
3030
__author__ = "@joocer"
3131
__status__ = VersionStatus.RELEASE
3232

opteryx/__version__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 116
4+
__build__ = 117
55
__author__ = "@joocer"
6-
__version__ = "0.5.3"
6+
__version__ = "0.5.4"
77
__lib__ = "opteryx-core"
8-
__build_date__ = "2025-12-30T17:48:32.909410+00:00Z"
8+
__build_date__ = "2025-12-31T19:34:31.469445+00:00Z"
99

1010
# Store the version here so:
1111
# 1) we don't load dependencies by storing it in __init__.py

opteryx/connectors/filesystem_connector.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,13 @@ def blocking_read():
185185
telemetry.bytes_read += len(data)
186186
return ref
187187

188-
def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
188+
def get_list_of_blob_names(self, *, prefix: str, predicates: list = []) -> List[str]:
189189
"""
190190
List all blobs matching the prefix.
191191
192192
Args:
193193
prefix: Path prefix to search
194+
predicates: Optional predicates for filtering (subclasses may use this)
194195
195196
Returns:
196197
List of blob paths
@@ -230,7 +231,7 @@ def read_dataset(
230231
Yields:
231232
PyArrow Tables or schemas
232233
"""
233-
blob_names = self.get_list_of_blob_names(prefix=self.dataset)
234+
blob_names = self.get_list_of_blob_names(prefix=self.dataset, predicates=predicates or [])
234235

235236
if just_schema:
236237
for blob_name in blob_names:

opteryx/connectors/opteryx_connector.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.
55

66
"""
7-
Iceberg Connector - Refactored Architecture
7+
Opteryx Connector - Refactored Architecture
88
99
Architecture:
10-
- IcebergConnector: Long-lived catalog gateway (handles catalog operations, views, introspection)
11-
- IcebergTable: Transient table-specific engine (handles data reading for one table)
10+
- OpteryxConnector: Long-lived catalog gateway (handles catalog operations, views, introspection)
11+
- OpteryxTable: Transient table-specific engine (handles data reading for one table)
1212
"""
1313

1414
import datetime
@@ -105,7 +105,7 @@ def __init__(self, dataset: str, catalog, workspace: str, **kwargs):
105105

106106
# Call FileSystemTable.__init__ which calls BaseTable.__init__
107107
FileSystemTable.__init__(
108-
self, dataset=dataset, filesystem=filesystem, storage_type="ICEBERG", **kwargs
108+
self, dataset=dataset, filesystem=filesystem, storage_type="OPTERYX", **kwargs
109109
)
110110
Diachronic.__init__(self, **kwargs)
111111
Statistics.__init__(self, **kwargs)
@@ -180,7 +180,7 @@ def get_dataset_schema(self) -> RelationSchema:
180180
# Use Parquet manifest reader instead of Opteryx inspect API to avoid Avro
181181
try:
182182
import pyarrow as pa
183-
from opteryx_catalof.parquet_manifest import read_parquet_manifest
183+
from opteryx_catalog.parquet_manifest import read_parquet_manifest
184184

185185
parquet_records = read_parquet_manifest(
186186
self.table.metadata,
@@ -230,16 +230,6 @@ def get_dataset_schema(self) -> RelationSchema:
230230

231231
relation_statistics.record_count = pyarrow.compute.sum(files.column("record_count")).as_py()
232232

233-
if "distinct_counts" in files.columns:
234-
for file in files.column("distinct_counts"):
235-
for k, v in file:
236-
relation_statistics.set_cardinality_estimate(column_names[k], v)
237-
238-
if "value_counts" in files.columns:
239-
for file in files.column("value_counts"):
240-
for k, v in file:
241-
relation_statistics.add_count(column_names[k], v)
242-
243233
self.relation_statistics = relation_statistics
244234

245235
return self.schema
@@ -250,6 +240,7 @@ def get_list_of_blob_names(self, *, prefix: str = None, predicates: list = []) -
250240
# Get the list of data files to read
251241
data_files = self.table.scan(
252242
#row_filter=pushed_filters,
243+
row_limit=self.limit,
253244
snapshot_id=self.snapshot_id,
254245
)
255246
return [data_file.file_path for data_file in data_files]
@@ -461,9 +452,8 @@ def get_view(self, view_name: str):
461452
# Parse relative_id into collection and name
462453
# For "clickbench.q01": collection="clickbench", name="q01"
463454
parts = relative_id.split(".")
464-
if len(parts) >= 2:
465-
name = parts[-1]
466-
collection = ".".join(parts[:-1])
455+
name = parts[-1]
456+
collection = ".".join(parts[:-1])
467457

468458
identifier = (collection, name)
469459
view = catalog.load_view(identifier)

opteryx/draken/vectors/arrow_vector.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,77 @@ def __init__(self, arrow_array: "pyarrow.Array"):
4242
raise TypeError("ArrowVector requires a pyarrow.Array")
4343
self._arr = arrow_array
4444
self._pa = pa
45-
self._pc = pa.compute
45+
try:
46+
# Prefer direct access if available
47+
self._pc = pa.compute
48+
except Exception:
49+
try:
50+
# Some pyarrow builds expose compute as a submodule
51+
import pyarrow.compute as _pc
52+
53+
self._pc = _pc
54+
except Exception:
55+
# Minimal shim for required compute operations used by ArrowVector
56+
class _Shim:
57+
@staticmethod
58+
def take(arr, indices_arr):
59+
indices = indices_arr.to_pylist()
60+
vals = arr.to_pylist()
61+
return pa.array([vals[i] for i in indices])
62+
63+
@staticmethod
64+
def equal(arr, value):
65+
return pa.array([x == value for x in arr.to_pylist()])
66+
67+
@staticmethod
68+
def not_equal(arr, value):
69+
return pa.array([x != value for x in arr.to_pylist()])
70+
71+
@staticmethod
72+
def greater(arr, value):
73+
return pa.array([x > value for x in arr.to_pylist()])
74+
75+
@staticmethod
76+
def greater_equal(arr, value):
77+
return pa.array([x >= value for x in arr.to_pylist()])
78+
79+
@staticmethod
80+
def less(arr, value):
81+
return pa.array([x < value for x in arr.to_pylist()])
82+
83+
@staticmethod
84+
def less_equal(arr, value):
85+
return pa.array([x <= value for x in arr.to_pylist()])
86+
87+
@staticmethod
88+
def sum(arr):
89+
s = sum(x for x in arr.to_pylist() if x is not None)
90+
return pa.scalar(s)
91+
92+
@staticmethod
93+
def min(arr):
94+
vals = [x for x in arr.to_pylist() if x is not None]
95+
return pa.scalar(min(vals)) if vals else pa.scalar(None)
96+
97+
@staticmethod
98+
def max(arr):
99+
vals = [x for x in arr.to_pylist() if x is not None]
100+
return pa.scalar(max(vals)) if vals else pa.scalar(None)
101+
102+
@staticmethod
103+
def is_null(arr):
104+
return pa.array([x is None for x in arr.to_pylist()])
105+
106+
self._pc = _Shim()
46107

47108
# -------- Core metadata --------
48109
@property
49110
def length(self) -> int:
50111
return len(self._arr)
51112

113+
def __len__(self) -> int:
114+
return self.length
115+
52116
@property
53117
def dtype(self):
54118
from opteryx.draken.interop.arrow import arrow_type_to_draken

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "opteryx_core"
3-
version = "0.5.3"
3+
version = "0.5.4"
44
description = "Opteryx Query Engine"
55
requires-python = '>=3.13'
66
readme = {file = "README.md", content-type = "text/markdown"}

third_party/mabel/draken/vectors/date32_vector.pyx

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,37 @@ cdef class Date32Vector(Vector):
366366

367367
dst[i] = mix_hash(dst[i], value)
368368

369+
cdef void compress_into(self, int64_t[::1] out_buf, Py_ssize_t offset=0) except *:
370+
"""Fast compress for Date32Vector: extend int32 days to int64."""
371+
cdef DrakenFixedBuffer* ptr = self.ptr
372+
cdef int32_t* data = <int32_t*> ptr.data
373+
cdef Py_ssize_t n = ptr.length
374+
cdef int64_t NULL_FLAG = <int64_t> -9223372036854775808
375+
376+
if n == 0:
377+
return
378+
379+
if offset < 0 or offset + n > out_buf.shape[0]:
380+
raise ValueError("Date32Vector.compress: output buffer too small")
381+
382+
cdef int64_t* dst = &out_buf[offset]
383+
cdef uint8_t* null_bitmap = ptr.null_bitmap
384+
cdef bint has_nulls = null_bitmap != NULL
385+
cdef Py_ssize_t i
386+
cdef uint8_t byte, bit
387+
388+
if has_nulls:
389+
for i in range(n):
390+
byte = null_bitmap[i >> 3]
391+
bit = (byte >> (i & 7)) & 1
392+
if bit:
393+
dst[i] = <int64_t> data[i]
394+
else:
395+
dst[i] = NULL_FLAG
396+
else:
397+
for i in range(n):
398+
dst[i] = <int64_t> data[i]
399+
369400
def __str__(self):
370401
cdef list vals = []
371402
cdef Py_ssize_t i, k = min(<Py_ssize_t>buf_length(self.ptr), 10)

third_party/mabel/draken/vectors/interval_vector.pyx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,34 @@ cdef class IntervalVector(Vector):
328328
value = mix_hash(partial, <uint64_t>data[i].microseconds)
329329
dst[i] = mix_hash(dst[i], value)
330330

331+
cdef void compress_into(self, int64_t[::1] out_buf, Py_ssize_t offset=0) except *:
332+
"""Fast compress for IntervalVector: use months component for ordering."""
333+
cdef DrakenFixedBuffer* ptr = self.ptr
334+
cdef Py_ssize_t n = ptr.length
335+
cdef int64_t NULL_FLAG = <int64_t> -9223372036854775808
336+
337+
if n == 0:
338+
return
339+
340+
if offset < 0 or offset + n > out_buf.shape[0]:
341+
raise ValueError("IntervalVector.compress: output buffer too small")
342+
343+
cdef IntervalValue* data = <IntervalValue*> ptr.data
344+
cdef int64_t* dst = &out_buf[offset]
345+
cdef bint has_nulls = ptr.null_bitmap != NULL
346+
cdef Py_ssize_t i
347+
348+
if has_nulls:
349+
for i in range(n):
350+
if _is_valid(ptr, i):
351+
# Use months as primary component for ordering
352+
dst[i] = data[i].months
353+
else:
354+
dst[i] = NULL_FLAG
355+
else:
356+
for i in range(n):
357+
dst[i] = data[i].months
358+
331359
def __str__(self):
332360
cdef list preview = []
333361
cdef Py_ssize_t i, n = buf_length(self.ptr)

third_party/mabel/draken/vectors/time_vector.pyx

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ from libc.stdint cimport intptr_t
2727
from libc.stdint cimport uint64_t
2828
from libc.stdint cimport uint8_t
2929
from libc.stdlib cimport malloc
30-
from libc.string cimport memset
30+
from libc.string cimport memset, memcpy
3131

3232
from opteryx.draken.core.buffers cimport DrakenFixedBuffer
3333
from opteryx.draken.core.buffers cimport DRAKEN_TIME32
@@ -299,6 +299,52 @@ cdef class TimeVector(Vector):
299299

300300
dst[i] = mix_hash(dst[i], value)
301301

302+
cdef void compress_into(self, int64_t[::1] out_buf, Py_ssize_t offset=0) except *:
303+
"""Fast compress for TimeVector: handle both time32 and time64."""
304+
cdef DrakenFixedBuffer* ptr = self.ptr
305+
cdef Py_ssize_t n = ptr.length
306+
cdef int64_t NULL_FLAG = <int64_t> -9223372036854775808
307+
308+
if n == 0:
309+
return
310+
311+
if offset < 0 or offset + n > out_buf.shape[0]:
312+
raise ValueError("TimeVector.compress: output buffer too small")
313+
314+
cdef int64_t* dst = &out_buf[offset]
315+
cdef uint8_t* null_bitmap = ptr.null_bitmap
316+
cdef bint has_nulls = null_bitmap != NULL
317+
cdef Py_ssize_t i
318+
cdef uint8_t byte, bit
319+
cdef int64_t* data64
320+
cdef int32_t* data32
321+
322+
if self.is_time64:
323+
data64 = <int64_t*> ptr.data
324+
if not has_nulls:
325+
memcpy(<void*>dst, <const void*>data64, <size_t>(n * sizeof(int64_t)))
326+
return
327+
for i in range(n):
328+
byte = null_bitmap[i >> 3]
329+
bit = (byte >> (i & 7)) & 1
330+
if bit:
331+
dst[i] = data64[i]
332+
else:
333+
dst[i] = NULL_FLAG
334+
else:
335+
data32 = <int32_t*> ptr.data
336+
if has_nulls:
337+
for i in range(n):
338+
byte = null_bitmap[i >> 3]
339+
bit = (byte >> (i & 7)) & 1
340+
if bit:
341+
dst[i] = <int64_t> data32[i]
342+
else:
343+
dst[i] = NULL_FLAG
344+
else:
345+
for i in range(n):
346+
dst[i] = <int64_t> data32[i]
347+
302348
def __str__(self):
303349
cdef list vals = []
304350
cdef Py_ssize_t i, k = min(<Py_ssize_t>buf_length(self.ptr), 10)

third_party/mabel/draken/vectors/timestamp_vector.pyx

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ from libc.stdint cimport intptr_t
2727
from libc.stdint cimport uint64_t
2828
from libc.stdint cimport uint8_t
2929
from libc.stdlib cimport malloc
30-
from libc.string cimport memset
30+
from libc.string cimport memset, memcpy
3131

3232
from opteryx.draken.core.buffers cimport DrakenFixedBuffer
3333
from opteryx.draken.core.buffers cimport DRAKEN_TIMESTAMP64
@@ -348,6 +348,37 @@ cdef class TimestampVector(Vector):
348348

349349
dst[i] = mix_hash(dst[i], value)
350350

351+
cdef void compress_into(self, int64_t[::1] out_buf, Py_ssize_t offset=0) except *:
352+
"""Fast compress for TimestampVector: timestamps are already int64."""
353+
cdef DrakenFixedBuffer* ptr = self.ptr
354+
cdef int64_t* src = <int64_t*> ptr.data
355+
cdef Py_ssize_t n = ptr.length
356+
cdef int64_t* dst_base
357+
cdef int64_t NULL_FLAG = <int64_t> -9223372036854775808
358+
359+
if n == 0:
360+
return
361+
362+
if offset < 0 or offset + n > out_buf.shape[0]:
363+
raise ValueError("TimestampVector.compress: output buffer too small")
364+
365+
dst_base = &out_buf[0]
366+
cdef int64_t* dst = dst_base + offset
367+
cdef uint8_t* null_bitmap = ptr.null_bitmap
368+
cdef bint has_nulls = null_bitmap != NULL
369+
cdef Py_ssize_t i
370+
371+
if not has_nulls:
372+
# Fast path: bulk copy
373+
memcpy(<void*>dst, <const void*>src, <size_t>(n * sizeof(int64_t)))
374+
return
375+
376+
for i in range(n):
377+
if _bitmap_is_valid(null_bitmap, i, self.null_bit_offset):
378+
dst[i] = src[i]
379+
else:
380+
dst[i] = NULL_FLAG
381+
351382
def __str__(self):
352383
cdef list vals = []
353384
cdef Py_ssize_t i, k = min(<Py_ssize_t>buf_length(self.ptr), 10)

0 commit comments

Comments
 (0)