Skip to content

Commit 2bcd505

Browse files
committed
disk connector tweaks
1 parent 77511df commit 2bcd505

File tree

6 files changed

+69
-13
lines changed

6 files changed

+69
-13
lines changed

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 1673
4+
__build__ = 1674
55
__author__ = "@joocer"
6-
__version__ = "0.26.0-beta.1673"
6+
__version__ = "0.26.0-beta.1674"
77

88
# Store the version here so:
99
# 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/structures/lru_k.pyx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ cdef class LRU_K:
177177
"""Evict one item using simplified LRU-K algorithm."""
178178
cdef bytes candidate_key = None
179179
cdef bytes candidate_value = None
180-
cdef int64_t oldest_kth_time = -1
181180
cdef int64_t kth_time
182181
cdef list history
183182

opteryx/connectors/disk_connector.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
given as a folder on local disk
99
"""
1010

11+
import contextlib
12+
import ctypes
1113
import mmap
1214
import os
15+
import platform
1316
import time
1417
from typing import Dict
1518
from typing import List
@@ -34,6 +37,7 @@
3437

3538
OS_SEP = os.sep
3639
IS_WINDOWS = is_windows()
40+
IS_LINUX = platform.system() == "Linux"
3741

3842
# Define os.O_BINARY for non-Windows platforms if it's not already defined
3943
if not hasattr(os, "O_BINARY"):
@@ -43,7 +47,12 @@
4347

4448
mmap_config = {}
4549
if not IS_WINDOWS:
46-
mmap_config["flags"] = mmap.MAP_PRIVATE
50+
# prefer MAP_PRIVATE and on Linux enable MAP_POPULATE to fault pages in
51+
flags = mmap.MAP_PRIVATE
52+
if IS_LINUX and hasattr(mmap, "MAP_POPULATE"):
53+
with contextlib.suppress(Exception):
54+
flags |= mmap.MAP_POPULATE
55+
mmap_config["flags"] = flags
4756
mmap_config["prot"] = mmap.PROT_READ
4857
else:
4958
mmap_config["access"] = mmap.ACCESS_READ
@@ -129,14 +138,42 @@ def read_blob(
129138
OSError:
130139
If an I/O error occurs while reading the file.
131140
"""
141+
file_descriptor = None
142+
_map = None
132143
try:
133144
file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY)
145+
# on platforms that support it give the kernel a hint about access pattern
134146
if hasattr(os, "posix_fadvise"):
135-
os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED)
147+
# sequential access is the common pattern for dataset reads
148+
try:
149+
os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_SEQUENTIAL)
150+
except OSError:
151+
# fallback to WILLNEED if SEQUENTIAL is not allowed
152+
with contextlib.suppress(Exception):
153+
os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED)
154+
136155
size = os.fstat(file_descriptor).st_size
137156
_map = mmap.mmap(file_descriptor, length=size, **mmap_config)
157+
158+
# On Linux advise the kernel that access will be sequential to improve readahead
159+
if IS_LINUX:
160+
try:
161+
libc = ctypes.CDLL("libc.so.6")
162+
# MADV_SEQUENTIAL is 2 on Linux, but don't hardcode if available
163+
MADV_SEQUENTIAL = 2
164+
addr = ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(_map)))
165+
length = ctypes.c_size_t(size)
166+
libc.madvise(addr, length, MADV_SEQUENTIAL)
167+
except Exception:
168+
# best-effort: if anything goes wrong, ignore
169+
pass
170+
171+
# pass a memoryview of the mmap to decoders - this makes intent explicit
172+
# and lets decoders that can accept memoryviews avoid extra copies
173+
buffer = memoryview(_map)
174+
138175
result = decoder(
139-
_map,
176+
buffer,
140177
just_schema=just_schema,
141178
projection=projection,
142179
selection=selection,
@@ -146,14 +183,20 @@ def read_blob(
146183

147184
if not just_schema:
148185
stats = self.read_blob_statistics(
149-
blob_name=blob_name, blob_bytes=_map, decoder=decoder
186+
blob_name=blob_name, blob_bytes=buffer, decoder=decoder
150187
)
151188
if self.relation_statistics is None:
152189
self.relation_statistics = stats
153190

154191
return result
155192
finally:
156-
os.close(file_descriptor)
193+
# Ensure mmap is closed before closing the file descriptor
194+
with contextlib.suppress(Exception):
195+
if _map is not None:
196+
_map.close()
197+
with contextlib.suppress(Exception):
198+
if file_descriptor is not None:
199+
os.close(file_descriptor)
157200

158201
@single_item_cache
159202
def get_list_of_blob_names(self, *, prefix: str) -> List[str]:

opteryx/utils/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88
from typing import Iterable
99
from typing import Optional
1010

11+
from orso.tools import single_item_cache
12+
1113
from opteryx.third_party.mbleven import compare
1214

1315

16+
@single_item_cache
1417
def is_windows() -> bool:
1518
return platform.system().lower() == "windows"
1619

opteryx/utils/file_decoders.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ def zstd_decoder(
181181
else:
182182
stream = buffer
183183

184+
# zstandard.open returns a file-like which we pass directly to jsonl_decoder
184185
with zstandard.open(stream, "rb") as file:
185186
return jsonl_decoder(
186187
file, projection=projection, selection=selection, just_schema=just_schema
@@ -318,9 +319,17 @@ def parquet_decoder(
318319

319320
# If we're here, we can't use rugo - we need to read the file with pyarrow
320321

321-
# Open the parquet file only once
322+
# Open the parquet file only once. Prefer pyarrow.BufferReader with a
323+
# pyarrow.Buffer when we have a memoryview to avoid creating intermediate
324+
# Python bytes objects.
322325
if isinstance(buffer, memoryview):
323-
stream = MemoryViewStream(buffer)
326+
# pyarrow.py_buffer accepts buffer-protocol objects and is zero-copy
327+
try:
328+
pa_buf = pyarrow.py_buffer(buffer)
329+
stream = pyarrow.BufferReader(pa_buf)
330+
except Exception:
331+
# fallback to MemoryViewStream if pyarrow can't handle this memoryview
332+
stream = MemoryViewStream(buffer)
324333
elif isinstance(buffer, bytes):
325334
stream = pyarrow.BufferReader(buffer)
326335
else:
@@ -444,10 +453,12 @@ def jsonl_decoder(
444453

445454
from opteryx.third_party.tktech import csimdjson as simdjson
446455

456+
# Normalize inputs: accept memoryview, bytes, or file-like objects.
447457
if isinstance(buffer, memoryview):
448-
# If it's a memoryview, we need to convert it to bytes
458+
# Convert to bytes once; many downstream codepaths expect a bytes object
449459
buffer = buffer.tobytes()
450-
if not isinstance(buffer, bytes):
460+
elif not isinstance(buffer, bytes) and hasattr(buffer, "read"):
461+
# file-like: read once into memory
451462
buffer = buffer.read()
452463

453464
# If it's COUNT(*), we don't need to create a full dataset

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "opteryx"
3-
version = "0.26.0-beta.1673"
3+
version = "0.26.0-beta.1674"
44
description = "Query your data, where it lives"
55
requires-python = '>=3.11'
66
readme = {file = "README.md", content-type = "text/markdown"}

0 commit comments

Comments
 (0)