Skip to content

Commit edf4c24

Browse files
committed
proto io_uring reader for linux
1 parent adbe3df commit edf4c24

File tree

12 files changed

+3804
-243
lines changed

12 files changed

+3804
-243
lines changed

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 1698
4+
__build__ = 1701
55
__author__ = "@joocer"
6-
__version__ = "0.26.0-beta.1698"
6+
__version__ = "0.26.0-beta.1701"
77

88
# Store the version here so:
99
# 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/io/iouring.pxd

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# distutils: language = c++
2+
# cython: language_level=3
3+
4+
cdef extern from "errno.h":
5+
int errno
6+
7+
cdef extern from "stdlib.h":
8+
int posix_memalign(void **memptr, size_t alignment, size_t size)
9+
void free(void *ptr)
10+
11+
cdef extern from "unistd.h":
12+
int close(int fd)
13+
14+
cdef extern from "fcntl.h":
15+
int open(const char *pathname, int flags, ...)
16+
17+
cdef extern from "sys/types.h":
18+
pass
19+
20+
cdef extern from "sys/uio.h":
21+
ctypedef struct iovec:
22+
void *iov_base
23+
size_t iov_len
24+
25+
cdef extern from "liburing.h":
26+
ctypedef struct io_uring:
27+
pass
28+
29+
ctypedef struct io_uring_sqe:
30+
pass
31+
32+
ctypedef struct io_uring_cqe:
33+
unsigned int flags
34+
int res
35+
unsigned long long user_data
36+
37+
int io_uring_queue_init(unsigned entries, io_uring *ring, unsigned flags)
38+
void io_uring_queue_exit(io_uring *ring)
39+
40+
io_uring_sqe* io_uring_get_sqe(io_uring *ring)
41+
int io_uring_submit(io_uring *ring)
42+
43+
int io_uring_wait_cqe(io_uring *ring, io_uring_cqe **cqe_ptr)
44+
int io_uring_peek_cqe(io_uring *ring, io_uring_cqe **cqe_ptr)
45+
void io_uring_cqe_seen(io_uring *ring, io_uring_cqe *cqe)
46+
47+
# buffer registration
48+
int io_uring_register_buffers(io_uring *ring, const iovec *iovecs, unsigned nr_iovecs)
49+
int io_uring_unregister_buffers(io_uring *ring)
50+
51+
# prep helpers
52+
void io_uring_prep_read_fixed(io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, long long offset, int buf_index)
53+
54+
# user_data helpers (declared in liburing.h as static inline)
55+
void io_uring_sqe_set_data64(io_uring_sqe *sqe, unsigned long long data)
56+
unsigned long long io_uring_cqe_get_data64(const io_uring_cqe *cqe)
57+
58+
# common setup flags
59+
cdef unsigned IORING_SETUP_CLAMP
60+
cdef unsigned IORING_SETUP_COOP_TASKRUN
61+
cdef unsigned IORING_SETUP_SINGLE_ISSUER
62+
63+
# open(2) flags
64+
cdef extern from "fcntl.h":
65+
int O_RDONLY
66+
int O_DIRECT
67+
int O_CLOEXEC
68+
69+
# Helper (defined in .pyx)
70+
cdef int _check_errno(int rc)

