Skip to content

Commit c64dcba

Browse files
authored
Merge pull request #2875 from mabel-dev/clickbench-performance-regression-investigation-1
rewrite memory view stream
2 parents fe38b3b + 6d2758c commit c64dcba

File tree

7 files changed

+274
-165
lines changed

7 files changed

+274
-165
lines changed

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 1705
4+
__build__ = 1706
55
__author__ = "@joocer"
6-
__version__ = "0.26.0-beta.1705"
6+
__version__ = "0.26.0-beta.1706"
77

88
# Store the version here so:
99
# 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/io/disk_reader.h

Lines changed: 0 additions & 40 deletions
This file was deleted.
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
# cython: language_level=3
2+
# cython: boundscheck=False
3+
# cython: wraparound=False
4+
# cython: cdivision=True
5+
# cython: initializedcheck=False
6+
# cython: nonecheck=False
7+
8+
"""
9+
Cython-optimized MemoryViewStream for high-performance memoryview reading.
10+
"""
11+
12+
import io
13+
14+
cdef class MemoryViewStream:
    """
    Expose a 1-D unsigned-byte buffer (typically a memoryview) as a
    read-only, seekable binary stream without converting it to bytes first.

    The stream keeps a typed memoryview of the source plus a read offset.
    When the source is a ``bytes`` object, or a memoryview that covers an
    entire ``bytes`` object, reads slice that ``bytes`` object directly.
    """
    cdef:
        const unsigned char[:] mv  # Typed memoryview for direct access
        Py_ssize_t offset  # Current read position, always within [0, _len]
        bint _closed
        Py_ssize_t _len
        object _underlying_bytes  # bytes object if reads may slice it directly

    def __init__(self, object mv):
        """
        Parameters:
            mv: a memoryview, bytes, or any other object exposing a 1-D
                unsigned-byte buffer.
        """
        self.mv = mv
        self.offset = 0
        self._closed = False
        self._len = len(mv)
        self._underlying_bytes = None

        if isinstance(mv, bytes):
            self._underlying_bytes = mv
        else:
            # Only memoryview-like objects expose .obj; other buffers simply
            # take the generic path.
            base = getattr(mv, "obj", None)
            # Stream offsets are relative to the view, so the backing bytes
            # may only be indexed with them when the view spans the whole
            # object: a contiguous view of equal length must start at byte 0.
            # A sliced view (e.g. memoryview(data)[5:]) must NOT use this path.
            if (
                isinstance(base, bytes)
                and getattr(mv, "contiguous", False)
                and len(base) == self._len
            ):
                self._underlying_bytes = base

    cpdef read(self, Py_ssize_t n=-1):
        """
        Read and return up to n bytes as a bytes object.

        A negative n, or an n larger than what is left, reads to the end.
        Returns b"" once the stream is exhausted.

        Raises:
            ValueError: if the stream is closed.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        cdef:
            Py_ssize_t offset = self.offset
            Py_ssize_t remaining = self._len - offset
            Py_ssize_t bytes_to_read

        if remaining <= 0:
            return b""

        # Compare n with the remaining byte count instead of computing
        # offset + n, which overflows Py_ssize_t for very large n.
        if n < 0 or n > remaining:
            bytes_to_read = remaining
        else:
            bytes_to_read = n

        # Fast path: slice the backing bytes object directly. This copies
        # only the requested range and skips the buffer-protocol round trip.
        if self._underlying_bytes is not None:
            result = self._underlying_bytes[offset : offset + bytes_to_read]
            self.offset = offset + bytes_to_read
            return result

        # Generic path: slice the typed memoryview and materialise it.
        result = self.mv[offset : offset + bytes_to_read]
        self.offset = offset + bytes_to_read
        return bytes(result)

    cpdef Py_ssize_t readinto(self, b):
        """
        Copy bytes from the stream into a pre-allocated writable buffer.

        Parameters:
            b: any writable 1-D unsigned-byte buffer (bytearray, writable
               memoryview, ...).

        Returns:
            The number of bytes copied (0 when the stream is exhausted).

        Raises:
            ValueError: if the stream is closed.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        cdef:
            unsigned char[:] b_view = b
            Py_ssize_t start = self.offset
            Py_ssize_t bytes_available = self._len - start
            Py_ssize_t bytes_to_read = b_view.shape[0]
            Py_ssize_t i

        if bytes_available <= 0:
            return 0

        if bytes_to_read > bytes_available:
            bytes_to_read = bytes_available

        # Element-wise copy works for strided views on either side.
        for i in range(bytes_to_read):
            b_view[i] = self.mv[start + i]

        self.offset = start + bytes_to_read
        return bytes_to_read

    cpdef read1(self, Py_ssize_t n=-1):
        """Read and return up to n bytes (same as read for this implementation)."""
        return self.read(n)

    cpdef Py_ssize_t seek(self, Py_ssize_t offset, int whence=0):
        """
        Change the stream position and return the new absolute position.

        whence: 0 = from start, 1 = from current position, 2 = from end.
        The resulting position is clamped to [0, len(stream)].

        Raises:
            ValueError: if the stream is closed or whence is not 0, 1 or 2.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        cdef Py_ssize_t new_offset

        if whence == 0:  # SEEK_SET
            new_offset = offset
        elif whence == 1:  # SEEK_CUR
            new_offset = self.offset + offset
        elif whence == 2:  # SEEK_END
            new_offset = self._len + offset
        else:
            raise ValueError(f"Invalid value for whence: {whence}")

        # Clamp to valid range
        if new_offset < 0:
            new_offset = 0
        elif new_offset > self._len:
            new_offset = self._len

        self.offset = new_offset
        return self.offset

    cpdef Py_ssize_t tell(self):
        """Return current stream position."""
        return self.offset

    def readable(self):
        """Return whether object supports reading."""
        return True

    def writable(self):
        """Return whether object supports writing."""
        return False

    def seekable(self):
        """Return whether object supports random access."""
        return True

    cpdef close(self):
        """Close the stream; later read/seek calls raise ValueError."""
        self._closed = True

    @property
    def closed(self):
        """Return whether the stream is closed."""
        return self._closed

    @property
    def mode(self):
        """Return the mode of the stream."""
        return "rb"

    def __len__(self):
        """Return the length of the underlying buffer."""
        return self._len

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: closes the stream."""
        self.close()

    def __iter__(self):
        """Return an iterator over the remaining bytes of the stream."""
        return self

    def __next__(self):
        """Return the next byte as a length-1 bytes object."""
        if self._closed:
            raise ValueError("I/O operation on closed file.")
        if self.offset >= self._len:
            raise StopIteration()

        # Direct access to memoryview for maximum performance
        cdef unsigned char byte = self.mv[self.offset]
        self.offset += 1
        return bytes([byte])

    def fileno(self):
        """Return -1: the stream is not backed by a file descriptor."""
        return -1

    def flush(self):
        """Flush write buffers (not supported)."""
        raise io.UnsupportedOperation()

    def isatty(self):
        """Return whether this is an interactive stream."""
        return False

    def readline(self, limit=-1):
        """Read and return a line (not supported)."""
        raise io.UnsupportedOperation()

    def readlines(self, hint=-1):
        """Read and return a list of lines (not supported)."""
        raise io.UnsupportedOperation()

    def truncate(self, size=None):
        """Truncate file to size (not supported)."""
        raise io.UnsupportedOperation()

    def write(self, s):
        """Write string to file (not supported)."""
        raise io.UnsupportedOperation()

    def writelines(self, lines):
        """Write a list of lines to stream (not supported)."""
        raise io.UnsupportedOperation()
