Commit 4b1b389

Merge pull request #2872 from mabel-dev/clickbench-performance-regression-investigation-1
custom disk reader
2 parents 82ce111 + d7ee463 commit 4b1b389

File tree

8 files changed: +431 -66 lines changed


opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
 # DO NOT EDIT THIS FILE DIRECTLY
 
-__build__ = 1703
+__build__ = 1704
 __author__ = "@joocer"
-__version__ = "0.26.0-beta.1703"
+__version__ = "0.26.0-beta.1704"
 
 # Store the version here so:
 # 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/io/__init__.py

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+"""
+Compiled I/O operations for high-performance file access.
+"""
+
+try:
+    from .disk_reader import read_file
+    from .disk_reader import read_file_to_bytes
+
+    __all__ = ["read_file", "read_file_to_bytes"]
+except ImportError:
+    # Module not yet compiled
+    __all__ = []
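
The try/except guard lets callers import `opteryx.compiled.io` unconditionally and fall back to plain Python I/O when the extension has not been built. A minimal sketch of that pattern is below; the `read_bytes` helper and its fallback branch are illustrative only and not part of this commit:

# Hedged sketch: use the compiled reader when available, otherwise fall back
# to ordinary buffered reads. The helper below is hypothetical.
from opteryx.compiled import io as compiled_io

def read_bytes(path: str) -> bytes:
    if "read_file_to_bytes" in compiled_io.__all__:
        # Compiled fast path exported by opteryx/compiled/io/__init__.py
        return compiled_io.read_file_to_bytes(path)
    # Pure-Python fallback (not part of this commit)
    with open(path, "rb") as f:
        return f.read()
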
opteryx/compiled/io/disk_reader.pyx

Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+# cython: language_level=3
+# cython: nonecheck=False
+# cython: cdivision=True
+# cython: initializedcheck=False
+# cython: infer_types=True
+# cython: wraparound=False
+# cython: boundscheck=False
+
+"""
+Ultra-fast disk reader module
+"""
+
+from cpython.buffer cimport PyBuffer_FillInfo
+from libc.stdlib cimport free
+
+cdef extern from "disk_io.h":
+    int read_all_pread(const char* path, unsigned char* dst, size_t* out_len,
+                       bint sequential, bint willneed, bint drop_after)
+    int read_all_mmap(const char* path, unsigned char** dst, size_t* out_len)
+    int unmap_memory_c(unsigned char* addr, size_t size)
+
+cdef class MappedMemory:
+    cdef unsigned char* data
+    cdef size_t size
+    cdef bint owned
+
+    def __dealloc__(self):
+        if self.owned and self.data != NULL:
+            # Free the allocated memory (for non-mmap case)
+            free(self.data)
+
+    def __getbuffer__(self, Py_buffer* buffer, int flags):
+        PyBuffer_FillInfo(buffer, self, self.data, self.size, 1, flags)
+
+    def __len__(self):
+        return self.size
+
+
+def read_file(str path, bint sequential=True, bint willneed=True, bint drop_after=False):
+    """
+    Read an entire file into memory with optimized I/O.
+    """
+    import os
+
+    if not os.path.exists(path):
+        raise FileNotFoundError(f"File not found: {path}")
+
+    cdef size_t size = os.path.getsize(path)
+    cdef size_t out_len = 0
+
+    path_b = path.encode("utf-8")
+    cdef const char* c_path = path_b
+
+    # Allocate buffer - use bytearray for mutable buffer
+    buf = bytearray(size)
+    cdef unsigned char[::1] buf_view = buf
+    cdef unsigned char* dst = &buf_view[0]
+
+    cdef int rc = read_all_pread(c_path, dst, &out_len, sequential, willneed, drop_after)
+
+    if rc != 0:
+        raise OSError(-rc, f"Failed to read file: {path}")
+
+    return memoryview(buf)[:out_len]
+
+
+def read_file_mmap(str path):
+    """
+    Read file using memory mapping - returns an object that provides memoryview interface
+    but MUST be manually closed to avoid resource leaks.
+    """
+    import os
+
+    if not os.path.exists(path):
+        raise FileNotFoundError(f"File not found: {path}")
+
+    path_b = path.encode("utf-8")
+    cdef const char* c_path = path_b
+    cdef unsigned char* mapped_data = NULL
+    cdef size_t size = 0
+
+    cdef int rc = read_all_mmap(c_path, &mapped_data, &size)
+
+    if rc != 0:
+        raise OSError(-rc, f"Failed to mmap file: {path}")
+
+    # Create wrapper that knows how to clean up
+    cdef MappedMemory wrapper = MappedMemory.__new__(MappedMemory)
+    wrapper.data = mapped_data
+    wrapper.size = size
+    wrapper.owned = False  # This is mmap'd memory, not malloc'd
+
+    return wrapper
+
+
+def read_file_to_bytes(str path, bint sequential=True, bint willneed=True, bint drop_after=False):
+    """
+    Read an entire file into memory as bytes.
+    """
+    mv = read_file(path, sequential, willneed, drop_after)
+    return bytes(mv)
+
+
+def unmap_memory(mem_obj):
+    """
+    Explicitly unmap memory from read_file_mmap.
+    MUST be called when done with the data to avoid resource leaks.
+    """
+    cdef int rc
+    if hasattr(mem_obj, 'data') and mem_obj.data is not None:
+        # Import the unmap function from your C code
+        rc = unmap_memory_c(mem_obj.data, mem_obj.size)
+        mem_obj.data = None
+        return rc == 0
+    return True
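
Once compiled, the module exposes two read paths: read_file returns a memoryview over a Python-owned bytearray, while read_file_mmap returns a MappedMemory wrapper that must be released with unmap_memory. A usage sketch follows; the file path is a placeholder, not something shipped with this commit:

# Sketch of exercising both read paths; "data.parquet" is a placeholder path.
from opteryx.compiled.io.disk_reader import read_file, read_file_mmap, unmap_memory

# Buffered path: memoryview backed by a bytearray, reclaimed by normal GC.
view = read_file("data.parquet")
print(len(view))

# mmap path: the wrapper exposes the buffer protocol but must be unmapped explicitly.
mapped = read_file_mmap("data.parquet")
try:
    print(len(memoryview(mapped)))
finally:
    unmap_memory(mapped)
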

opteryx/connectors/disk_connector.py

Lines changed: 27 additions & 63 deletions
@@ -127,73 +127,37 @@ def read_blob(
         OSError:
             If an I/O error occurs while reading the file.
         """
-        # Hybrid strategy: choose mmap or read+memoryview depending on OS
-        # macOS -> mmap, Linux -> read.
-
-        # helper to use mmap path
-        def _use_mmap():
-            fd = os.open(blob_name, os.O_RDONLY)
-            try:
-                if hasattr(os, "posix_fadvise"):
-                    with contextlib.suppress(Exception):
-                        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_WILLNEED)
-                size = os.fstat(fd).st_size
-                _map = mmap.mmap(fd, length=size, **mmap_config)
-                result = decoder(
-                    _map,
-                    just_schema=just_schema,
-                    projection=projection,
-                    selection=selection,
-                    use_threads=True,
-                )
+        from opteryx.compiled.io.disk_reader import read_file_mmap
+        from opteryx.compiled.io.disk_reader import unmap_memory
+
+        # Read using mmap for maximum speed
+        mmap_obj = read_file_mmap(blob_name)
+
+        try:
+            # Create memoryview for the decoder
+            mv = memoryview(mmap_obj)
+
+            result = decoder(
+                mv,
+                just_schema=just_schema,
+                projection=projection,
+                selection=selection,
+                use_threads=True,
+            )
 
-                self.statistics.bytes_read += size
+            self.statistics.bytes_read += len(mv)
 
-                if not just_schema:
-                    stats = self.read_blob_statistics(
-                        blob_name=blob_name, blob_bytes=_map, decoder=decoder
-                    )
-                    if self.relation_statistics is None:
-                        self.relation_statistics = stats
-
-                return result
-            finally:
-                os.close(fd)
-
-        # helper to use read()+memoryview path
-        def _use_read():
-            with open(blob_name, "rb") as f:
-                if hasattr(os, "posix_fadvise"):
-                    with contextlib.suppress(Exception):
-                        os.posix_fadvise(f.fileno(), 0, 0, os.POSIX_FADV_WILLNEED)
-
-                data = f.read()
-                size = len(data)
-                buf = memoryview(data)
-
-                result = decoder(
-                    buf,
-                    just_schema=just_schema,
-                    projection=projection,
-                    selection=selection,
-                    use_threads=True,
+            if not just_schema:
+                stats = self.read_blob_statistics(
+                    blob_name=blob_name, blob_bytes=mv, decoder=decoder
                 )
+                if self.relation_statistics is None:
+                    self.relation_statistics = stats
 
-                self.statistics.bytes_read += size
-
-                if not just_schema:
-                    stats = self.read_blob_statistics(
-                        blob_name=blob_name, blob_bytes=buf, decoder=decoder
-                    )
-                    if self.relation_statistics is None:
-                        self.relation_statistics = stats
-
-                return result
-
-        # macOS: use mmap; Linux: prefer read (observed faster on some Linux setups)
-        if platform.system() == "Darwin":
-            return _use_mmap()
-        return _use_read()
+            return result
+        finally:
+            # CRITICAL: Clean up the memory mapping
+            unmap_memory(mmap_obj)
 
     @single_item_cache
     def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
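
The rewritten read_blob relies on the try/finally block to release the mapping even when the decoder raises. If the same pattern is reused elsewhere, a small context manager would keep the read_file_mmap/unmap_memory pairing in one place; the sketch below is illustrative only and not part of this commit:

# Illustrative only: a context manager that pairs read_file_mmap with
# unmap_memory so callers cannot forget the cleanup step.
from contextlib import contextmanager

from opteryx.compiled.io.disk_reader import read_file_mmap, unmap_memory

@contextmanager
def mapped_blob(path: str):
    mapped = read_file_mmap(path)
    try:
        yield memoryview(mapped)
    finally:
        unmap_memory(mapped)

# Usage (path and decode() are placeholders):
# with mapped_blob("blob.parquet") as mv:
#     decode(mv)
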

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "opteryx"
-version = "0.26.0-beta.1703"
+version = "0.26.0-beta.1704"
 description = "Query your data, where it lives"
 requires-python = '>=3.11'
 readme = {file = "README.md", content-type = "text/markdown"}

setup.py

Lines changed: 7 additions & 0 deletions
@@ -248,6 +248,13 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
             sources=["opteryx/compiled/structures/node.pyx"],
             extra_compile_args=C_COMPILE_FLAGS,
         ),
+        Extension(
+            name="opteryx.compiled.io.disk_reader",
+            sources=["opteryx/compiled/io/disk_reader.pyx", "src/cpp/disk_io.cpp"],
+            include_dirs=include_dirs + ["src/cpp"],
+            language="c++",
+            extra_compile_args=CPP_COMPILE_FLAGS,
+        ),
         Extension(
             name="opteryx.compiled.table_ops.distinct",
             sources=["opteryx/compiled/table_ops/distinct.pyx"],
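
The new Extension compiles the Cython wrapper together with the C++ backend in src/cpp/disk_io.cpp. A quick check that the module built and is importable after a local build (assuming an editable install such as `pip install -e .` triggers the extension build for this project) might look like:

# Hedged sketch: confirm the compiled extension is present after building.
try:
    from opteryx.compiled.io import disk_reader
    print("disk_reader built:", disk_reader.__file__)
except ImportError:
    print("disk_reader not compiled; opteryx.compiled.io exposes __all__ = []")
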
