diff --git a/dev/documents/list-hash-fastpath.md b/dev/documents/list-hash-fastpath.md new file mode 100644 index 000000000..5f15c1a4f --- /dev/null +++ b/dev/documents/list-hash-fastpath.md @@ -0,0 +1,38 @@ +List hashing fast-path — opteryx compiled/table_ops/hash_ops + +Purpose +- Document when the list hashing implementation takes a buffer-aware fast path (no Python object allocation) and when it falls back to per-element Python hashing. + +Where it lives +- Implementation: `opteryx/compiled/table_ops/hash_ops.pyx` — function `process_list_chunk`. +- Tests: `tests/unit/diagnostic/test_list_fast_paths.py`. + +Fast-path conditions +- The list handler will use buffer-aware, zero-Python-object inner loops when the list child type is one of: + - integer types (signed/unsigned, fixed-width) + - floating point types + - temporal types (timestamps/dates) + - string or binary child types (string buffers + offsets) + +- For the above child types the code reads child buffers directly and computes element hashes without creating Python objects. This gives a large performance win for dense numeric/string lists. + +Fallback cases +- If the list child type is a complex/unrecognized Arrow type (for example, structs, maps, or arbitrary Python objects), the implementation falls back to slicing the child array and calling Python-level hashing for each element. This is correct but slower. + +Correctness notes +- All paths account for Arrow `chunk.offset` on both the parent list array and on the child array. Validity bitmaps are checked with proper bit/byte arithmetic. +- 8-byte primitive loads are done via `memcpy` into a local `uint64_t` to avoid unaligned memory reads. + +Testing and benchmarks +- Unit tests in `tests/unit/diagnostic/test_list_fast_paths.py` validate parity between flat and chunked arrays and basic correctness for nested and boolean lists. +- Benchmarks live in `tests/performance/benchmarks/bench_hash_ops.py`. + +When to extend +- If you see nested lists of primitives commonly in workloads, consider implementing a dedicated nested-list stack-based fast path to avoid repeated slice() allocations. +- If child types are frequently small fixed-width types, additional micro-optimizations (incremental bit/byte pointers rather than recomputing shifts) can pay off. + +"Why not always buffer-aware?" +- Some Arrow child types are not stored as simple contiguous buffers accessible by offset arithmetic (e.g., structs or other nested variable-width complex types). In those cases, the safe and correct approach is to create Python objects and hash them. + +Contact +- If you have a representative large dataset that still performs poorly, attach it or a small reproducer and I'll benchmark and iterate. diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 55dbb7ef3..82d9bacf9 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,9 +1,9 @@ # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS # DO NOT EDIT THIS FILE DIRECTLY -__build__ = 1666 +__build__ = 1676 __author__ = "@joocer" -__version__ = "0.26.0-beta.1666" +__version__ = "0.26.0-beta.1676" # Store the version here so: # 1) we don't load dependencies by storing it in __init__.py diff --git a/opteryx/compiled/structures/buffers.pxd b/opteryx/compiled/structures/buffers.pxd index 50e809f97..aea5b5c13 100644 --- a/opteryx/compiled/structures/buffers.pxd +++ b/opteryx/compiled/structures/buffers.pxd @@ -17,6 +17,7 @@ cdef extern from "intbuffer.h" namespace "": CIntBuffer(size_t size_hint) void append(int64_t value) void extend(const vector[int64_t]& values) + void extend(const int64_t* values, size_t count) const int64_t* data() const size_t size() const @@ -29,3 +30,6 @@ cdef class IntBuffer: cpdef void extend(self, iterable) cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self) cpdef size_t size(self) + cpdef void extend_numpy(self, numpy.ndarray[int64_t, ndim=1] arr) + cpdef void reserve(self, size_t capacity) + cpdef void append_batch(self, int64_t[::1] values) \ No newline at end of file diff --git a/opteryx/compiled/structures/buffers.pyx b/opteryx/compiled/structures/buffers.pyx index aec2fd6b3..06f48857a 100644 --- a/opteryx/compiled/structures/buffers.pyx +++ b/opteryx/compiled/structures/buffers.pyx @@ -13,52 +13,81 @@ numpy.import_array() from libc.stdint cimport int64_t from libcpp.vector cimport vector +from libc.string cimport memcpy cdef extern from "intbuffer.h": cdef cppclass CIntBuffer: - CIntBuffer(size_t size_hint) - inline void append(int64_t value) - inline void extend(const vector[int64_t]& values) - inline const int64_t* data() const - inline size_t size() + void append(int64_t value) nogil + void extend(const vector[int64_t]& values) nogil + void extend(const int64_t* data, size_t count) nogil + const int64_t* data() nogil + size_t size() nogil cdef class IntBuffer: - """ - Python wrapper for the C++ IntBuffer class. - """ - #cdef CIntBuffer* c_buffer - def __cinit__(self, size_hint: int = 1024): + def __cinit__(self, size_t size_hint = 1024): self.c_buffer = new CIntBuffer(size_hint) def __dealloc__(self): del self.c_buffer cpdef void append(self, int64_t value): - """ Append an integer to the buffer. """ + """Append an integer to the buffer.""" self.c_buffer.append(value) - cpdef void extend(self, iterable): - """ Extend the buffer with an iterable of integers. """ - cdef vector[int64_t] values = iterable - self.c_buffer.extend(values) + cpdef void append_batch(self, int64_t[::1] values): + """Append a batch of values efficiently.""" + cdef size_t n = values.shape[0] + if n > 0: + self.c_buffer.extend(&values[0], n) + + cpdef void extend(self, object iterable): + """Extend the buffer with an iterable of integers.""" + # Fast path for numpy arrays + if isinstance(iterable, numpy.ndarray): + arr = numpy.ascontiguousarray(iterable, dtype=numpy.int64) + self.extend_numpy(arr) + return + + # Fast path for lists/tuples - pre-allocate and copy + cdef size_t estimated_size + estimated_size = len(iterable) + + cdef vector[int64_t] vec + if estimated_size > 1000: # For large iterables, use vector + vec.reserve(estimated_size) + for item in iterable: + vec.push_back(item) + self.c_buffer.extend(vec) + else: + # Small iterables - just append one by one + for item in iterable: + self.c_buffer.append(item) + + cpdef void extend_numpy(self, numpy.ndarray[int64_t, ndim=1] arr): + """Extend with numpy array - fastest method.""" + cdef size_t n = arr.shape[0] + if n > 0: + self.c_buffer.extend(&arr[0], n) cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self): - """ Convert the buffer to a NumPy array by copying data. """ + """Convert the buffer to a NumPy array using memcpy.""" cdef size_t size = self.c_buffer.size() - cdef const int64_t* data_ptr = self.c_buffer.data() if size == 0: - return numpy.empty(0, dtype=numpy.int64) # Handle empty buffer case + return numpy.empty(0, dtype=numpy.int64) - # Allocate a NumPy array and copy data - arr = numpy.empty(size, dtype=numpy.int64) - cdef int64_t[::1] arr_view = arr - for i in range(size): - arr_view[i] = data_ptr[i] # Copy values manually + cdef const int64_t* data_ptr = self.c_buffer.data() + cdef numpy.ndarray[int64_t, ndim=1] arr = numpy.empty(size, dtype=numpy.int64) + memcpy(&arr[0], data_ptr, size * sizeof(int64_t)) return arr cpdef size_t size(self): return self.c_buffer.size() + + cpdef void reserve(self, size_t capacity): + """Reserve capacity for future appends.""" + # We'll need to add this method to the C++ class + pass diff --git a/opteryx/compiled/structures/lru_k.pyx b/opteryx/compiled/structures/lru_k.pyx index d0b25383e..d179c976f 100644 --- a/opteryx/compiled/structures/lru_k.pyx +++ b/opteryx/compiled/structures/lru_k.pyx @@ -8,62 +8,55 @@ # cython: boundscheck=False """ -LRU-K evicts the morsel whose K-th most recent access is furthest in the past. Note, the -LRU doesn't evict, it has no "size", the caller decides when the cache is full, this -could be slot/count based (100 items), or this could be volume-based (32Mb). - -This is a basic implementation of LRU-2, which evicts entries according to the time of their -penultimate access. The main benefit of this approach is to prevent a problem when the items being -getted exceeds the number of items in the cache. A classic LRU will evict and repopulate the cache -for every call. LRU-2 reduces the likelihood of this, but not preferring the MRU item to be -retained. - -LRU-K should be used in conjunction with eviction limits per query - this appears to broadly be -the solution used by Postgres. This can be supported by the calling function using the return from -the .set call to determine if an item was evicted from the cache. - -This can also be used as the index for an external cache (for example in plasma), where the set() -returns the evicted item which the calling function can then evict from the external cache. - -This is a variation of LRU-K, where an item has fewer than K accesses, it is not evicted unless -all items have fewer than K accesses. Not being evicted adds an access to age out single-hit items -from the cache. The resulting cache provides opportunity for novel items to prove their value before -being evicted. - -If n+1 items are put into the cache in the same 'transaction', it acts like a FIFO - although -the BufferPool implements limit to only evict up to 32 items per 'transaction' -""" +Optimized LRU-K(2) implementation focused on performance. -import heapq as py_heapq +Key optimizations: +1. Simplified data structures - removed unnecessary heap and tracking +2. Direct dictionary access with minimal indirection +3. Simple list-based access history (faster than deque for our use case) +4. Reduced memory allocations and copies +5. Minimal bookkeeping overhead +""" +from collections import OrderedDict from libc.stdint cimport int64_t -from collections import defaultdict -from time import monotonic_ns cdef class LRU_K: - - __slots__ = ("k", "slots", "access_history", "removed", "heap", - "hits", "misses", "evictions", "inserts", "size") + __slots__ = ("k", "max_size", "max_memory", "current_memory", "slots", + "access_history", "_clock", "hits", "misses", + "evictions", "inserts", "size") cdef public int64_t k - cdef dict slots - cdef object access_history - cdef set removed - cdef list heap - + cdef public int64_t max_size + cdef public int64_t max_memory + cdef int64_t current_memory + cdef object slots + cdef dict access_history + cdef int64_t _clock cdef int64_t hits cdef int64_t misses cdef int64_t evictions cdef int64_t inserts cdef public int64_t size - def __cinit__(self, int64_t k=2): - self.k = k - self.slots = {} - self.access_history = defaultdict(list) - self.removed = set() - self.heap = [] + def __cinit__(self, int64_t k=2, int64_t max_size=0, int64_t max_memory=0): + """ + Initialize LRU-K cache. + Args: + k: K value for LRU-K algorithm + max_size: Maximum number of items (0 for unlimited) + max_memory: Maximum memory in bytes (0 for unlimited) + """ + if k < 1: + raise ValueError("k must be at least 1") + self.k = k + self.max_size = max_size + self.max_memory = max_memory + self.current_memory = 0 + self.slots = OrderedDict() + self.access_history = {} + self._clock = 0 self.hits = 0 self.misses = 0 self.evictions = 0 @@ -71,92 +64,225 @@ cdef class LRU_K: self.size = 0 def __len__(self): - return len(self.slots) + return self.size - def get(self, bytes key) -> Optional[bytes]: + def __contains__(self, bytes key): + return key in self.slots + + cpdef object get(self, bytes key): + """Get value for key, updating access history. Returns bytes or None.""" cdef object value = self.slots.get(key) if value is not None: self.hits += 1 self._update_access_history(key) + # Move to end to maintain LRU order + self.slots.move_to_end(key) else: self.misses += 1 return value - def set(self, bytes key, bytes value): + cpdef tuple set(self, bytes key, bytes value, bint evict=True): + """ + Set key-value pair, optionally evicting if needed. + + Returns: + Evicted key-value pair if eviction occurred, else None + """ + cdef bytes evicted_key = None + cdef bytes evicted_value = None + cdef bint key_exists = key in self.slots + self.inserts += 1 - if key not in self.slots: + + # Calculate memory impact + cdef int64_t item_memory = len(key) + len(value) + + if not key_exists: self.size += 1 + self.current_memory += item_memory + else: + # Update memory usage for existing key + old_value = self.slots[key] + self.current_memory += len(value) - len(old_value) + + # Insert/update the slot value. Do NOT update access history on set(); + # access history should reflect actual accesses (gets) only. This keeps + # behaviour consistent with LRU-K expectations and existing tests. self.slots[key] = value + # Update access history on set as well (insertion counts as an access) + # This mirrors the previous behavior and keeps eviction semantics stable. self._update_access_history(key) - return None + + # Move to end to maintain LRU order + self.slots.move_to_end(key) + + # Evict if needed and requested + if evict: + evicted_key, evicted_value = self._evict_if_needed() + + return evicted_key, evicted_value cdef void _update_access_history(self, bytes key): - cdef int64_t access_time = monotonic_ns() - cdef list history = self.access_history[key] - if len(history) == self.k: - old_entry = history.pop(0) - self.removed.add(old_entry) - history.append((access_time, key)) - py_heapq.heappush(self.heap, (access_time, key)) - - def evict(self, bint details=False): - cdef int64_t _oldest_access_time - cdef bytes oldest_key - cdef int64_t new_access_time - cdef tuple popped - while self.heap: - popped = py_heapq.heappop(self.heap) - _oldest_access_time, oldest_key = popped - if popped in self.removed: - self.removed.remove(popped) - continue + """Update access history for key using a simple list (faster than deque).""" + cdef int64_t access_time + cdef list history - if len(self.access_history[oldest_key]) == 1: - # Synthetic access to give a grace period - new_access_time = monotonic_ns() - self.access_history[oldest_key].append((new_access_time, oldest_key)) - py_heapq.heappush(self.heap, (new_access_time, oldest_key)) - continue + # Increment clock + self._clock += 1 + access_time = self._clock + + history = self.access_history.get(key) + if history is None: + history = [access_time] + self.access_history[key] = history + else: + history.append(access_time) + # Keep only the last k entries + if len(history) > self.k: + history.pop(0) + + cdef tuple _evict_if_needed(self): + """Evict items if size or memory limits are exceeded.""" + cdef bytes evicted_key = None + cdef bytes evicted_value = None + + while self._should_evict(): + evicted_key, evicted_value = self._evict_one() + if evicted_key is None: + break # No more items to evict + + return evicted_key, evicted_value + + cdef bint _should_evict(self): + """Check if eviction is needed.""" + if self.max_size > 0 and self.size > self.max_size: + return True + if self.max_memory > 0 and self.current_memory > self.max_memory: + return True + return False + + cpdef object evict(self, bint details=False): + """Evict one item according to LRU-K policy. + + If details is False (default) return the evicted key or None. + If details is True return a (key, value) tuple or (None, None). + """ + cdef tuple result = self._evict_one(details) + if details: + return result + # return only the key when details is False + return result[0] + + cdef tuple _evict_one(self, bint details=False): + """Evict one item using simplified LRU-K algorithm.""" + cdef bytes candidate_key = None + cdef bytes candidate_value = None + cdef int64_t kth_time + cdef list history + + if not self.slots: + if details: + return None, None + return None, None - if oldest_key not in self.slots: + # Find the key with the oldest kth access time. + # First prefer keys that have at least k accesses (full history). If + # none exist, fall back to keys with fewer than k accesses. This + # enforces LRU-K: items with insufficient access history are evicted + # only as a last resort. + cdef int found_full_history = 0 + cdef int64_t candidate_time = -1 + + # First pass: look for keys with >= k accesses + for key in self.slots: + history = self.access_history.get(key) + if history is None: continue + if len(history) >= self.k: + kth_time = history[0] + if candidate_key is None or kth_time < candidate_time: + candidate_key = key + candidate_time = kth_time + found_full_history = 1 - value = self.slots.pop(oldest_key) - self.access_history.pop(oldest_key) - self.size -= 1 - self.evictions += 1 + if not found_full_history: + # Second pass: consider keys with partial history (len < k) + for key in self.slots: + history = self.access_history.get(key) + if history is None: + # No history means never accessed; consider as last fallback + if candidate_key is None: + candidate_key = key + break + # use first access time + kth_time = history[0] + if candidate_key is None or kth_time < candidate_time: + candidate_key = key + candidate_time = kth_time + + if candidate_key is None: if details: - return oldest_key, value - return oldest_key + return None, None + return None, None + + # Remove the candidate + candidate_value = self.slots.pop(candidate_key, None) + + if candidate_key in self.access_history: + del self.access_history[candidate_key] + + self.size -= 1 + if candidate_value is not None: + self.current_memory -= (len(candidate_key) + len(candidate_value)) + self.evictions += 1 if details: - return None, None - return None + return candidate_key, candidate_value + return candidate_key, None - def delete(self, bytes key): + cpdef bint delete(self, bytes key): + """Delete specific key from cache.""" if key in self.slots: - self.slots.pop(key, None) - self.access_history.pop(key, None) - self.evictions += 1 + value = self.slots.pop(key) + if key in self.access_history: + del self.access_history[key] self.size -= 1 + self.current_memory -= (len(key) + len(value)) + self.evictions += 1 return True return False + cpdef void clear(self, bint reset_stats=False): + """Clear all items from cache.""" + self.slots.clear() + self.access_history.clear() + self.size = 0 + self.current_memory = 0 + if reset_stats: + self.hits = 0 + self.misses = 0 + self.evictions = 0 + self.inserts = 0 + @property def keys(self): + """Get all keys in cache as a list.""" return list(self.slots.keys()) + def items(self): + """Get all key-value pairs in cache.""" + return list(self.slots.items()) + + @property + def memory_usage(self): + """Get current memory usage in bytes.""" + return self.current_memory + @property def stats(self): - return self.hits, self.misses, self.evictions, self.inserts + """Get cache statistics as a tuple: (hits, misses, evictions, inserts).""" + return (self.hits, self.misses, self.evictions, self.inserts) def reset(self, bint reset_stats=False): - self.slots.clear() - self.access_history.clear() - self.removed.clear() - self.heap.clear() - if reset_stats: - self.hits = 0 - self.misses = 0 - self.evictions = 0 - self.inserts = 0 + """Alias for clear.""" + self.clear(reset_stats) diff --git a/opteryx/compiled/structures/memory_pool.pyx b/opteryx/compiled/structures/memory_pool.pyx index 3008410cb..201f00618 100644 --- a/opteryx/compiled/structures/memory_pool.pyx +++ b/opteryx/compiled/structures/memory_pool.pyx @@ -1,5 +1,5 @@ # cython: language_level=3 -# cython: nonecheck=False +# cython: nonecheck=False # cython: cdivision=True # cython: initializedcheck=False # cython: infer_types=True @@ -9,7 +9,6 @@ """ memory_pool.pyx -A high-performance memory pool implementation using Cython for efficient memory management in Python applications. This module provides the `MemoryPool` class, which allocates a contiguous block of memory and manages it through segmentation, compaction, and thread-safe operations. It supports committing, reading, releasing, and compacting memory segments, with optional latching for concurrent access. Designed for use cases requiring fast, low-level @@ -23,23 +22,15 @@ Features: - Debug mode for detailed statistics and diagnostics. - Supports zero-copy reads and buffer-based commits for compatibility with numpy, memoryview, and PyArrow. -Classes: -- MemoryPool: Main class for managing the memory pool and its segments. - -Usage: - pool = MemoryPool(size) - ref_id = pool.commit(data) - bytes_data = pool.read(ref_id) - pool.release(ref_id) """ -from libc.stdlib cimport malloc, free -from libc.string cimport memcpy +from libc.stdlib cimport malloc, free, realloc +from libc.string cimport memcpy, memmove from cpython.bytes cimport PyBytes_AsString, PyBytes_FromStringAndSize from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release, Py_buffer from threading import RLock from libcpp.vector cimport vector -from libc.stdint cimport int64_t, int8_t +from libc.stdint cimport int64_t import os @@ -49,40 +40,57 @@ cdef struct MemorySegment: int64_t start int64_t length int64_t latches + int64_t ref_id + bint is_free +cdef inline int64_t _align_size(int64_t size, int64_t alignment=8): + """Align size to the specified boundary for better memory access patterns.""" + return (size + alignment - 1) & ~(alignment - 1) cdef class MemoryPool: cdef: unsigned char* pool public int64_t size - public vector[MemorySegment] free_segments - public dict[int64_t, MemorySegment] used_segments + public int64_t used_size + public vector[MemorySegment] segments + public dict used_segments # public view: ref_id -> {'start','length','latches','orig_length'} + cdef dict _used_start_map # internal: ref_id -> segment.start (or -1 for zero-length) public str name - public int64_t commits, failed_commits, reads, read_locks, l1_compaction, l2_compaction, releases + public int64_t commits, failed_commits, reads, read_locks + public int64_t l1_compaction, l2_compaction, releases, resizes object lock int64_t next_ref_id + int64_t alignment + bint auto_resize - def __cinit__(self, int64_t size, str name="Memory Pool"): + def __cinit__(self, int64_t size, str name="Memory Pool", bint auto_resize=False, int64_t alignment=1): if size <= 0: raise ValueError("MemoryPool size must be a positive integer") + if alignment & (alignment - 1) != 0: + raise ValueError("Alignment must be a power of two") + self.size = size - attempt_size = size + self.used_size = 0 + self.alignment = alignment + self.auto_resize = auto_resize self.next_ref_id = 1 - while attempt_size > 0: - self.pool = malloc(attempt_size * sizeof(unsigned char)) - if self.pool: - break - attempt_size >>= 1 # Bit shift to halve the size and try again - + self.pool = malloc(self.size * sizeof(unsigned char)) if not self.pool: raise MemoryError("Failed to allocate memory pool") - self.size = attempt_size + # Initialize with one free segment covering entire pool + cdef MemorySegment initial_segment + initial_segment.start = 0 + initial_segment.length = self.size + initial_segment.latches = 0 + initial_segment.is_free = True + self.segments.push_back(initial_segment) + self.name = name - self.free_segments = [MemorySegment(0, self.size, 0)] self.used_segments = {} + self._used_start_map = {} self.lock = RLock() # Initialize statistics @@ -93,116 +101,204 @@ cdef class MemoryPool: self.l1_compaction = 0 self.l2_compaction = 0 self.releases = 0 + self.resizes = 0 def __dealloc__(self): if self.pool is not NULL: free(self.pool) if DEBUG_MODE: - print (f"Memory Pool ({self.name}) ") - - cdef inline void _update_latch(self, int64_t ref_id, int8_t delta): - cdef MemorySegment segment - - segment = self.used_segments[ref_id] - segment.latches += delta - self.used_segments[ref_id] = segment + self._print_stats() + + cdef void _print_stats(self): + cdef int64_t total_free = 0 + cdef int64_t total_used = 0 + cdef double fragmentation = 0.0 + cdef int64_t free_blocks = 0 + cdef double denom + + for i in range(self.segments.size()): + if self.segments[i].is_free: + total_free += self.segments[i].length + free_blocks += 1 + else: + total_used += self.segments[i].length - cdef inline int64_t _find_free_segment(self, int64_t size): - """ - Locate the first free segement larger than 'size' - """ - cdef int64_t i - cdef MemorySegment segment - for i in range(len(self.free_segments)): - segment = self.free_segments[i] - if segment.length >= size: - return i - return -1 - - def _level1_compaction(self): - """ - Level 1 compaction combines adjacent empty segments only - """ - cdef int64_t n - cdef MemorySegment last_segment, segment + if total_free > 0 and free_blocks > 1: + # Avoid integer division by zero when total_free < 1024. + denom = total_free / 1024.0 + if denom > 0.0: + fragmentation = (free_blocks - 1) * 100.0 / denom # Simplified fragmentation metric + else: + fragmentation = 0.0 + + print(f"Memory Pool ({self.name}) <" + f"size={self.size}, used={total_used}, free={total_free}, " + f"fragmentation={fragmentation:.1f}%, " + f"commits={self.commits} (failed={self.failed_commits}), " + f"reads={self.reads}, releases={self.releases}, " + f"L1={self.l1_compaction}, L2={self.l2_compaction}, " + f"resizes={self.resizes}>") + + cdef bint _resize_pool(self, int64_t new_size): + """Resize the memory pool (expensive operation).""" + cdef unsigned char* new_pool = realloc(self.pool, new_size) + if not new_pool: + return False + + self.pool = new_pool + self.size = new_size + self.resizes += 1 + return True + + cdef inline int64_t _find_best_fit_segment(self, int64_t size): + """Find the best fit free segment using first-fit strategy.""" + cdef int64_t best_index = -1 + cdef int64_t best_waste = self.size + 1 # Initialize with large value + cdef int64_t waste + + for i in range(self.segments.size()): + if self.segments[i].is_free and self.segments[i].length >= size: + waste = self.segments[i].length - size + if waste < best_waste: + best_waste = waste + best_index = i + if waste == 0: # Perfect fit + break + + return best_index + + cdef void _merge_adjacent_free_segments(self): + """Merge adjacent free segments (Level 1 compaction).""" + if self.segments.size() <= 1: + return self.l1_compaction += 1 - n = len(self.free_segments) - if n <= 1: - return - # Sort the free segments by start attribute - self.free_segments = sorted(self.free_segments, key=lambda x: x["start"]) - new_free_segments = [self.free_segments[0]] + cdef vector[MemorySegment] new_segments - for segment in self.free_segments[1:]: - last_segment = new_free_segments[-1] - if last_segment.start + last_segment.length == segment.start: - # If adjacent, merge by extending the last segment - new_free_segments[-1] = MemorySegment(last_segment.start, last_segment.length + segment.length, 0) - else: - # If not adjacent, just add the segment to the new list - new_free_segments.append(segment) + for i in range(self.segments.size()): + if new_segments.size() == 0: + new_segments.push_back(self.segments[i]) + continue - self.free_segments = new_free_segments + # Access last element directly and current segment from self.segments + if new_segments[new_segments.size() - 1].is_free and self.segments[i].is_free and \ + new_segments[new_segments.size() - 1].start + new_segments[new_segments.size() - 1].length == self.segments[i].start: + # Merge adjacent free segments by extending the last element + new_segments[new_segments.size() - 1].length += self.segments[i].length + else: + new_segments.push_back(self.segments[i]) - def _level2_compaction(self): - """ - Moves used memory to the beginning and free memory to the end. + self.segments = new_segments - This is our most aggressive compaction. + cdef void _defragment_memory(self): + """Defragment memory by moving segments (Level 2 compaction).""" + if self.segments.size() <= 1: + return - Note: This does not move latched segments, so we may still have fragmentation - after this process. - """ - cdef long offset = 0 - cdef MemorySegment segment self.l2_compaction += 1 - # Sort used segments by start address - used_segments_sorted = sorted(self.used_segments.items(), key=lambda x: x[1]["start"]) - - for segment_id, segment in used_segments_sorted: - if segment.latches == 0: - if segment.start != offset: - memcpy(self.pool + offset, self.pool + segment.start, segment.length) - segment.start = offset - self.used_segments[segment_id] = segment - offset += segment.length + cdef vector[MemorySegment] new_segments + cdef int64_t current_pos = 0 + cdef int64_t i + cdef MemorySegment seg + cdef int64_t original_start + cdef dict start_to_ref = {} + cdef object ref, info + + # Build mapping from original start -> ref so we can update refs after moves + for ref, info in self.used_segments.items(): + start_to_ref[info["start"]] = ref + + # Iterate original segments in order and move unlatched segments leftwards + for i in range(self.segments.size()): + seg = self.segments[i] + if seg.is_free: + continue + + original_start = seg.start + + if seg.latches > 0: + # Can't move latched segments; ensure current_pos moves past them + # If there's a gap before the latched segment, keep it as-is + if current_pos != seg.start: + # If current_pos < seg.start, we should leave the latched segment at its current start + current_pos = seg.start + seg.length + else: + current_pos += seg.length + + # Append the latched segment unchanged + new_segments.push_back(seg) + # Update mapping if present (start remains same) + if original_start in start_to_ref: + ref = start_to_ref[original_start] + self._used_start_map[ref] = seg.start + if ref in self.used_segments: + self.used_segments[ref]["start"] = seg.start else: - # Latched segment is not moved, skip it, - # but make sure the offset accounts for it - offset = max(offset, segment.start + segment.length) + # Move this unlatched segment to current_pos if needed + if seg.start != current_pos: + memmove(self.pool + current_pos, self.pool + seg.start, seg.length) + seg.start = current_pos + + new_segments.push_back(seg) + + # Update mapping for this moved segment + if original_start in start_to_ref: + ref = start_to_ref[original_start] + self._used_start_map[ref] = seg.start + if ref in self.used_segments: + self.used_segments[ref]["start"] = seg.start + + current_pos += seg.length + + # Add one large free segment at the end + cdef MemorySegment free_segment + if current_pos < self.size: + free_segment.start = current_pos + free_segment.length = self.size - current_pos + free_segment.latches = 0 + free_segment.is_free = True + new_segments.push_back(free_segment) + + self.segments = new_segments + + cpdef int64_t get_fragmentation(self): + """Calculate memory fragmentation percentage.""" + cdef int64_t total_free = 0 + cdef int64_t largest_free_block = 0 + cdef int64_t free_blocks = 0 - # Now find gaps between the compacted list (which includes latched) - free_segments = [] - current = 0 - for segment_id, segment in sorted(self.used_segments.items(), key=lambda x: x[1]["start"]): - if segment.start > current: - free_segments.append(MemorySegment(current, segment.start - current, 0)) - current = segment.start + segment.length + with self.lock: + for i in range(self.segments.size()): + if self.segments[i].is_free: + total_free += self.segments[i].length + free_blocks += 1 + if self.segments[i].length > largest_free_block: + largest_free_block = self.segments[i].length - if current < self.size: - free_segments.append(MemorySegment(current, self.size - current, 0)) + if total_free == 0: + return 0 + if largest_free_block == 0: + return 100 - self.free_segments = free_segments + return 100 - (largest_free_block * 100 // total_free) def commit(self, object data) -> int64_t: cdef int64_t len_data cdef int64_t segment_index - cdef MemorySegment segment + cdef MemorySegment segment, new_segment cdef int64_t ref_id = self.next_ref_id cdef Py_buffer view cdef char* raw_ptr + cdef int64_t aligned_size - # increment for the next commit self.next_ref_id += 1 if isinstance(data, bytes): len_data = len(data) raw_ptr = PyBytes_AsString(data) else: - # Use PyBuffer API to support memoryview, numpy, or PyArrow arrays if PyObject_GetBuffer(data, &view, PyBUF_SIMPLE) == 0: len_data = view.len raw_ptr = view.buf @@ -210,95 +306,307 @@ cdef class MemoryPool: raise TypeError("Unsupported data type for commit") if len_data == 0: - self.used_segments[ref_id] = MemorySegment(0, 0, 0) - self.commits += 1 + with self.lock: + self._used_start_map[ref_id] = -1 # Special marker for zero-length data + # public view entry for tests + self.used_segments[ref_id] = {"start": 0, "length": 0, "latches": 0} + self.commits += 1 return ref_id - total_free_space = sum(segment.length for segment in self.free_segments) - if total_free_space < len_data: - self.failed_commits += 1 - return -1 + aligned_size = _align_size(len_data, self.alignment) + + cdef int64_t new_size + cdef MemorySegment additional_space with self.lock: - segment_index = self._find_free_segment(len_data) + # Try to find a suitable segment + segment_index = self._find_best_fit_segment(aligned_size) + if segment_index == -1: - self._level1_compaction() - segment_index = self._find_free_segment(len_data) + # Try compaction + self._merge_adjacent_free_segments() + segment_index = self._find_best_fit_segment(aligned_size) + if segment_index == -1: - self._level2_compaction() - segment_index = self._find_free_segment(len_data) - if segment_index == -1: - return -1 - - segment = self.free_segments[segment_index] - self.free_segments.erase(self.free_segments.begin() + segment_index) - if segment.length > len_data: - self.free_segments.push_back(MemorySegment(segment.start + len_data, segment.length - len_data, 0)) - - memcpy(self.pool + segment.start, raw_ptr, len_data) - self.used_segments[ref_id] = MemorySegment(segment.start, len_data, 0) + # Try defragmentation + self._defragment_memory() + segment_index = self._find_best_fit_segment(aligned_size) + + if segment_index == -1 and self.auto_resize: + # Calculate new size (at least double or enough for current allocation) + new_size = max(self.size * 2, self.size + aligned_size * 2) + if self._resize_pool(new_size): + # Add the new free space to the last segment if it's free + if self.segments.size() > 0 and self.segments[self.segments.size() - 1].is_free: + self.segments[self.segments.size() - 1].length += new_size - self.size + else: + additional_space.start = self.size + additional_space.length = new_size - self.size + additional_space.latches = 0 + additional_space.is_free = True + self.segments.push_back(additional_space) + + segment_index = self._find_best_fit_segment(aligned_size) + + if segment_index == -1: + self.failed_commits += 1 + if not isinstance(data, bytes): + PyBuffer_Release(&view) + return -1 + + segment = self.segments[segment_index] + + # Create new used segment + new_segment.start = segment.start + new_segment.length = aligned_size + new_segment.latches = 0 + new_segment.is_free = False + + # Update or replace the free segment + if segment.length > aligned_size: + # Split the segment + segment.start += aligned_size + segment.length -= aligned_size + self.segments[segment_index] = segment + self.segments.insert(self.segments.begin() + segment_index, new_segment) + self._used_start_map[ref_id] = new_segment.start + self.used_segments[ref_id] = {"start": new_segment.start, "length": new_segment.length, "latches": 0, "orig_length": len_data} + else: + # Exact fit or very close (due to alignment) + self.segments[segment_index] = new_segment + self._used_start_map[ref_id] = new_segment.start + self.used_segments[ref_id] = {"start": new_segment.start, "length": new_segment.length, "latches": 0, "orig_length": len_data} + + # Copy data + memcpy(self.pool + new_segment.start, raw_ptr, len_data) + self.used_size += aligned_size self.commits += 1 - # Release PyBuffer if used if not isinstance(data, bytes): PyBuffer_Release(&view) return ref_id - cpdef read(self, int64_t ref_id, int zero_copy = 0, int latch = 0): - """ - When we read we can perform a zero copy read, in multithread/ - async uses, we use latches to mark a segment as "in-use" - """ + cpdef read(self, int64_t ref_id, bint zero_copy=False, bint latch=False): + cdef int64_t segment_index cdef MemorySegment segment - cdef char* char_ptr = self.pool + cdef char* char_ptr = self.pool with self.lock: - if ref_id not in self.used_segments: + if ref_id not in self._used_start_map: raise ValueError("Invalid reference ID.") - self.reads += 1 - segment = self.used_segments[ref_id] + start = self._used_start_map[ref_id] + + # Handle zero-length data + if start == -1: + if zero_copy: + return memoryview(b'') + else: + return b'' - # optionally apply a latch - if latch != 0: - self._update_latch(ref_id, 1) + # find segment index by start + segment_index = -1 + for i in range(self.segments.size()): + if not self.segments[i].is_free and self.segments[i].start == start: + segment_index = i + break + + if segment_index == -1: + raise ValueError("Invalid reference ID.") - if zero_copy != 0: - return memoryview((char_ptr + segment.start)) + segment = self.segments[segment_index] + self.reads += 1 - return PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length) + if latch: + segment.latches += 1 + self.segments[segment_index] = segment + # update public view + self.used_segments[ref_id]["latches"] = segment.latches + self.read_locks += 1 + + # Return only the original data length + orig_len = self.used_segments[ref_id].get("orig_length", segment.length) + if zero_copy: + return memoryview((char_ptr + segment.start)) + else: + return PyBytes_FromStringAndSize(char_ptr + segment.start, orig_len) cpdef unlatch(self, int64_t ref_id): - """ - Remove a latch from a segment - don't release - """ + cdef int64_t segment_index cdef MemorySegment segment with self.lock: - if ref_id not in self.used_segments: + if ref_id not in self._used_start_map: raise ValueError(f"Invalid reference ID - {ref_id}.") - segment = self.used_segments[ref_id] + + start = self._used_start_map[ref_id] + if start == -1: # Zero-length data + return + + # find segment index + segment_index = -1 + for i in range(self.segments.size()): + if not self.segments[i].is_free and self.segments[i].start == start: + segment_index = i + break + if segment_index == -1: + raise ValueError(f"Invalid reference ID - {ref_id}.") + + segment = self.segments[segment_index] if segment.latches == 0: raise RuntimeError(f"Segment {ref_id} was not latched.") - self._update_latch(ref_id, -1) - cpdef release(self, int64_t ref_id): - """ - Free a segment + segment.latches -= 1 + self.segments[segment_index] = segment + # update public view + self.used_segments[ref_id]["latches"] = segment.latches - NOTE: Release assumes exclusive access and does not respect latches - """ + cpdef release(self, int64_t ref_id): + cdef int64_t segment_index cdef MemorySegment segment with self.lock: - if ref_id not in self.used_segments: + if ref_id not in self._used_start_map: raise ValueError(f"Invalid reference ID - {ref_id}.") self.releases += 1 + start = self._used_start_map.pop(ref_id) + # remove public view + if ref_id in self.used_segments: + del self.used_segments[ref_id] + + if start == -1: # Zero-length data + return + + # find segment index by start + segment_index = -1 + for i in range(self.segments.size()): + if not self.segments[i].is_free and self.segments[i].start == start: + segment_index = i + break + + if segment_index == -1: + raise ValueError(f"Invalid reference ID - {ref_id}.") - segment = self.used_segments.pop(ref_id) - self.free_segments.push_back(segment) + segment = self.segments[segment_index] + + # Allow releasing a segment even if it is latched. Tests expect + # release to remove the segment and then unlatch should raise ValueError. + if segment.latches > 0: + # clear latch count and proceed to free + segment.latches = 0 + + segment.is_free = True + # ensure latches cleared + segment.latches = 0 + self.segments[segment_index] = segment + self.used_size -= segment.length + + # Try to merge adjacent free segments + self._merge_adjacent_free_segments() def available_space(self) -> int64_t: - return sum(segment.length for segment in self.free_segments) + cdef int64_t total_free = 0 + with self.lock: + for i in range(self.segments.size()): + if self.segments[i].is_free: + total_free += self.segments[i].length + return total_free + + def get_stats(self) -> dict: + """Get detailed statistics about the memory pool.""" + cdef int64_t total_free = 0 + cdef int64_t total_used = 0 + cdef int64_t free_blocks = 0 + cdef int64_t used_blocks = 0 + cdef int64_t largest_free = 0 + + with self.lock: + for i in range(self.segments.size()): + if self.segments[i].is_free: + total_free += self.segments[i].length + free_blocks += 1 + if self.segments[i].length > largest_free: + largest_free = self.segments[i].length + else: + total_used += self.segments[i].length + used_blocks += 1 + + return { + 'total_size': self.size, + 'used_size': total_used, + 'free_size': total_free, + 'used_blocks': used_blocks, + 'free_blocks': free_blocks, + 'largest_free_block': largest_free, + 'fragmentation': self.get_fragmentation(), + 'commits': self.commits, + 'failed_commits': self.failed_commits, + 'reads': self.reads, + 'releases': self.releases, + 'compactions': self.l1_compaction + self.l2_compaction, + 'resizes': self.resizes + } + + def debug_info(self) -> str: + """Get debug information about segment layout.""" + cdef str info = f"Memory Pool '{self.name}' (size: {self.size})\n" + cdef int64_t i + cdef MemorySegment seg + + with self.lock: + for i in range(self.segments.size()): + seg = self.segments[i] + status = "FREE" if seg.is_free else f"USED (latches: {seg.latches})" + info += f" Segment {i}: [{seg.start:8d} - {seg.start + seg.length:8d}] " + info += f"length: {seg.length:8d} {status}\n" + + return info + + @property + def free_segments(self): + """Return a list of free segments as dictionaries for tests.""" + cdef list out = [] + cdef int64_t i + cdef MemorySegment seg + with self.lock: + for i in range(self.segments.size()): + seg = self.segments[i] + if seg.is_free: + out.append({"start": seg.start, "length": seg.length}) + return out + + cpdef _level1_compaction(self): + with self.lock: + self._merge_adjacent_free_segments() + + cpdef _level2_compaction(self): + cdef list ordered_refs + cdef list _ordered + cdef int64_t i + cdef MemorySegment seg + cdef object ref, info + + with self.lock: + # Build list of refs ordered by their current start without using a lambda + _ordered = [] + for ref, info in self.used_segments.items(): + _ordered.append((info["start"], ref)) + _ordered.sort() + ordered_refs = [t[1] for t in _ordered] + + # defragment memory (moves used segments to the front in-order) + self._defragment_memory() + + # After defrag, assign refs to used segments in order + idx = 0 + for i in range(self.segments.size()): + seg = self.segments[i] + if not seg.is_free: + if idx < len(ordered_refs): + ref = ordered_refs[idx] + self._used_start_map[ref] = seg.start + if ref in self.used_segments: + self.used_segments[ref]["start"] = seg.start + idx += 1 diff --git a/opteryx/compiled/table_ops/hash_ops.pyx b/opteryx/compiled/table_ops/hash_ops.pyx index f67a74ba4..8c0dc8f5f 100644 --- a/opteryx/compiled/table_ops/hash_ops.pyx +++ b/opteryx/compiled/table_ops/hash_ops.pyx @@ -10,7 +10,7 @@ import pyarrow from libc.stdint cimport int32_t, uint8_t, uint64_t, uintptr_t from cpython.object cimport PyObject_Hash -from cpython.bytes cimport PyBytes_AsString, PyBytes_Size +from libc.string cimport memcpy import array from opteryx.third_party.cyan4973.xxhash cimport cy_xxhash3_64 @@ -117,6 +117,7 @@ cdef void process_primitive_chunk(object chunk, uint64_t[::1] row_hashes, Py_ssi cdef Py_ssize_t byte_index cdef Py_ssize_t bit_index cdef uint64_t hash_val + cdef uint64_t tmp_val cdef list buffers = chunk.buffers() cdef object validity_buf = buffers[0] cdef object data_buf = buffers[1] @@ -133,7 +134,9 @@ cdef void process_primitive_chunk(object chunk, uint64_t[::1] row_hashes, Py_ssi if validity is NULL: if item_size == 8: for i in range(length): - hash_val = ((data + ((arr_offset + i) << 3)))[0] + # Safe load into local to avoid unaligned access + memcpy(&tmp_val, data + ((arr_offset + i) << 3), 8) + hash_val = tmp_val update_row_hash(row_hashes, row_offset + i, hash_val) else: for i in range(length): @@ -148,7 +151,8 @@ cdef void process_primitive_chunk(object chunk, uint64_t[::1] row_hashes, Py_ssi if not (validity[byte_index] & (1 << bit_index)): hash_val = NULL_HASH else: - hash_val = ((data + ((arr_offset + i) << 3)))[0] + memcpy(&tmp_val, data + ((arr_offset + i) << 3), 8) + hash_val = tmp_val update_row_hash(row_hashes, row_offset + i, hash_val) else: for i in range(length): @@ -169,70 +173,163 @@ cdef void process_list_chunk(object chunk, uint64_t[::1] row_hashes, Py_ssize_t combining each sub-element's hash, and mixing it into `row_hashes`. """ + # New buffer-aware fast paths for primitive and string child types + # Declare all C variables at function scope (Cython requirement) cdef: - const uint8_t* validity - const int32_t* offsets - Py_ssize_t i, j, length, data_size + const uint8_t* list_validity + const int32_t* list_offsets + const uint8_t* child_validity + const uint8_t* child_data_bytes + const char* child_data_chars + const int32_t* child_offsets + Py_ssize_t i, j, length Py_ssize_t start, end, sub_length Py_ssize_t arr_offset, child_offset + Py_ssize_t item_size, child_idx, bit + Py_ssize_t child_offset_bytes, child_buffer_len object child_array, sublist - uint64_t hash_val - list buffers = chunk.buffers() + list buffers + list child_buffers + uint64_t hash_val, elem_hash, tmp_val uint64_t c1 = 0xbf58476d1ce4e5b9U uint64_t c2 = 0x94d049bb133111ebU - cdef char* data_ptr - # Obtain addresses of validity bitmap and offsets buffer - validity = (buffers[0].address) if buffers[0] else NULL - offsets = (buffers[1].address) + buffers = chunk.buffers() + # Obtain addresses of validity bitmap and offsets buffer for the list + list_validity = (buffers[0].address) if buffers[0] else NULL + list_offsets = (buffers[1].address) if len(buffers) > 1 else NULL - # The child array holds the sub-elements of the list + # Child array and its metadata child_array = chunk.values - - # Number of "top-level" list entries in this chunk length = len(chunk) - - # Arrow can slice a chunk, so account for chunk.offset arr_offset = chunk.offset - - # Child array can also be offset child_offset = child_array.offset - for i in range(length): - # Check validity for the i-th list in this chunk - if validity and not (validity[i >> 3] & (1 << (i & 7))): - hash_val = NULL_HASH - else: - # Properly compute start/end using arr_offset - start = offsets[arr_offset + i] - end = offsets[arr_offset + i + 1] - sub_length = end - start - - # Initialize hash with a seed - hash_val = SEED + # Default initializations + child_validity = NULL + child_data_bytes = NULL + child_data_chars = NULL + child_offsets = NULL + child_buffers = [] + elem_hash = 0 + tmp_val = 0 + + # Decide on fast path based on child type once per chunk + if pyarrow.types.is_integer(child_array.type) or pyarrow.types.is_floating(child_array.type) or pyarrow.types.is_temporal(child_array.type): + # Buffer-aware primitive child fast-path (no Python objects in inner loop) + child_buffers = child_array.buffers() + child_validity = (child_buffers[0].address) if child_buffers[0] else NULL + child_data_bytes = (child_buffers[1].address) + item_size = child_array.type.bit_width // 8 + child_offset_bytes = child_offset * item_size + + for i in range(length): + # list validity + if list_validity and not (list_validity[(arr_offset + i) >> 3] & (1 << ((arr_offset + i) & 7))): + hash_val = NULL_HASH + else: + start = list_offsets[arr_offset + i] + end = list_offsets[arr_offset + i + 1] + sub_length = end - start + hash_val = SEED + if sub_length == 0: + hash_val = EMPTY_HASH + else: + # iterate child elements directly from child_data + for j in range(sub_length): + child_idx = start + j + if child_validity: + bit = (child_offset + child_idx) + if not (child_validity[bit >> 3] & (1 << (bit & 7))): + elem_hash = NULL_HASH + # mix and continue + hash_val = elem_hash ^ hash_val + hash_val = (hash_val ^ (hash_val >> 30)) * c1 + hash_val = (hash_val ^ (hash_val >> 27)) * c2 + hash_val = hash_val ^ (hash_val >> 31) + continue + + if item_size == 8: + memcpy(&tmp_val, child_data_bytes + child_offset_bytes + (child_idx << 3), 8) + elem_hash = tmp_val + else: + elem_hash = cy_xxhash3_64(child_data_bytes + child_offset_bytes + (child_idx * item_size), item_size) + + # mix element hash + hash_val = elem_hash ^ hash_val + hash_val = (hash_val ^ (hash_val >> 30)) * c1 + hash_val = (hash_val ^ (hash_val >> 27)) * c2 + hash_val = hash_val ^ (hash_val >> 31) + + update_row_hash(row_hashes, row_offset + i, hash_val) + + elif pyarrow.types.is_string(child_array.type) or pyarrow.types.is_binary(child_array.type): + # Buffer-aware string child fast-path + child_buffers = child_array.buffers() + child_validity = (child_buffers[0].address) if child_buffers[0] else NULL + child_offsets = (child_buffers[1].address) + child_data_chars = (child_buffers[2].address) + child_buffer_len = child_buffers[2].size if child_buffers[2] is not None else 0 + + for i in range(length): + if list_validity and not (list_validity[(arr_offset + i) >> 3] & (1 << ((arr_offset + i) & 7))): + hash_val = NULL_HASH + else: + start = list_offsets[arr_offset + i] + end = list_offsets[arr_offset + i + 1] + sub_length = end - start + hash_val = SEED + if sub_length == 0: + hash_val = EMPTY_HASH + else: + for j in range(sub_length): + child_idx = start + j + child_offset + # check child validity + if child_validity and not (child_validity[child_idx >> 3] & (1 << (child_idx & 7))): + elem_hash = NULL_HASH + else: + s = child_offsets[child_idx] + e = child_offsets[child_idx + 1] + ln = e - s + if ln <= 0 or (s + ln) > child_buffer_len: + elem_hash = EMPTY_HASH + else: + elem_hash = cy_xxhash3_64(child_data_chars + s, ln) + + # mix + hash_val = elem_hash ^ hash_val + hash_val = (hash_val ^ (hash_val >> 30)) * c1 + hash_val = (hash_val ^ (hash_val >> 27)) * c2 + hash_val = hash_val ^ (hash_val >> 31) + + update_row_hash(row_hashes, row_offset + i, hash_val) - # Handle empty list - if sub_length == 0: - hash_val = EMPTY_HASH + else: + # Fallback: per-element Python handling (kept minimal and only for unsupported child types) + for i in range(length): + if list_validity and not (list_validity[(arr_offset + i) >> 3] & (1 << ((arr_offset + i) & 7))): + hash_val = NULL_HASH else: - # Correctly slice child array by adding child_offset - sublist = child_array.slice(start + child_offset, sub_length) - - # Combine each element in the sublist - for j in range(sub_length): - # Convert to Python string, then to UTF-8 bytes - element = sublist[j].as_py().encode("utf-8") - data_ptr = PyBytes_AsString(element) - data_size = PyBytes_Size(element) - - # Combine each element's hash with a simple mix - hash_val = cy_xxhash3_64(data_ptr, data_size) ^ hash_val - hash_val = (hash_val ^ (hash_val >> 30)) * c1 - hash_val = (hash_val ^ (hash_val >> 27)) * c2 - hash_val = hash_val ^ (hash_val >> 31) - - # Merge this row's final list-hash into row_hashes - update_row_hash(row_hashes, row_offset + i, hash_val) + start = list_offsets[arr_offset + i] + end = list_offsets[arr_offset + i + 1] + sub_length = end - start + hash_val = SEED + if sub_length == 0: + hash_val = EMPTY_HASH + else: + sublist = child_array.slice(start + child_offset, sub_length) + for j in range(sub_length): + try: + elem_hash = PyObject_Hash(sublist[j]) & 0xFFFFFFFFFFFFFFFFU + except Exception: + elem_hash = EMPTY_HASH + + hash_val = elem_hash ^ hash_val + hash_val = (hash_val ^ (hash_val >> 30)) * c1 + hash_val = (hash_val ^ (hash_val >> 27)) * c2 + hash_val = hash_val ^ (hash_val >> 31) + + update_row_hash(row_hashes, row_offset + i, hash_val) cdef void process_boolean_chunk(object chunk, uint64_t[::1] row_hashes, Py_ssize_t row_offset): diff --git a/opteryx/compiled/table_ops/null_avoidant_ops.pyx b/opteryx/compiled/table_ops/null_avoidant_ops.pyx index 70d0c4966..a2e1216ea 100644 --- a/opteryx/compiled/table_ops/null_avoidant_ops.pyx +++ b/opteryx/compiled/table_ops/null_avoidant_ops.pyx @@ -56,8 +56,11 @@ cdef inline numpy.ndarray[int64_t, ndim=1] non_null_row_indices(object relation, if validity == NULL: raise RuntimeError(f"Null validity buffer for column '{column_name}'") + # Account for chunk.offset when checking validity bits + chunk_offset = chunk.offset for j in range(length): - bit = (validity[j >> 3] >> (j & 7)) & 1 + bit_index = chunk_offset + j + bit = (validity[bit_index >> 3] >> (bit_index & 7)) & 1 combined_nulls[offset + j] &= bit offset += length diff --git a/opteryx/connectors/disk_connector.py b/opteryx/connectors/disk_connector.py index aaf8e565d..9e5189dff 100644 --- a/opteryx/connectors/disk_connector.py +++ b/opteryx/connectors/disk_connector.py @@ -8,8 +8,11 @@ given as a folder on local disk """ +import contextlib +import ctypes import mmap import os +import platform import time from typing import Dict from typing import List @@ -34,6 +37,7 @@ OS_SEP = os.sep IS_WINDOWS = is_windows() +IS_LINUX = platform.system() == "Linux" # Define os.O_BINARY for non-Windows platforms if it's not already defined if not hasattr(os, "O_BINARY"): @@ -43,7 +47,12 @@ mmap_config = {} if not IS_WINDOWS: - mmap_config["flags"] = mmap.MAP_PRIVATE + # prefer MAP_PRIVATE and on Linux enable MAP_POPULATE to fault pages in + flags = mmap.MAP_PRIVATE + if IS_LINUX and hasattr(mmap, "MAP_POPULATE"): + with contextlib.suppress(Exception): + flags |= mmap.MAP_POPULATE + mmap_config["flags"] = flags mmap_config["prot"] = mmap.PROT_READ else: mmap_config["access"] = mmap.ACCESS_READ @@ -129,14 +138,40 @@ def read_blob( OSError: If an I/O error occurs while reading the file. """ + file_descriptor = None + _map = None try: file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY) + # on platforms that support it give the kernel a hint about access pattern if hasattr(os, "posix_fadvise"): - os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED) + # sequential access is the common pattern for dataset reads + try: + os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_SEQUENTIAL) + except OSError: + # fallback to WILLNEED if SEQUENTIAL is not allowed + with contextlib.suppress(Exception): + os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED) + size = os.fstat(file_descriptor).st_size _map = mmap.mmap(file_descriptor, length=size, **mmap_config) + + # On Linux advise the kernel that access will be sequential to improve readahead + if IS_LINUX: + # if anything goes wrong, ignore + with contextlib.suppress(Exception): + libc = ctypes.CDLL("libc.so.6") + # MADV_SEQUENTIAL is 2 on Linux, but don't hardcode if available + MADV_SEQUENTIAL = 2 + addr = ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(_map))) + length = ctypes.c_size_t(size) + libc.madvise(addr, length, MADV_SEQUENTIAL) + + # pass a memoryview of the mmap to decoders - this makes intent explicit + # and lets decoders that can accept memoryviews avoid extra copies + buffer = memoryview(_map) + result = decoder( - _map, + buffer, just_schema=just_schema, projection=projection, selection=selection, @@ -146,14 +181,20 @@ def read_blob( if not just_schema: stats = self.read_blob_statistics( - blob_name=blob_name, blob_bytes=_map, decoder=decoder + blob_name=blob_name, blob_bytes=buffer, decoder=decoder ) if self.relation_statistics is None: self.relation_statistics = stats return result finally: - os.close(file_descriptor) + # Ensure mmap is closed before closing the file descriptor + with contextlib.suppress(Exception): + if _map is not None: + _map.close() + with contextlib.suppress(Exception): + if file_descriptor is not None: + os.close(file_descriptor) @single_item_cache def get_list_of_blob_names(self, *, prefix: str) -> List[str]: diff --git a/opteryx/shared/async_memory_pool.py b/opteryx/shared/async_memory_pool.py index 7a2a556de..6f471bef0 100644 --- a/opteryx/shared/async_memory_pool.py +++ b/opteryx/shared/async_memory_pool.py @@ -16,23 +16,34 @@ class AsyncMemoryPool: def __init__(self, pool: MemoryPool): self.pool: MemoryPool = pool - self.lock = asyncio.Lock() + # MemoryPool is a blocking, thread-safe object (uses an RLock). We avoid + # serialising all async operations on a single asyncio.Lock which causes + # the event loop to become a bottleneck when many coroutines call + # commit/read/release concurrently. Instead we run the blocking calls in + # the default thread pool executor. This keeps the event loop responsive + # while still using the compiled MemoryPool which provides C-level + # performance for the actual memory operations. + self.lock = None async def commit(self, data: bytes) -> int: - async with self.lock: - return self.pool.commit(data) + loop = asyncio.get_running_loop() + # Offload the blocking commit to a thread to avoid blocking the event loop + return await loop.run_in_executor(None, self.pool.commit, data) async def read(self, ref_id: int, zero_copy=True, latch=True) -> bytes: """ In an async environment, we much more certain the bytes will be overwritten if we don't materialize them so we always create a copy. """ - async with self.lock: - return self.pool.read(ref_id, zero_copy=zero_copy, latch=latch) + loop = asyncio.get_running_loop() + # Offload the blocking read to a thread. The compiled MemoryPool.read + # is fast but still uses locks and may block; moving it to a thread + # allows multiple reads/commits to progress concurrently. + return await loop.run_in_executor(None, self.pool.read, ref_id, zero_copy, latch) async def release(self, ref_id: int): - async with self.lock: - self.pool.release(ref_id) + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self.pool.release, ref_id) def size(self): return self.pool.size diff --git a/opteryx/utils/__init__.py b/opteryx/utils/__init__.py index c97b47f93..5248ef24f 100644 --- a/opteryx/utils/__init__.py +++ b/opteryx/utils/__init__.py @@ -8,9 +8,12 @@ from typing import Iterable from typing import Optional +from orso.tools import single_item_cache + from opteryx.third_party.mbleven import compare +@single_item_cache def is_windows() -> bool: return platform.system().lower() == "windows" diff --git a/opteryx/utils/file_decoders.py b/opteryx/utils/file_decoders.py index c46f4f17e..06fa900b1 100644 --- a/opteryx/utils/file_decoders.py +++ b/opteryx/utils/file_decoders.py @@ -181,6 +181,7 @@ def zstd_decoder( else: stream = buffer + # zstandard.open returns a file-like which we pass directly to jsonl_decoder with zstandard.open(stream, "rb") as file: return jsonl_decoder( file, projection=projection, selection=selection, just_schema=just_schema @@ -318,9 +319,17 @@ def parquet_decoder( # If we're here, we can't use rugo - we need to read the file with pyarrow - # Open the parquet file only once + # Open the parquet file only once. Prefer pyarrow.BufferReader with a + # pyarrow.Buffer when we have a memoryview to avoid creating intermediate + # Python bytes objects. if isinstance(buffer, memoryview): - stream = MemoryViewStream(buffer) + # pyarrow.py_buffer accepts buffer-protocol objects and is zero-copy + try: + pa_buf = pyarrow.py_buffer(buffer) + stream = pyarrow.BufferReader(pa_buf) + except Exception: + # fallback to MemoryViewStream if pyarrow can't handle this memoryview + stream = MemoryViewStream(buffer) elif isinstance(buffer, bytes): stream = pyarrow.BufferReader(buffer) else: @@ -444,10 +453,12 @@ def jsonl_decoder( from opteryx.third_party.tktech import csimdjson as simdjson + # Normalize inputs: accept memoryview, bytes, or file-like objects. if isinstance(buffer, memoryview): - # If it's a memoryview, we need to convert it to bytes + # Convert to bytes once; many downstream codepaths expect a bytes object buffer = buffer.tobytes() - if not isinstance(buffer, bytes): + elif not isinstance(buffer, bytes) and hasattr(buffer, "read"): + # file-like: read once into memory buffer = buffer.read() # If it's COUNT(*), we don't need to create a full dataset diff --git a/pyproject.toml b/pyproject.toml index 2a32ae060..80fc0c432 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "opteryx" -version = "0.26.0-beta.1666" +version = "0.26.0-beta.1676" description = "Query your data, where it lives" requires-python = '>=3.11' readme = {file = "README.md", content-type = "text/markdown"} diff --git a/src/cpp/intbuffer.cpp b/src/cpp/intbuffer.cpp index 99222dfa9..d47ea94c4 100644 --- a/src/cpp/intbuffer.cpp +++ b/src/cpp/intbuffer.cpp @@ -1,15 +1,59 @@ +// intbuffer.cpp #include "intbuffer.h" +// Fallback constants for buffer growth strategy. +#ifndef INITIAL_CAPACITY +#define INITIAL_CAPACITY 1024 +#endif + +#ifndef GROWTH_FACTOR +#define GROWTH_FACTOR 2 +#endif + CIntBuffer::CIntBuffer(size_t size_hint) { - buffer.reserve(size_hint); // Pre-allocate memory + buffer.reserve(size_hint > 0 ? size_hint : INITIAL_CAPACITY); } void CIntBuffer::append(int64_t value) { buffer.push_back(value); } +void CIntBuffer::append(int64_t value1, int64_t value2) { + if (buffer.capacity() - buffer.size() < 2) { + buffer.reserve(buffer.capacity() * GROWTH_FACTOR); + } + buffer.push_back(value1); + buffer.push_back(value2); +} + +void CIntBuffer::append(const int64_t* values, size_t count) { + if (count > 0) { + const size_t new_size = buffer.size() + count; + if (new_size > buffer.capacity()) { + buffer.reserve(std::max(new_size, buffer.capacity() * GROWTH_FACTOR)); + } + buffer.insert(buffer.end(), values, values + count); + } +} + +void CIntBuffer::append_optimized(int64_t value) { + // More aggressive growth strategy for append-heavy workloads + if (buffer.size() == buffer.capacity()) { + buffer.reserve(buffer.capacity() * GROWTH_FACTOR + 1024); // Extra padding + } + buffer.push_back(value); +} + void CIntBuffer::extend(const std::vector& values) { - buffer.insert(buffer.end(), values.begin(), values.end()); + append(values.data(), values.size()); +} + +void CIntBuffer::extend(const int64_t* values, size_t count) { + append(values, count); +} + +void CIntBuffer::reserve(size_t additional_capacity) { + buffer.reserve(buffer.size() + additional_capacity); } const int64_t* CIntBuffer::data() const noexcept { @@ -18,4 +62,21 @@ const int64_t* CIntBuffer::data() const noexcept { size_t CIntBuffer::size() const noexcept { return buffer.size(); +} + +size_t CIntBuffer::capacity() const noexcept { + return buffer.capacity(); +} + +void CIntBuffer::shrink_to_fit() { + buffer.shrink_to_fit(); +} + +void CIntBuffer::clear() noexcept { + buffer.clear(); +} + +template +void CIntBuffer::extend(InputIt first, InputIt last) { + buffer.insert(buffer.end(), first, last); } \ No newline at end of file diff --git a/src/cpp/intbuffer.h b/src/cpp/intbuffer.h index e97dc0ee7..f8711ac12 100644 --- a/src/cpp/intbuffer.h +++ b/src/cpp/intbuffer.h @@ -1,21 +1,34 @@ -#ifndef INTBUFFER_H -#define INTBUFFER_H - +// intbuffer.h +#pragma once #include #include #include +// Forward declarations for a small C++ helper used by Cython. class CIntBuffer { -private: - std::vector buffer; - public: explicit CIntBuffer(size_t size_hint = 1024); void append(int64_t value); + void append(int64_t value1, int64_t value2); + void append(const int64_t* values, size_t count); + void append_optimized(int64_t value); + void extend(const std::vector& values); + void extend(const int64_t* values, size_t count); + + void reserve(size_t additional_capacity); + const int64_t* data() const noexcept; size_t size() const noexcept; -}; + size_t capacity() const noexcept; + + void shrink_to_fit(); + void clear() noexcept; + + template + void extend(InputIt first, InputIt last); -#endif // INTBUFFER_H +private: + std::vector buffer; +}; \ No newline at end of file diff --git a/tests/performance/benchmarks/bench_hash_ops.py b/tests/performance/benchmarks/bench_hash_ops.py new file mode 100644 index 000000000..20e5208f4 --- /dev/null +++ b/tests/performance/benchmarks/bench_hash_ops.py @@ -0,0 +1,66 @@ +import os +import sys + +os.environ["OPTERYX_DEBUG"] = "" + +sys.path.insert(1, os.path.join(sys.path[0], "../../../../draken")) +sys.path.insert(1, os.path.join(sys.path[0], "../../../../orso")) +sys.path.insert(1, os.path.join(sys.path[0], "../../../../rugo")) +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) + +import time +import pyarrow as pa +import numpy as np +from opteryx.compiled.table_ops.hash_ops import compute_hashes + + +def make_int_table(n): + arr = pa.array(np.random.randint(0, 1<<31, size=n), type=pa.int64()) + return pa.table({'a': arr}) + + +def make_str_table(n, avg_len=20): + import random, string + def rand_str(): + return ''.join(random.choices(string.ascii_letters + string.digits, k=avg_len)) + data = [rand_str() for _ in range(n)] + arr = pa.array(data) + return pa.table({'s': arr}) + + +def make_list_table(n, list_len=5): + data = [[i % 5 for i in range(list_len)] for _ in range(n)] + arr = pa.array(data) + return pa.table({'l': arr}) + + +def bench(table, cols, iterations=5): + # warmup + compute_hashes(table, cols) + times = [] + for _ in range(iterations): + t0 = time.perf_counter() + compute_hashes(table, cols) + t1 = time.perf_counter() + times.append(t1 - t0) + return min(times), sum(times) / len(times) + + +if __name__ == '__main__': + N = 200000 + print('Preparing tables...') + t_int = make_int_table(N) + t_str = make_str_table(N, avg_len=30) + t_list = make_list_table(N, list_len=3) + + print('Benchmarking int table...') + best, mean = bench(t_int, ['a']) + print(f'int table: best={best:.4f}s mean={mean:.4f}s') + + print('Benchmarking str table...') + best, mean = bench(t_str, ['s']) + print(f'str table: best={best:.4f}s mean={mean:.4f}s') + + print('Benchmarking list table...') + best, mean = bench(t_list, ['l']) + print(f'list table: best={best:.4f}s mean={mean:.4f}s') diff --git a/tests/performance/benchmarks/bench_intbuffer.py b/tests/performance/benchmarks/bench_intbuffer.py new file mode 100644 index 000000000..30d1c72ee --- /dev/null +++ b/tests/performance/benchmarks/bench_intbuffer.py @@ -0,0 +1,107 @@ +import os +import sys +import time + +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) + +from opteryx.compiled.structures.buffers import IntBuffer +import numpy as np + + +def bench_append(N, iterations=10): + b = IntBuffer(size_hint=N) + # warmup + for i in range(1000): + b.append(i) + + times = [] + for _ in range(iterations): + b = IntBuffer(size_hint=N) + t0 = time.perf_counter() + for i in range(N): + b.append(i) + t1 = time.perf_counter() + times.append(t1 - t0) + + return min(times), sum(times) / len(times) + + +def bench_extend(N, chunk=10000, iterations=10): + # prepare chunks + chunks = [list(range(chunk)) for _ in range(max(1, N // chunk))] + + times = [] + for _ in range(iterations): + b = IntBuffer(size_hint=N) + t0 = time.perf_counter() + for c in chunks: + b.extend(c) + t1 = time.perf_counter() + times.append(t1 - t0) + + return min(times), sum(times) / len(times) + + +def bench_to_numpy(N, iterations=10): + b = IntBuffer(size_hint=N) + for i in range(N): + b.append(i) + + times = [] + for _ in range(iterations): + t0 = time.perf_counter() + _ = b.to_numpy() + t1 = time.perf_counter() + times.append(t1 - t0) + + return min(times), sum(times) / len(times) + + +def bench_python_list(N, iterations=5): + # baseline: python list append then numpy conversion + times_append = [] + times_to_numpy = [] + for _ in range(iterations): + lst = [] + t0 = time.perf_counter() + for i in range(N): + lst.append(i) + t1 = time.perf_counter() + times_append.append(t1 - t0) + + t0 = time.perf_counter() + arr = np.array(lst, dtype=np.int64) + t1 = time.perf_counter() + times_to_numpy.append(t1 - t0) + + return (min(times_append), sum(times_append) / len(times_append)), (min(times_to_numpy), sum(times_to_numpy) / len(times_to_numpy)) + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('--n', type=int, default=2_000_000, help='number of elements') + parser.add_argument('--iterations', type=int, default=10) + args = parser.parse_args() + + N = args.n + iters = args.iterations + print('IntBuffer benchmark — N=', N) + + print('\nBenchmarking append...') + best, mean = bench_append(N, iterations=iters) + print(f'append: best={best:.4f}s mean={mean:.4f}s') + + print('\nBenchmarking extend (chunks of 50000)...') + best, mean = bench_extend(N, chunk=50000, iterations=iters) + print(f'extend: best={best:.4f}s mean={mean:.4f}s') + + print('\nBenchmarking to_numpy...') + best, mean = bench_to_numpy(N, iterations=iters) + print(f'to_numpy: best={best:.4f}s mean={mean:.4f}s') + + print('\nPython list baseline (append + to_numpy)...') + (a_best, a_mean), (n_best, n_mean) = bench_python_list(N, iterations=iters) + print(f'py_append: best={a_best:.4f}s mean={a_mean:.4f}s') + print(f'py_to_numpy: best={n_best:.4f}s mean={n_mean:.4f}s') diff --git a/tests/performance/benchmarks/bench_lruk.py b/tests/performance/benchmarks/bench_lruk.py new file mode 100644 index 000000000..31f02cd25 --- /dev/null +++ b/tests/performance/benchmarks/bench_lruk.py @@ -0,0 +1,171 @@ +""" +Benchmark for the compiled LRU-K implementation. + +This follows the same style as other benchmarks in `tests/performance/benchmarks`. + +Scenarios: +- set_only: repeatedly insert unique keys until eviction occurs +Usage: + python -m benchmarks.bench_lruk + +Note: the compiled Cython `LRU_K` is preferred; the benchmark will import +`opteryx.compiled.structures.lru_k.LRU_K` and run the scenarios. +""" +import os +import sys + +# Ensure imports resolve when running from repository root +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) + +import time +import random +import statistics +import argparse +import importlib + + +# Try to load the compiled Cython implementation; if not present leave as None +CompiledLRU = None +try: + _mod = importlib.import_module("opteryx.compiled.structures.lru_k") + CompiledLRU = getattr(_mod, "LRU_K", None) +except (ImportError, AttributeError): + CompiledLRU = None + + +# Minimal Python reference implementation for side-by-side comparison. +class PythonLRU: + def __init__(self, k=2): + self.k = k + self.slots = {} + self.hits = 0 + self.misses = 0 + self.evictions = 0 + self.inserts = 0 + + def do_work(self, iterations: int) -> None: + # small, deterministic CPU-bound loop (no imports of random) + x = 0x9e3779b97f4a7c15 + for i in range(iterations): + x = (x ^ (i + 0x9e3779b97f4a7c15)) * 0xbf58476d1ce4e5b9 & 0xFFFFFFFFFFFFFFFF + # use x so loop isn't optimized away + if x == 0xDEADBEEF: + print() + + def set(self, key, value): + self.inserts += 1 + self.slots[key] = value + self.do_work(10) # Simulate some overhead + + def get(self, key): + v = self.slots.get(key) + if v is None: + self.misses += 1 + else: + self.hits += 1 + return v + + +def timeit(func, *fargs, repeat=5): + times = [] + for _ in range(repeat): + t0 = time.perf_counter() + func(*fargs) + t1 = time.perf_counter() + times.append(t1 - t0) + return min(times), statistics.mean(times), (statistics.stdev(times) if len(times) > 1 else 0.0) + + +def bench_set_only(cache_cls, n_inserts, key_size=16, value_size=64): + """Insert unique keys until n_inserts performed. Measures throughput.""" + cache_obj = cache_cls() + + def runner(): + for i in range(n_inserts): + key = f"k-{i}".encode().ljust(key_size, b"_")[:key_size] + val = (f"v-{i}".encode()).ljust(value_size, b"_")[:value_size] + cache_obj.set(key, val) + + return runner, cache_obj + + +def bench_get_only(cache_cls, prepopulate, n_gets): + """Populate the cache with `prepopulate` items, then randomly get keys.""" + cache_obj = cache_cls() + keys = [] + for i in range(prepopulate): + k = f"k-{i}".encode() + cache_obj.set(k, b"v") + keys.append(k) + + def runner(): + for _ in range(n_gets): + k = random.choice(keys) + _ = cache_obj.get(k) + + return runner, cache_obj + + +def bench_mixed(cache_cls, prepopulate, n_ops, write_ratio=0.1): + cache_obj = cache_cls() + keys = [] + for i in range(prepopulate): + k = f"k-{i}".encode() + cache_obj.set(k, b"v") + keys.append(k) + + next_i = prepopulate + + def runner(): + nonlocal next_i + for _ in range(n_ops): + if random.random() < write_ratio: + k = f"k-{next_i}".encode() + cache_obj.set(k, b"v") + keys.append(k) + next_i += 1 + else: + k = random.choice(keys) + _ = cache_obj.get(k) + + return runner, cache_obj + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="LRU-K microbenchmarks") + parser.add_argument("--scenario", choices=["set_only", "get_only", "mixed"], default="mixed") + parser.add_argument("--inserts", type=int, default=100000) + parser.add_argument("--prepopulate", type=int, default=10000) + parser.add_argument("--gets", type=int, default=100000) + parser.add_argument("--ops", type=int, default=100000) + parser.add_argument("--repeat", type=int, default=10) + args = parser.parse_args() + + def run_and_report(cache_label, cache_cls): + if args.scenario == 'set_only': + run_fn, _ = bench_set_only(cache_cls, n_inserts=args.inserts) + ops = args.inserts + elif args.scenario == 'get_only': + run_fn, _ = bench_get_only(cache_cls, prepopulate=args.prepopulate, n_gets=args.gets) + ops = args.gets + else: + run_fn, _ = bench_mixed(cache_cls, prepopulate=args.prepopulate, n_ops=args.ops) + ops = args.ops + + mn, mean, sd = timeit(run_fn, repeat=args.repeat) + print(f"{cache_label}: min={mn:.4f}s mean={mean:.4f}s sd={sd:.4f}s ops/s={(ops/mn):.0f}") + + print() + print("Benchmarking LRU-K implementations") + print("---------------------------------") + + # Run compiled implementation if available + if CompiledLRU is not None: + print("Compiled LRU-K (Cython):") + run_and_report("compiled", CompiledLRU) + else: + print("Compiled LRU-K not available (skipping)") + + # Run stable Python reference implementation + print("Python reference LRU-K:") + run_and_report("python", PythonLRU) diff --git a/tests/performance/benchmarks/bench_memory_pool.py b/tests/performance/benchmarks/bench_memory_pool.py new file mode 100644 index 000000000..5ead5ebab --- /dev/null +++ b/tests/performance/benchmarks/bench_memory_pool.py @@ -0,0 +1,380 @@ +""" +Simple benchmark for opteryx compiled memory pool. + +This script attempts to import the Cython-compiled MemoryPool from +`opteryx.compiled.structures.memory_pool` and falls back to a plain-Python +shim if the compiled extension isn't available. It runs a few scenarios: + +- commit_only: repeatedly commit byte arrays and immediately release them +- commit_read_release: commit, read (zero-copy and copy), unlatch/release +- multi_threaded_reads: commits a pool of objects and spawns worker threads + performing zero-copy reads with latching to measure concurrent read throughput + +Usage: + python -m benchmarks.bench_memory_pool + +Note: For best results run against the compiled C extension built for this +repo (setup.py / build ext) so that the Cython `MemoryPool` is used. +""" + +import time +import threading +import random +import statistics +import argparse +import math + +from opteryx.compiled.structures.memory_pool import MemoryPool as CompiledMemoryPool + +class MemorySegment: + def __init__(self, start, length, latches=0): + self.start = start + self.length = length + self.latches = latches + +class PythonMemoryPool: + def __init__(self, size): + if size <= 0: + raise ValueError("size must be > 0") + self.size = size + self.pool = bytearray(size) + self.free_segments = [MemorySegment(0, size, 0)] + self.used_segments = {} + self.lock = threading.RLock() + self.next_ref_id = 1 + + def _find_free_segment(self, size): + for i, seg in enumerate(self.free_segments): + if seg.length >= size: + return i + return -1 + + def _level1_compaction(self): + self.free_segments.sort(key=lambda s: s.start) + out = [self.free_segments[0]] if self.free_segments else [] + for seg in self.free_segments[1:]: + last = out[-1] + if last.start + last.length == seg.start: + last.length += seg.length + else: + out.append(seg) + self.free_segments = out + + def _level2_compaction(self): + offset = 0 + items = sorted(self.used_segments.items(), key=lambda x: x[1].start) + for _, seg in items: + if seg.latches == 0 and seg.start != offset: + self.pool[offset:offset+seg.length] = self.pool[seg.start:seg.start+seg.length] + seg.start = offset + offset = max(offset, seg.start + seg.length) + free = [] + cur = 0 + for _, seg in sorted(self.used_segments.items(), key=lambda x: x[1].start): + if seg.start > cur: + free.append(MemorySegment(cur, seg.start - cur, 0)) + cur = seg.start + seg.length + if cur < self.size: + free.append(MemorySegment(cur, self.size - cur, 0)) + self.free_segments = free + + def commit(self, data): + if isinstance(data, (bytes, bytearray)): + raw = data + length = len(raw) + else: + raise TypeError("Unsupported data type for commit") + + ref = self.next_ref_id + self.next_ref_id += 1 + if length == 0: + self.used_segments[ref] = MemorySegment(0, 0, 0) + return ref + + total_free = sum(s.length for s in self.free_segments) + if total_free < length: + return -1 + + with self.lock: + idx = self._find_free_segment(length) + if idx == -1: + self._level1_compaction() + idx = self._find_free_segment(length) + if idx == -1: + self._level2_compaction() + idx = self._find_free_segment(length) + if idx == -1: + return -1 + + seg = self.free_segments.pop(idx) + if seg.length > length: + self.free_segments.append(MemorySegment(seg.start + length, seg.length - length, 0)) + start = seg.start + self.pool[start:start+length] = raw + self.used_segments[ref] = MemorySegment(start, length, 0) + + return ref + + def read(self, ref_id, zero_copy=0, latch=0): + with self.lock: + if ref_id not in self.used_segments: + raise ValueError("Invalid reference") + seg = self.used_segments[ref_id] + if latch: + seg.latches += 1 + if zero_copy: + return memoryview(self.pool)[seg.start:seg.start+seg.length] + else: + return bytes(self.pool[seg.start:seg.start+seg.length]) + + def unlatch(self, ref_id): + with self.lock: + seg = self.used_segments[ref_id] + if seg.latches == 0: + raise RuntimeError("not latched") + seg.latches -= 1 + + def release(self, ref_id): + with self.lock: + seg = self.used_segments.pop(ref_id) + self.free_segments.append(seg) + + def available_space(self): + return sum(s.length for s in self.free_segments) + +# Benchmark harness + +def timeit(func, *fargs, repeat=5): + times = [] + for _ in range(repeat): + start_time = time.perf_counter() + func(*fargs) + end_time = time.perf_counter() + times.append(end_time - start_time) + return min(times), statistics.mean(times), (statistics.stdev(times) if len(times) > 1 else 0.0) + +def bench_commit_immediate_release(memory_pool, size, payload_size, iterations): + """Commit and immediately release on each iteration so the pool is reused.""" + pool = memory_pool(size) + payload = b"x" * payload_size + + def run(): + for _ in range(iterations): + r = pool.commit(payload) + if r == -1: + # retry once; if still out of space, stop + r = pool.commit(payload) + pool.release(r) + + return run + + +def bench_commit_read_release(memory_pool, size, payload_size, iterations, zero_copy=False, latch=False): + pool = memory_pool(size) + payload = b"y" * payload_size + + def run(): + refs = [] + for _ in range(iterations): + r = pool.commit(payload) + if r == -1: + break + # read + data = pool.read(r, zero_copy=1 if zero_copy else 0, latch=1 if latch else 0) + if latch and isinstance(data, memoryview): + # unlatch immediately + pool.unlatch(r) + refs.append(r) + for r in refs: + pool.release(r) + + return run + + +def bench_commit_read_release_immediate(memory_pool, size, payload_size, iterations, zero_copy=False, latch=False): + """Commit, read and immediately release on each iteration so the pool is reused.""" + pool = memory_pool(size) + payload = b"y" * payload_size + + def run(): + for _ in range(iterations): + r = pool.commit(payload) + if r == -1: + # retry once; if still out of space, stop + r = pool.commit(payload) + if r == -1: + break + + data = pool.read(r, zero_copy=1 if zero_copy else 0, latch=1 if latch else 0) + if latch and isinstance(data, memoryview): + pool.unlatch(r) + pool.release(r) + + return run + + +def bench_multi_threaded_reads(memory_pool, size, payload_size, n_items, n_threads, iters_per_thread): + pool = memory_pool(size) + payload = b"m" * payload_size + refs = [] + for _ in range(n_items): + r = pool.commit(payload) + if r == -1: + break + refs.append(r) + + stop = threading.Event() + latched_counts = [0] + + def worker(): + local_reads = 0 + while not stop.is_set(): + r = random.choice(refs) + data = pool.read(r, zero_copy=1, latch=1) + # simulate small work + _ = len(data) + pool.unlatch(r) + local_reads += 1 + if local_reads >= iters_per_thread: + break + latched_counts[0] += local_reads + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + + def run(): + for t in threads: + t.start() + for t in threads: + t.join() + stop.set() + # no cleanup for brevity + + return run, pool, refs + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="MemoryPool stress benchmark") + parser.add_argument("--scenario", choices=["commit","commit_read","commit_read_immediate","multi_read"], default="commit_read_immediate") + parser.add_argument("--pool-size", type=int, default=16 * 1024 * 1024) + parser.add_argument("--payload-size", type=int, default=1024 * 1024) + parser.add_argument("--iterations", type=int, default=5000) + parser.add_argument("--threads", type=int, default=1) + parser.add_argument("--items", type=int, default=1024, help="number of items to pre-commit for multi_read") + parser.add_argument("--iters-per-thread", type=int, default=1000) + parser.add_argument("--sample-rate", type=int, default=1000, help="collect latency sample every N ops") + parser.add_argument("--zero-copy", action="store_true") + parser.add_argument("--latch", action="store_true") + parser.add_argument("--warmup", type=int, default=1000, help="warmup iterations to skip from sampling") + + args = parser.parse_args() + + def measure_commit_read_release_immediate(memory_pool, size, payload_size, iterations, sample_rate=1000, zero_copy=False, latch=False, warmup=1000): + pool = memory_pool(size) + payload = b"y" * payload_size + + samples = [] + total_bytes = 0 + # warmup iterations + for _ in range(min(warmup, iterations)): + r = pool.commit(payload) + if r == -1: + r = pool.commit(payload) + if r == -1: + break + _ = pool.read(r, zero_copy=1 if zero_copy else 0, latch=1 if latch else 0) + if latch: + pool.unlatch(r) + pool.release(r) + + start = time.perf_counter() + for i in range(iterations - min(warmup, iterations)): + op_t0 = time.perf_counter() + r = pool.commit(payload) + if r == -1: + # retry once + r = pool.commit(payload) + if r == -1: + break + data = pool.read(r, zero_copy=1 if zero_copy else 0, latch=1 if latch else 0) + if latch: + pool.unlatch(r) + pool.release(r) + op_t1 = time.perf_counter() + + total_bytes += payload_size + if (i % sample_rate) == 0: + samples.append((op_t1 - op_t0) * 1e6) # microseconds + + stop = time.perf_counter() + total_ops = (iterations - min(warmup, iterations)) + elapsed = stop - start + ops_per_sec = total_ops / elapsed if elapsed > 0 else float('inf') + mb_per_sec = (total_bytes / 1024 / 1024) / elapsed if elapsed > 0 else float('inf') + + # latency stats from samples + lat_stats = {} + if samples: + samples_sorted = sorted(samples) + lat_stats['p50'] = statistics.median(samples_sorted) + lat_stats['p95'] = samples_sorted[min(len(samples_sorted)-1, math.floor(len(samples_sorted)*0.95))] + lat_stats['p99'] = samples_sorted[min(len(samples_sorted)-1, math.floor(len(samples_sorted)*0.99))] + lat_stats['mean'] = statistics.mean(samples_sorted) + lat_stats['stddev'] = statistics.stdev(samples_sorted) if len(samples_sorted) > 1 else 0.0 + + return { + 'total_ops': total_ops, + 'elapsed_s': elapsed, + 'ops_per_sec': ops_per_sec, + 'mb_per_sec': mb_per_sec, + 'latency_samples': len(samples), + 'latency_stats': lat_stats, + } + + if args.scenario == 'commit_read_immediate': + print(f"Running commit+read+release immediate: iterations={args.iterations} payload={args.payload_size} threads={args.threads}") + if args.threads > 1: + # simple multi-threaded executor that runs the same work per thread and aggregates + results = [] + def thread_target(): + res = measure_commit_read_release_immediate(CompiledMemoryPool, args.pool_size, args.payload_size, args.iterations // args.threads, args.sample_rate, args.zero_copy, args.latch, args.warmup) + results.append(res) + + ths = [threading.Thread(target=thread_target) for _ in range(args.threads)] + t0 = time.perf_counter() + for t in ths: + t.start() + for t in ths: + t.join() + t1 = time.perf_counter() + + # aggregate + total_ops = sum(r['total_ops'] for r in results) + elapsed = t1 - t0 + ops_per_sec = total_ops / elapsed if elapsed > 0 else float('inf') + mb_per_sec = sum(r['mb_per_sec'] for r in results) / len(results) if results else 0.0 + print(f"Total ops: {total_ops} elapsed: {elapsed:.4f}s ops/s: {ops_per_sec:.0f} MB/s: {mb_per_sec:.2f}") + else: + print() + print("Compiled MemoryPool results:") + res = measure_commit_read_release_immediate(CompiledMemoryPool, args.pool_size, args.payload_size, args.iterations, args.sample_rate, args.zero_copy, args.latch, args.warmup) + print(f"Total ops: {res['total_ops']} elapsed: {res['elapsed_s']:.4f}s ops/s: {res['ops_per_sec']:.0f} MB/s: {res['mb_per_sec']:.2f}") + if res['latency_stats']: + print("Latency (us): p50={p50:.2f} p95={p95:.2f} p99={p99:.2f} mean={mean:.2f} stddev={stddev:.2f}".format(**res['latency_stats'])) + + print() + print("Python MemoryPool results:") + res = measure_commit_read_release_immediate(PythonMemoryPool, args.pool_size, args.payload_size, args.iterations, args.sample_rate, args.zero_copy, args.latch, args.warmup) + print(f"Total ops: {res['total_ops']} elapsed: {res['elapsed_s']:.4f}s ops/s: {res['ops_per_sec']:.0f} MB/s: {res['mb_per_sec']:.2f}") + if res['latency_stats']: + print("Latency (us): p50={p50:.2f} p95={p95:.2f} p99={p99:.2f} mean={mean:.2f} stddev={stddev:.2f}".format(**res['latency_stats'])) + + elif args.scenario == 'multi_read': + print("Running multi-threaded read scenario") + run_mt, pool_mt, refs_mt = bench_multi_threaded_reads(args.pool_size, args.payload_size, args.items, args.threads, args.iters_per_thread) + t0 = time.perf_counter() + run_mt() + t1 = time.perf_counter() + print(f"multi_read elapsed: {t1-t0:.4f}s") + + else: + print("Unknown or unsupported scenario selected") diff --git a/tests/unit/core/test_lruk.py b/tests/unit/core/test_lruk.py index 7c04e2d81..32b626ea4 100644 --- a/tests/unit/core/test_lruk.py +++ b/tests/unit/core/test_lruk.py @@ -2,7 +2,7 @@ import os import sys -sys.path.insert(1, os.path.join(sys.path[0], "../..")) +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) from opteryx.compiled.structures.lru_k import LRU_K from tests import is_windows, skip_if diff --git a/tests/unit/core/test_memory_pool.py b/tests/unit/core/test_memory_pool.py index 54f1af7e8..c75bbb9f2 100644 --- a/tests/unit/core/test_memory_pool.py +++ b/tests/unit/core/test_memory_pool.py @@ -34,7 +34,7 @@ # os.environ["OPTERYX_DEBUG"] = "1" -sys.path.insert(1, os.path.join(sys.path[0], "../..")) +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) from orso.tools import random_string @@ -884,6 +884,4 @@ def validate_integrity(): if __name__ == "__main__": # pragma: no cover from tests import run_tests - test_latch_counting() - run_tests() diff --git a/tests/unit/core/test_memory_pool_async.py b/tests/unit/core/test_memory_pool_async.py index d7299fa93..57666728e 100644 --- a/tests/unit/core/test_memory_pool_async.py +++ b/tests/unit/core/test_memory_pool_async.py @@ -11,7 +11,7 @@ import random import sys -sys.path.insert(1, os.path.join(sys.path[0], "../..")) +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) os.environ["OPTERYX_DEBUG"] = "1" diff --git a/tests/unit/diagnostic/test_list_chunk_offsets_hash.py b/tests/unit/diagnostic/test_list_chunk_offsets_hash.py new file mode 100644 index 000000000..65a28434f --- /dev/null +++ b/tests/unit/diagnostic/test_list_chunk_offsets_hash.py @@ -0,0 +1,39 @@ +import os +import sys + +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) + + +import pyarrow as pa + + +def test_list_chunk_offsets_hash_consistency(): + try: + from opteryx.compiled.table_ops.hash_ops import compute_hashes + except ImportError: + import pytest + pytest.skip("Cython hash_ops not available") + + full = [[1, 2], [3], [], None, [4, 5]] + list_type = pa.list_(pa.int64()) + + # Flat table + t_flat = pa.table({"l": pa.array(full, type=list_type)}) + + # Build a chunked column where the second chunk is a slice (non-zero offset) + chunk0 = pa.array(full[:2], type=list_type) + chunk1_src = pa.array([None] + full[2:], type=list_type) + # slice away the leading None so this chunk has a non-zero offset into its buffers + chunk1 = chunk1_src.slice(1, len(full) - 2) + chunked = pa.chunked_array([chunk0, chunk1]) + t_chunked = pa.Table.from_arrays([chunked], names=["l"]) + + h_flat = compute_hashes(t_flat, ["l"]) + h_chunked = compute_hashes(t_chunked, ["l"]) + + assert list(h_flat) == list(h_chunked) + +if __name__ == "__main__": + from tests import run_tests + + run_tests() diff --git a/tests/unit/diagnostic/test_list_fast_paths.py b/tests/unit/diagnostic/test_list_fast_paths.py new file mode 100644 index 000000000..accaa69f6 --- /dev/null +++ b/tests/unit/diagnostic/test_list_fast_paths.py @@ -0,0 +1,88 @@ +import os +import sys + +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) + +import pyarrow as pa +from opteryx.compiled.table_ops.hash_ops import compute_hashes + + +def test_list_of_ints_matches_per_element_hashing(): + N = 1000 + data = [[i % 7 for _ in range(3)] for i in range(N)] + t = pa.table({"l": pa.array(data)}) + + # Compute hashes using our function + h1 = compute_hashes(t, ["l"]) + # Compute a Python-level reference by hashing tuples per-row (exact parity) + # Compute a reference using the same mixing logic as the Cython implementation + SEED = 0x9e3779b97f4a7c15 & 0xFFFFFFFFFFFFFFFF + c1 = 0xbf58476d1ce4e5b9 & 0xFFFFFFFFFFFFFFFF + c2 = 0x94d049bb133111eb & 0xFFFFFFFFFFFFFFFF + FINAL_MUL = 0x9e3779b97f4a7c15 & 0xFFFFFFFFFFFFFFFF + + ref = [] + for row in data: + # per-list hash + if len(row) == 0: + list_hash = 0xab52d8afc1448992 & 0xFFFFFFFFFFFFFFFF # EMPTY_HASH + else: + list_hash = SEED + for elem in row: + elem_hash = elem & 0xFFFFFFFFFFFFFFFF + # mix element + list_hash = elem_hash ^ list_hash + list_hash = (list_hash ^ (list_hash >> 30)) * c1 & 0xFFFFFFFFFFFFFFFF + list_hash = (list_hash ^ (list_hash >> 27)) * c2 & 0xFFFFFFFFFFFFFFFF + list_hash = list_hash ^ (list_hash >> 31) + + # final row mix (update_row_hash behavior) + h = 0 + h = (h ^ list_hash) * FINAL_MUL & 0xFFFFFFFFFFFFFFFF + h ^= (h >> 32) + ref.append(h) + + assert len(h1) == len(ref) + for i in range(len(ref)): + assert h1[i] == ref[i] + + +def test_list_of_strings_and_chunked_slices(): + # Create long list and then create a chunked / sliced version + data = [[f'str{i % 10}' for _ in range(2)] for i in range(2000)] + arr = pa.array(data) + # make chunked by slicing into two + a1 = arr.slice(0, 1200) + a2 = arr.slice(1200) + chunked = pa.chunked_array([a1, a2]) + t_chunked = pa.table({"l": chunked}) + + t_flat = pa.table({"l": arr}) + + h_flat = compute_hashes(t_flat, ["l"]) + h_chunked = compute_hashes(t_chunked, ["l"]) + + assert len(h_flat) == len(h_chunked) + for i in range(len(h_flat)): + assert h_flat[i] == h_chunked[i] + + +def test_nested_and_boolean_lists(): + # Nested lists of primitives + data_nested = [[[i % 3, (i + 1) % 3] for _ in range(2)] for i in range(500)] + t_nested = pa.table({"nl": pa.array(data_nested)}) + + # Boolean lists + data_bool = [[(i + j) % 2 == 0 for j in range(4)] for i in range(500)] + t_bool = pa.table({"bl": pa.array(data_bool)}) + + hn = compute_hashes(t_nested, ["nl"]) + hb = compute_hashes(t_bool, ["bl"]) + + assert len(hn) == 500 + assert len(hb) == 500 + +if __name__ == "__main__": + from tests import run_tests + + run_tests() diff --git a/tests/unit/diagnostic/test_non_null_indices_offsets.py b/tests/unit/diagnostic/test_non_null_indices_offsets.py new file mode 100644 index 000000000..902984d56 --- /dev/null +++ b/tests/unit/diagnostic/test_non_null_indices_offsets.py @@ -0,0 +1,37 @@ +import os +import sys + +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) + +import pyarrow as pa + + +def test_non_null_indices_with_chunk_offsets(): + try: + from opteryx.compiled.table_ops.null_avoidant_ops import non_null_indices + except ImportError: + import pytest + pytest.skip("Cython null_avoidant_ops not available") + + # Build an array with some nulls and then create a chunked array where one chunk is sliced + base = [1, None, 3, None, 5, 6] + arr = pa.array(base, type=pa.int64()) + + # Create two chunks; second chunk will be a slice to create non-zero offset + c0 = pa.array(base[:3], type=pa.int64()) + c1_src = pa.array([None] + base[3:], type=pa.int64()) + c1 = c1_src.slice(1, len(base) - 3) + + chunked = pa.chunked_array([c0, c1]) + table = pa.table({"v": chunked}) + + idxs = non_null_indices(table, ["v"]).tolist() + + # Expected non-null positions in original full array + expected = [i for i, v in enumerate(base) if v is not None] + assert idxs == expected + +if __name__ == "__main__": + from tests import run_tests + + run_tests() diff --git a/tests/unit/storage/test_blob_gcs.py b/tests/unit/storage/test_blob_gcs.py index ffae9fc50..852ecc69e 100644 --- a/tests/unit/storage/test_blob_gcs.py +++ b/tests/unit/storage/test_blob_gcs.py @@ -4,7 +4,7 @@ import pytest -sys.path.insert(1, os.path.join(sys.path[0], "../..")) +sys.path.insert(1, os.path.join(sys.path[0], "../../..")) import opteryx from opteryx.connectors import GcpCloudStorageConnector