Skip to content

Commit 59b1750

Browse files
committed
adaptive blob reader
1 parent edf4c24 commit 59b1750

File tree

3 files changed

+74
-42
lines changed

3 files changed

+74
-42
lines changed

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 1701
4+
__build__ = 1702
55
__author__ = "@joocer"
6-
__version__ = "0.26.0-beta.1701"
6+
__version__ = "0.26.0-beta.1702"
77

88
# Store the version here so:
99
# 1) we don't load dependencies by storing it in __init__.py

opteryx/connectors/disk_connector.py

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,31 +30,21 @@
3030
from opteryx.exceptions import DatasetNotFoundError
3131
from opteryx.exceptions import EmptyDatasetError
3232
from opteryx.exceptions import UnsupportedFileTypeError
33-
from opteryx.utils import is_windows
3433
from opteryx.utils.file_decoders import TUPLE_OF_VALID_EXTENSIONS
3534
from opteryx.utils.file_decoders import get_decoder
3635

3736
OS_SEP = os.sep
38-
IS_WINDOWS = is_windows()
3937
IS_LINUX = platform.system() == "Linux"
4038

41-
# Define os.O_BINARY for non-Windows platforms if it's not already defined
42-
if not hasattr(os, "O_BINARY"):
43-
os.O_BINARY = 0 # Value has no effect on non-Windows platforms
44-
if not hasattr(os, "O_DIRECT"):
45-
os.O_DIRECT = 0 # Value has no effect on non-Windows platforms
4639

40+
# prefer MAP_PRIVATE and on Linux enable MAP_POPULATE to fault pages in
41+
flags = mmap.MAP_PRIVATE
42+
if IS_LINUX:
43+
with contextlib.suppress(Exception):
44+
flags |= getattr(mmap, "MAP_POPULATE", 0)
4745
mmap_config = {}
48-
if not IS_WINDOWS:
49-
# prefer MAP_PRIVATE and on Linux enable MAP_POPULATE to fault pages in
50-
flags = mmap.MAP_PRIVATE
51-
if IS_LINUX and hasattr(mmap, "MAP_POPULATE"):
52-
with contextlib.suppress(Exception):
53-
flags |= mmap.MAP_POPULATE
54-
mmap_config["flags"] = flags
55-
mmap_config["prot"] = mmap.PROT_READ
56-
else:
57-
mmap_config["access"] = mmap.ACCESS_READ
46+
mmap_config["flags"] = flags
47+
mmap_config["prot"] = mmap.PROT_READ
5848

5949

6050
class DiskConnector(BaseConnector, Partitionable, PredicatePushable, LimitPushable, Statistics):
@@ -137,31 +127,73 @@ def read_blob(
137127
OSError:
138128
If an I/O error occurs while reading the file.
139129
"""
140-
try:
141-
file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY)
142-
if hasattr(os, "posix_fadvise"):
143-
os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED)
144-
size = os.fstat(file_descriptor).st_size
145-
_map = mmap.mmap(file_descriptor, length=size, **mmap_config)
146-
result = decoder(
147-
_map,
148-
just_schema=just_schema,
149-
projection=projection,
150-
selection=selection,
151-
use_threads=True,
152-
)
153-
self.statistics.bytes_read += size
130+
# Hybrid strategy: choose mmap or read+memoryview depending on OS
131+
# macOS -> mmap, Linux -> read.
132+
133+
# helper to use mmap path
134+
def _use_mmap():
135+
fd = os.open(blob_name, os.O_RDONLY)
136+
try:
137+
if hasattr(os, "posix_fadvise"):
138+
with contextlib.suppress(Exception):
139+
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_WILLNEED)
140+
size = os.fstat(fd).st_size
141+
_map = mmap.mmap(fd, length=size, **mmap_config)
142+
result = decoder(
143+
_map,
144+
just_schema=just_schema,
145+
projection=projection,
146+
selection=selection,
147+
use_threads=True,
148+
)
149+
150+
self.statistics.bytes_read += size
154151

155-
if not just_schema:
156-
stats = self.read_blob_statistics(
157-
blob_name=blob_name, blob_bytes=_map, decoder=decoder
152+
if not just_schema:
153+
stats = self.read_blob_statistics(
154+
blob_name=blob_name, blob_bytes=_map, decoder=decoder
155+
)
156+
if self.relation_statistics is None:
157+
self.relation_statistics = stats
158+
159+
return result
160+
finally:
161+
os.close(fd)
162+
163+
# helper to use read()+memoryview path
164+
def _use_read():
165+
with open(blob_name, "rb") as f:
166+
if hasattr(os, "posix_fadvise"):
167+
with contextlib.suppress(Exception):
168+
os.posix_fadvise(f.fileno(), 0, 0, os.POSIX_FADV_WILLNEED)
169+
170+
data = f.read()
171+
size = len(data)
172+
buf = memoryview(data)
173+
174+
result = decoder(
175+
buf,
176+
just_schema=just_schema,
177+
projection=projection,
178+
selection=selection,
179+
use_threads=True,
158180
)
159-
if self.relation_statistics is None:
160-
self.relation_statistics = stats
161181

162-
return result
163-
finally:
164-
os.close(file_descriptor)
182+
self.statistics.bytes_read += size
183+
184+
if not just_schema:
185+
stats = self.read_blob_statistics(
186+
blob_name=blob_name, blob_bytes=buf, decoder=decoder
187+
)
188+
if self.relation_statistics is None:
189+
self.relation_statistics = stats
190+
191+
return result
192+
193+
# macOS: use mmap; Linux: prefer read (observed faster on some Linux setups)
194+
if platform.system() == "Darwin":
195+
return _use_mmap()
196+
return _use_read()
165197

166198
@single_item_cache
167199
def get_list_of_blob_names(self, *, prefix: str) -> List[str]:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "opteryx"
3-
version = "0.26.0-beta.1701"
3+
version = "0.26.0-beta.1702"
44
description = "Query your data, where it lives"
55
requires-python = '>=3.11'
66
readme = {file = "README.md", content-type = "text/markdown"}

0 commit comments

Comments
 (0)