opteryx/compiled/io/iouring.pyx

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# distutils: language = c++
2+
# cython: language_level=3
3+
4+
from cpython.mem cimport PyMem_Malloc, PyMem_Free
5+
from libc.stdint cimport uint64_t, uintptr_t
6+
from libc.string cimport memset
7+
8+
# IMPORTANT: cimport the module by its full package path
9+
cimport opteryx.compiled.io.iouring as C
10+
11+
cdef int _check_errno(int rc):
12+
if rc < 0:
13+
raise OSError(-rc, "io_uring error")
14+
return rc
15+
16+
cdef class BufferPool:
17+
cdef void **ptrs
18+
cdef C.iovec *iov
19+
cdef size_t nbuf
20+
cdef size_t buf_size
21+
cdef size_t alignment
22+
23+
def __cinit__(self, size_t nbuf, size_t buf_size, size_t alignment=4096):
24+
self.nbuf = nbuf
25+
self.buf_size = buf_size
26+
self.alignment = alignment
27+
self.ptrs = <void **>PyMem_Malloc(nbuf * sizeof(void *))
28+
if self.ptrs == NULL:
29+
raise MemoryError("alloc ptrs")
30+
self.iov = <C.iovec *>PyMem_Malloc(nbuf * sizeof(C.iovec))
31+
if self.iov == NULL:
32+
PyMem_Free(self.ptrs)
33+
raise MemoryError("alloc iov")
34+
for i in range(nbuf):
35+
self.ptrs[i] = NULL
36+
37+
cdef void *p
38+
for i in range(nbuf):
39+
if C.posix_memalign(&p, alignment, buf_size) != 0:
40+
self._cleanup(i)
41+
raise MemoryError(f"posix_memalign failed for buffer {i}")
42+
memset(p, 0, buf_size)
43+
self.ptrs[i] = p
44+
self.iov[i].iov_base = p
45+
self.iov[i].iov_len = buf_size
46+
47+
cdef void _cleanup(self, size_t upto):
48+
cdef size_t j
49+
for j in range(upto):
50+
if self.ptrs[j] != NULL:
51+
C.free(self.ptrs[j])
52+
if self.iov != NULL:
53+
PyMem_Free(self.iov)
54+
if self.ptrs != NULL:
55+
PyMem_Free(self.ptrs)
56+
57+
def __dealloc__(self):
58+
self._cleanup(self.nbuf)
59+
60+
property n:
61+
def __get__(self):
62+
return self.nbuf
63+
64+
property size:
65+
def __get__(self):
66+
return self.buf_size
67+
68+
def addr(self, size_t idx) -> int:
69+
if idx >= self.nbuf:
70+
raise IndexError
71+
return <uintptr_t>self.ptrs[idx]
72+
73+
def view(self, size_t idx, Py_ssize_t length):
74+
if idx >= self.nbuf or length > self.buf_size:
75+
raise IndexError
76+
cdef unsigned char[:] mv = <unsigned char[:length]> self.ptrs[idx]
77+
return mv
78+
79+
80+
cdef class Uring:
81+
cdef C.io_uring ring
82+
cdef BufferPool pool
83+
cdef bint buffers_registered
84+
85+
def __cinit__(self, unsigned entries=4096, unsigned flags=0):
86+
if flags == 0:
87+
flags = C.IORING_SETUP_CLAMP | C.IORING_SETUP_COOP_TASKRUN | C.IORING_SETUP_SINGLE_ISSUER
88+
_check_errno(C.io_uring_queue_init(entries, &self.ring, flags))
89+
self.pool = None
90+
self.buffers_registered = False
91+
92+
def __dealloc__(self):
93+
try:
94+
if self.buffers_registered:
95+
C.io_uring_unregister_buffers(&self.ring)
96+
except Exception:
97+
pass
98+
C.io_uring_queue_exit(&self.ring)
99+
100+
def register_buffers(self, BufferPool pool):
101+
if pool is None:
102+
raise ValueError("pool is None")
103+
_check_errno(C.io_uring_register_buffers(&self.ring, pool.iov, <unsigned>pool.nbuf))
104+
self.pool = pool
105+
self.buffers_registered = True
106+
107+
def submit_read_fixed(self, int fd, size_t buf_index, size_t nbytes, long long offset, uint64_t user_data=0):
108+
if not self.buffers_registered:
109+
raise RuntimeError("buffers not registered")
110+
if buf_index >= self.pool.nbuf:
111+
raise IndexError
112+
if nbytes > self.pool.buf_size:
113+
raise ValueError("nbytes > buffer size")
114+
115+
cdef C.io_uring_sqe* sqe = C.io_uring_get_sqe(&self.ring)
116+
if sqe == NULL:
117+
raise RuntimeError("no available SQE (ring full)")
118+
119+
C.io_uring_prep_read_fixed(sqe, fd, self.pool.ptrs[buf_index],
120+
<unsigned>nbytes, offset, <int>buf_index)
121+
# Use helper instead of touching struct fields
122+
C.io_uring_sqe_set_data64(sqe, user_data)
123+
124+
def submit(self) -> int:
125+
return _check_errno(C.io_uring_submit(&self.ring))
126+
127+
def wait_cqe(self):
128+
cdef C.io_uring_cqe* cqe
129+
_check_errno(C.io_uring_wait_cqe(&self.ring, &cqe))
130+
res = cqe.res
131+
ud = C.io_uring_cqe_get_data64(cqe)
132+
C.io_uring_cqe_seen(&self.ring, cqe)
133+
return res, ud
134+
135+
def peek_cqe(self):
136+
cdef C.io_uring_cqe* cqe
137+
rc = C.io_uring_peek_cqe(&self.ring, &cqe)
138+
if rc == 0 and cqe != NULL:
139+
res = cqe.res
140+
ud = C.io_uring_cqe_get_data64(cqe)
141+
C.io_uring_cqe_seen(&self.ring, cqe)
142+
return res, ud
143+
return None
144+
145+
146+
def open_direct(path: bytes) -> int:
147+
"""Open O_RDONLY|O_DIRECT|O_CLOEXEC. Caller must close(fd)."""
148+
cdef int fd = C.open(<const char*>path, C.O_RDONLY | C.O_DIRECT | C.O_CLOEXEC)
149+
if fd < 0:
150+
raise OSError(C.errno, "open(O_DIRECT) failed")
151+
return fd
152+
153+
154+
def close_fd(int fd):
155+
if C.close(fd) != 0:
156+
raise OSError(C.errno, "close failed")

