
Commit 82ce111

Merge pull request #2871 from mabel-dev/clickbench-performance-regression-investigation-1
Clickbench performance regression investigation 1
2 parents e512bbc + a2bca50 commit 82ce111

5 files changed (+142 -123 lines)


opteryx/__version__.py

Lines changed: 2 additions & 2 deletions

@@ -1,9 +1,9 @@
 # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
 # DO NOT EDIT THIS FILE DIRECTLY

-__build__ = 1698
+__build__ = 1703
 __author__ = "@joocer"
-__version__ = "0.26.0-beta.1698"
+__version__ = "0.26.0-beta.1703"

 # Store the version here so:
 # 1) we don't load dependencies by storing it in __init__.py

opteryx/connectors/disk_connector.py

Lines changed: 71 additions & 40 deletions

@@ -9,7 +9,6 @@
 """

 import contextlib
-import ctypes
 import mmap
 import os
 import platform

@@ -31,31 +30,21 @@
 from opteryx.exceptions import DatasetNotFoundError
 from opteryx.exceptions import EmptyDatasetError
 from opteryx.exceptions import UnsupportedFileTypeError
-from opteryx.utils import is_windows
 from opteryx.utils.file_decoders import TUPLE_OF_VALID_EXTENSIONS
 from opteryx.utils.file_decoders import get_decoder

 OS_SEP = os.sep
-IS_WINDOWS = is_windows()
 IS_LINUX = platform.system() == "Linux"

-# Define os.O_BINARY for non-Windows platforms if it's not already defined
-if not hasattr(os, "O_BINARY"):
-    os.O_BINARY = 0  # Value has no effect on non-Windows platforms
-if not hasattr(os, "O_DIRECT"):
-    os.O_DIRECT = 0  # Value has no effect on non-Windows platforms

+# prefer MAP_PRIVATE and on Linux enable MAP_POPULATE to fault pages in
+flags = mmap.MAP_PRIVATE
+if IS_LINUX:
+    with contextlib.suppress(Exception):
+        flags |= getattr(mmap, "MAP_POPULATE", 0)
 mmap_config = {}
-if not IS_WINDOWS:
-    # prefer MAP_PRIVATE and on Linux enable MAP_POPULATE to fault pages in
-    flags = mmap.MAP_PRIVATE
-    if IS_LINUX and hasattr(mmap, "MAP_POPULATE"):
-        with contextlib.suppress(Exception):
-            flags |= mmap.MAP_POPULATE
-    mmap_config["flags"] = flags
-    mmap_config["prot"] = mmap.PROT_READ
-else:
-    mmap_config["access"] = mmap.ACCESS_READ
+mmap_config["flags"] = flags
+mmap_config["prot"] = mmap.PROT_READ


 class DiskConnector(BaseConnector, Partitionable, PredicatePushable, LimitPushable, Statistics):

@@ -138,31 +127,73 @@ def read_blob(
         OSError:
             If an I/O error occurs while reading the file.
         """
-        try:
-            file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY)
-            if hasattr(os, "posix_fadvise"):
-                os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED)
-            size = os.fstat(file_descriptor).st_size
-            _map = mmap.mmap(file_descriptor, length=size, **mmap_config)
-            result = decoder(
-                _map,
-                just_schema=just_schema,
-                projection=projection,
-                selection=selection,
-                use_threads=True,
-            )
-            self.statistics.bytes_read += size
+        # Hybrid strategy: choose mmap or read+memoryview depending on OS
+        # macOS -> mmap, Linux -> read.
+
+        # helper to use mmap path
+        def _use_mmap():
+            fd = os.open(blob_name, os.O_RDONLY)
+            try:
+                if hasattr(os, "posix_fadvise"):
+                    with contextlib.suppress(Exception):
+                        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_WILLNEED)
+                size = os.fstat(fd).st_size
+                _map = mmap.mmap(fd, length=size, **mmap_config)
+                result = decoder(
+                    _map,
+                    just_schema=just_schema,
+                    projection=projection,
+                    selection=selection,
+                    use_threads=True,
+                )
+
+                self.statistics.bytes_read += size

-            if not just_schema:
-                stats = self.read_blob_statistics(
-                    blob_name=blob_name, blob_bytes=_map, decoder=decoder
+                if not just_schema:
+                    stats = self.read_blob_statistics(
+                        blob_name=blob_name, blob_bytes=_map, decoder=decoder
+                    )
+                    if self.relation_statistics is None:
+                        self.relation_statistics = stats
+
+                return result
+            finally:
+                os.close(fd)
+
+        # helper to use read()+memoryview path
+        def _use_read():
+            with open(blob_name, "rb") as f:
+                if hasattr(os, "posix_fadvise"):
+                    with contextlib.suppress(Exception):
+                        os.posix_fadvise(f.fileno(), 0, 0, os.POSIX_FADV_WILLNEED)
+
+                data = f.read()
+                size = len(data)
+                buf = memoryview(data)
+
+                result = decoder(
+                    buf,
+                    just_schema=just_schema,
+                    projection=projection,
+                    selection=selection,
+                    use_threads=True,
                 )
-                if self.relation_statistics is None:
-                    self.relation_statistics = stats

-            return result
-        finally:
-            os.close(file_descriptor)
+                self.statistics.bytes_read += size
+
+                if not just_schema:
+                    stats = self.read_blob_statistics(
+                        blob_name=blob_name, blob_bytes=buf, decoder=decoder
+                    )
+                    if self.relation_statistics is None:
+                        self.relation_statistics = stats
+
+                return result
+
+        # macOS: use mmap; Linux: prefer read (observed faster on some Linux setups)
+        if platform.system() == "Darwin":
+            return _use_mmap()
+        return _use_read()

     @single_item_cache
     def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
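The substance of this file's change: `read_blob` now dispatches by platform between a memory-mapped read and a plain `read()` into a `memoryview`, and the module-level `mmap_config` drops its Windows branch (the `O_BINARY` shim and `ACCESS_READ`) in favour of Unix-only `flags`/`prot`. Below is a minimal standalone sketch of that hybrid strategy, assuming a Unix platform; the connector's decoder call and statistics bookkeeping are omitted, and `read_blob_bytes` is a hypothetical helper name, not part of the commit.

import contextlib
import mmap
import os
import platform

# mirror the module-level config from the diff: a private mapping,
# with MAP_POPULATE on Linux so pages are pre-faulted into memory
_FLAGS = mmap.MAP_PRIVATE
if platform.system() == "Linux":
    _FLAGS |= getattr(mmap, "MAP_POPULATE", 0)


def read_blob_bytes(path: str):
    """Return a buffer over the whole file: mmap on macOS, read() elsewhere."""
    if platform.system() == "Darwin":
        fd = os.open(path, os.O_RDONLY)
        try:
            # hint the kernel that the whole file will be needed soon
            if hasattr(os, "posix_fadvise"):
                with contextlib.suppress(Exception):
                    os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_WILLNEED)
            size = os.fstat(fd).st_size
            # the mapping remains valid after the descriptor is closed
            return mmap.mmap(fd, length=size, flags=_FLAGS, prot=mmap.PROT_READ)
        finally:
            os.close(fd)
    # Linux and everything else: a single read() wrapped in a memoryview,
    # which the in-diff comment notes was observed faster on some Linux setups
    with open(path, "rb") as f:
        return memoryview(f.read())

Note that `mmap.MAP_PRIVATE` and `mmap.PROT_READ` are Unix-only constants, so after this change the module appears to assume a non-Windows platform, consistent with the removal of the `is_windows` import.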

opteryx/utils/file_decoders.py

Lines changed: 28 additions & 36 deletions

@@ -243,7 +243,7 @@ def parquet_decoder(
     just_schema: bool = False,
     just_statistics: bool = False,
     force_read: bool = False,
-    use_threads: bool = False,
+    use_threads: bool = True,
     statistics: Optional[RelationStatistics] = None,
 ) -> Tuple[int, int, pyarrow.Table]:
     """

@@ -334,21 +334,28 @@ def parquet_decoder(

         return statistics

-    # If we're here, we can't use rugo - we need to read the file with pyarrow
-
-    # Open the parquet file only once. Prefer pyarrow.BufferReader with a
-    # pyarrow.Buffer when we have a memoryview to avoid creating intermediate
-    # Python bytes objects.
+    # Use rugo's lightweight metadata reader first (faster than pyarrow)
     if isinstance(buffer, memoryview):
-        pa_buf = pyarrow.py_buffer(buffer)
-        stream = pyarrow.BufferReader(pa_buf)
-    elif isinstance(buffer, bytes):
-        stream = pyarrow.BufferReader(buffer)
+        rmeta = parquet_meta.read_metadata_from_memoryview(buffer)
     else:
-        stream = pyarrow.input_stream(buffer)
+        rmeta = parquet_meta.read_metadata_from_memoryview(memoryview(buffer))

-    pq_meta = parquet.read_metadata(stream)
-    stream.seek(0)
+    # Build the pieces we need from the rugo metadata
+    # schema names (parquet has same columns across row groups usually)
+    if rmeta.get("row_groups"):
+        schema_names = [c["name"] for c in rmeta["row_groups"][0]["columns"]]
+    else:
+        schema_names = []
+
+    num_rows = rmeta.get("num_rows")
+    # number of columns - try to derive, fallback to length of schema_names
+    num_columns = rmeta.get("num_columns") or len(schema_names)
+
+    # total uncompressed size (rugo uses total_byte_size)
+    uncompressed_size = sum(
+        sum(col.get("total_byte_size", 0) for col in rg.get("columns", []))
+        for rg in rmeta.get("row_groups", [])
+    )

     # we need to work out if we have a selection which may force us
     # fetching columns just for filtering

@@ -361,36 +368,21 @@ def parquet_decoder(
         filter_columns = {
             c.value for c in get_all_nodes_of_type(processed_selection, (NodeType.IDENTIFIER,))
         }
-    selected_columns = list(projection_set.union(filter_columns).intersection(pq_meta.schema.names))
+    selected_columns = list(projection_set.union(filter_columns).intersection(schema_names))

     # Read all columns if none are selected, unless force_read is set
     if not selected_columns and not force_read:
         selected_columns = []

-    # get the full data size of the file to see how effective projection/selection is
-    uncompressed_size = sum(
-        row_group.column(j).total_uncompressed_size
-        for i in range(pq_meta.num_row_groups)
-        for row_group in [pq_meta.row_group(i)]
-        for j in range(row_group.num_columns)
-    )
-
-    # If it's COUNT(*), we don't need to create a full dataset
-    # We have a handler later to sum up the $COUNT(*) column
-    if projection == [] and selection == []:
-        table = pyarrow.Table.from_arrays([[pq_meta.num_rows]], names=["$COUNT(*)"])
-        return (
-            pq_meta.num_rows,
-            pq_meta.num_columns,
-            uncompressed_size,
-            table,
-        )
+    # Open the parquet file only once. Fake a file-like object around the buffer
+    if isinstance(buffer, memoryview):
+        buffer = MemoryViewStream(buffer)

     # Read the parquet table with the optimized column list and selection filters
     table = parquet.read_table(
-        stream,
+        buffer,
         columns=selected_columns,
-        pre_buffer=False,
+        pre_buffer=True,
         filters=dnf_filter,
         use_threads=use_threads,
         use_pandas_metadata=False,

@@ -401,8 +393,8 @@
         table = filter_records(processed_selection, table)

     return (
-        pq_meta.num_rows,
-        pq_meta.num_columns,
+        num_rows,
+        num_columns,
         uncompressed_size,
         table,
     )
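The decoder now sources its metadata from rugo's dict-based reader (`parquet_meta.read_metadata_from_memoryview`) rather than `pyarrow.parquet.read_metadata`, and derives the schema names, row/column counts, and uncompressed size from that dict. Below is a small illustrative helper over the same dict shape, inferred only from the fields the diff reads; `summarise_meta` and the sample dict are hypothetical, not part of the library.

from typing import Any, Dict, List, Tuple


def summarise_meta(rmeta: Dict[str, Any]) -> Tuple[List[str], int, int, int]:
    """Derive (schema_names, num_rows, num_columns, uncompressed_size)
    essentially the way the new decoder path does."""
    row_groups = rmeta.get("row_groups", [])
    # column names come from the first row group; parquet files normally
    # carry identical columns in every row group
    schema_names = [c["name"] for c in row_groups[0]["columns"]] if row_groups else []
    num_rows = rmeta.get("num_rows") or 0
    num_columns = rmeta.get("num_columns") or len(schema_names)
    # rugo reports per-column uncompressed sizes as total_byte_size
    uncompressed_size = sum(
        sum(col.get("total_byte_size", 0) for col in rg.get("columns", []))
        for rg in row_groups
    )
    return schema_names, num_rows, num_columns, uncompressed_size


# e.g. a one-row-group, two-column file
meta = {
    "num_rows": 100,
    "row_groups": [
        {
            "columns": [
                {"name": "a", "total_byte_size": 800},
                {"name": "b", "total_byte_size": 1600},
            ]
        }
    ],
}
assert summarise_meta(meta) == (["a", "b"], 100, 2, 2400)

The table read itself still goes through `pyarrow.parquet.read_table`, now fed the `MemoryViewStream` wrapper and `pre_buffer=True`, which coalesces column-chunk reads and is mainly aimed at high-latency sources.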

pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 [project]
 name = "opteryx"
-version = "0.26.0-beta.1698"
+version = "0.26.0-beta.1703"
 description = "Query your data, where it lives"
 requires-python = '>=3.11'
 readme = {file = "README.md", content-type = "text/markdown"}