208+
# Additional high-performance helper functions
209+
cdef class MemoryViewStreamOptimized(MemoryViewStream):
    """
    MemoryViewStream variant that adds memoryview-based read methods,
    avoiding the bytes allocation performed by read().
    """

    cpdef const unsigned char[:] read_memoryview(self, Py_ssize_t n=-1):
        """
        Read up to n bytes and return them as a memoryview slice (zero-copy).

        A negative n, or an n larger than what is left, reads to the end.
        An empty slice is returned once the stream is exhausted. The returned
        slice shares memory with the stream's source buffer, so it reflects
        any later change to that buffer.

        Raises:
            ValueError: if the stream is closed.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        cdef:
            Py_ssize_t offset = self.offset
            Py_ssize_t remaining = self._len - offset
            Py_ssize_t bytes_to_read

        if remaining <= 0:
            return self.mv[0:0]  # Empty memoryview

        # Compare n with the remaining byte count instead of computing
        # offset + n, which overflows Py_ssize_t for very large n.
        if n < 0 or n > remaining:
            bytes_to_read = remaining
        else:
            bytes_to_read = n

        self.offset = offset + bytes_to_read
        # Return the typed slice directly; storing it in an untyped local
        # would convert it to a Python object and back.
        return self.mv[offset : offset + bytes_to_read]

    cpdef Py_ssize_t readinto_memoryview(self, unsigned char[:] buffer):
        """
        Copy bytes from the stream into an existing writable memoryview.

        Returns:
            The number of bytes copied: the smaller of the buffer length and
            the bytes remaining in the stream (0 when exhausted).

        Raises:
            ValueError: if the stream is closed.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file.")

        cdef:
            Py_ssize_t start = self.offset
            Py_ssize_t bytes_available = self._len - start
            Py_ssize_t bytes_to_read = buffer.shape[0]
            Py_ssize_t i

        if bytes_available <= 0:
            return 0

        if bytes_to_read > bytes_available:
            bytes_to_read = bytes_available

        # Element-wise copy; the start offset is hoisted out of the loop.
        for i in range(bytes_to_read):
            buffer[i] = self.mv[start + i]

        self.offset = start + bytes_to_read
        return bytes_to_read

opteryx/utils/file_decoders.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
from pyarrow import parquet
2424
from rugo.converters.orso import rugo_to_orso_schema
2525

26+
from opteryx.compiled.structures.memory_view_stream import MemoryViewStream
2627
from opteryx.connectors.capabilities import PredicatePushable
2728
from opteryx.exceptions import UnsupportedFileTypeError
2829
from opteryx.managers.expression import NodeType
2930
from opteryx.managers.expression import get_all_nodes_of_type
3031
from opteryx.models import RelationStatistics
3132
from opteryx.utils.arrow import post_read_projector
32-
from opteryx.utils.memory_view_stream import MemoryViewStream
3333

3434

3535
class ExtentionType(str, Enum):

0 commit comments

Comments
 (0)