Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 116 additions & 5 deletions pyiceberg/expressions/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import hashlib
import math
import threading
from abc import ABC, abstractmethod
from functools import singledispatch
from typing import (
Expand All @@ -23,6 +25,7 @@
Dict,
Generic,
List,
Optional,
Set,
SupportsFloat,
Tuple,
Expand Down Expand Up @@ -1976,11 +1979,119 @@ def residual_for(self, partition_data: Record) -> BooleanExpression:
return self.expr


_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE = 128


class ResidualEvaluatorCache:
    """Thread-safe LRU cache for ResidualEvaluator instances.

    Caches ResidualEvaluators to avoid repeated instantiation and initialization
    overhead when scanning multiple data files with identical partition specs,
    expressions, schemas, and case sensitivity settings.
    """

    _cache: Dict[str, ResidualEvaluator]
    _maxsize: int
    _lock: threading.RLock

    def __init__(self, maxsize: int = _DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE) -> None:
        """Initialize the cache.

        Args:
            maxsize: Maximum number of evaluators to cache. Defaults to 128.
        """
        # A plain dict preserves insertion order (Python 3.7+); LRU recency is
        # maintained by re-inserting a key on every access, so the first key in
        # iteration order is always the least-recently-used entry.
        self._cache = {}
        self._maxsize = maxsize
        self._lock = threading.RLock()

    @staticmethod
    def _make_key(
        spec_id: int,
        expr: BooleanExpression,
        case_sensitive: bool,
        schema_id: Optional[int] = None,
    ) -> str:
        """Create deterministic cache key from evaluator parameters.

        Args:
            spec_id: Partition spec identifier.
            expr: Filter expression tree.
            case_sensitive: Case-sensitive flag.
            schema_id: Optional schema identifier.

        Returns:
            32-character MD5 hex string cache key.
        """
        # NOTE(review): assumes repr(expr) is a stable, fully-descriptive
        # rendering of the expression tree — verify for all expression types.
        key_parts = f"{spec_id}#{repr(expr)}#{case_sensitive}#{schema_id}"
        return hashlib.md5(key_parts.encode()).hexdigest()

    def get(
        self,
        spec: PartitionSpec,
        expr: BooleanExpression,
        case_sensitive: bool,
        schema: Schema,
    ) -> Optional[ResidualEvaluator]:
        """Retrieve cached evaluator if it exists.

        A hit also refreshes the entry's recency so that eviction in put()
        removes the true least-recently-used key (LRU, not FIFO).

        Args:
            spec: Partition specification.
            expr: Filter expression.
            case_sensitive: Case sensitivity flag.
            schema: Table schema.

        Returns:
            Cached ResidualEvaluator or None.
        """
        cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
        with self._lock:
            evaluator = self._cache.get(cache_key)
            if evaluator is not None:
                # Move the entry to the most-recently-used position.
                del self._cache[cache_key]
                self._cache[cache_key] = evaluator
            return evaluator

    def put(
        self,
        spec: PartitionSpec,
        expr: BooleanExpression,
        case_sensitive: bool,
        schema: Schema,
        evaluator: ResidualEvaluator,
    ) -> None:
        """Cache a ResidualEvaluator instance.

        Evicts the least-recently-used entry when the cache is full. Re-putting
        an existing key refreshes its recency instead of evicting another entry.

        Args:
            spec: Partition specification.
            expr: Filter expression.
            case_sensitive: Case sensitivity flag.
            schema: Table schema.
            evaluator: ResidualEvaluator to cache.
        """
        cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
        with self._lock:
            if cache_key in self._cache:
                # Refresh recency; no eviction needed when overwriting.
                del self._cache[cache_key]
            elif len(self._cache) >= self._maxsize:
                # First key in insertion order is the least-recently-used one.
                lru_key = next(iter(self._cache))
                del self._cache[lru_key]
            self._cache[cache_key] = evaluator

    def clear(self) -> None:
        """Clear all cached evaluators."""
        with self._lock:
            self._cache.clear()


_residual_evaluator_cache = ResidualEvaluatorCache()


def residual_evaluator_of(
    spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> ResidualEvaluator:
    """Return a residual evaluator for the given scan parameters, using the cache.

    Looks up the module-level ResidualEvaluatorCache first; on a miss, builds a
    new evaluator, caches it, and returns it.

    Args:
        spec: Partition specification for the data files being scanned.
        expr: Filter expression to derive residuals from.
        case_sensitive: Whether field-name binding is case sensitive.
        schema: Table schema the expression is bound against.

    Returns:
        A cached or newly constructed ResidualEvaluator.
    """
    # NOTE: the pre-cache implementation returned here unconditionally, which
    # left the caching path below unreachable; that stale early return is removed.
    cached = _residual_evaluator_cache.get(spec, expr, case_sensitive, schema)
    if cached is not None:
        return cached

    evaluator: ResidualEvaluator
    if spec.is_unpartitioned():
        # No partition data to strip: the residual is always the full expression.
        evaluator = UnpartitionedResidualEvaluator(schema=schema, expr=expr)
    else:
        evaluator = ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)

    _residual_evaluator_cache.put(spec, expr, case_sensitive, schema, evaluator)
    return evaluator
Loading