opteryx/connectors/disk_connector.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"""
1010

1111
import contextlib
12-
import ctypes
1312
import mmap
1413
import os
1514
import platform

opteryx/utils/file_decoders.py

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ def parquet_decoder(
243243
just_schema: bool = False,
244244
just_statistics: bool = False,
245245
force_read: bool = False,
246-
use_threads: bool = False,
246+
use_threads: bool = True,
247247
statistics: Optional[RelationStatistics] = None,
248248
) -> Tuple[int, int, pyarrow.Table]:
249249
"""
@@ -334,21 +334,28 @@ def parquet_decoder(
334334

335335
return statistics
336336

337-
# If we're here, we can't use rugo - we need to read the file with pyarrow
338-
339-
# Open the parquet file only once. Prefer pyarrow.BufferReader with a
340-
# pyarrow.Buffer when we have a memoryview to avoid creating intermediate
341-
# Python bytes objects.
337+
# Use rugo's lightweight metadata reader first (faster than pyarrow)
342338
if isinstance(buffer, memoryview):
343-
pa_buf = pyarrow.py_buffer(buffer)
344-
stream = pyarrow.BufferReader(pa_buf)
345-
elif isinstance(buffer, bytes):
346-
stream = pyarrow.BufferReader(buffer)
339+
rmeta = parquet_meta.read_metadata_from_memoryview(buffer)
347340
else:
348-
stream = pyarrow.input_stream(buffer)
341+
rmeta = parquet_meta.read_metadata_from_memoryview(memoryview(buffer))
349342

350-
pq_meta = parquet.read_metadata(stream)
351-
stream.seek(0)
343+
# Build the pieces we need from the rugo metadata
344+
# schema names (parquet has same columns across row groups usually)
345+
if rmeta.get("row_groups"):
346+
schema_names = [c["name"] for c in rmeta["row_groups"][0]["columns"]]
347+
else:
348+
schema_names = []
349+
350+
num_rows = rmeta.get("num_rows")
351+
# number of columns - try to derive, fallback to length of schema_names
352+
num_columns = rmeta.get("num_columns") or len(schema_names)
353+
354+
# total uncompressed size (rugo uses total_byte_size)
355+
uncompressed_size = sum(
356+
sum(col.get("total_byte_size", 0) for col in rg.get("columns", []))
357+
for rg in rmeta.get("row_groups", [])
358+
)
352359

353360
# we need to work out if we have a selection which may force us
354361
# fetching columns just for filtering
@@ -361,36 +368,21 @@ def parquet_decoder(
361368
filter_columns = {
362369
c.value for c in get_all_nodes_of_type(processed_selection, (NodeType.IDENTIFIER,))
363370
}
364-
selected_columns = list(projection_set.union(filter_columns).intersection(pq_meta.schema.names))
371+
selected_columns = list(projection_set.union(filter_columns).intersection(schema_names))
365372

366373
# Read all columns if none are selected, unless force_read is set
367374
if not selected_columns and not force_read:
368375
selected_columns = []
369376

370-
# get the full data size of the file to see how effective projection/selection is
371-
uncompressed_size = sum(
372-
row_group.column(j).total_uncompressed_size
373-
for i in range(pq_meta.num_row_groups)
374-
for row_group in [pq_meta.row_group(i)]
375-
for j in range(row_group.num_columns)
376-
)
377-
378-
# If it's COUNT(*), we don't need to create a full dataset
379-
# We have a handler later to sum up the $COUNT(*) column
380-
if projection == [] and selection == []:
381-
table = pyarrow.Table.from_arrays([[pq_meta.num_rows]], names=["$COUNT(*)"])
382-
return (
383-
pq_meta.num_rows,
384-
pq_meta.num_columns,
385-
uncompressed_size,
386-
table,
387-
)
377+
# Open the parquet file only once. Fake a file-like object around the buffer
378+
if isinstance(buffer, memoryview):
379+
buffer = MemoryViewStream(buffer)
388380

389381
# Read the parquet table with the optimized column list and selection filters
390382
table = parquet.read_table(
391-
stream,
383+
buffer,
392384
columns=selected_columns,
393-
pre_buffer=False,
385+
pre_buffer=True,
394386
filters=dnf_filter,
395387
use_threads=use_threads,
396388
use_pandas_metadata=False,
@@ -401,8 +393,8 @@ def parquet_decoder(
401393
table = filter_records(processed_selection, table)
402394

403395
return (
404-
pq_meta.num_rows,
405-
pq_meta.num_columns,
396+
num_rows,
397+
num_columns,
406398
uncompressed_size,
407399
table,
408400
)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "opteryx"
3-
version = "0.26.0-beta.1698"
3+
version = "0.26.0-beta.1701"
44
description = "Query your data, where it lives"
55
requires-python = '>=3.11'
66
readme = {file = "README.md", content-type = "text/markdown"}

0 commit comments

Comments
 (0)