diff --git a/python/xorq/caching/__init__.py b/python/xorq/caching/__init__.py index d82871522..4de7779b0 100644 --- a/python/xorq/caching/__init__.py +++ b/python/xorq/caching/__init__.py @@ -149,17 +149,18 @@ def get_key(self, expr: ir.Expr): @frozen class SnapshotStrategy(CacheStrategy): + @classmethod @contextlib.contextmanager - def normalization_context(self, expr): + def normalization_context(cls, expr): typs = map(type, expr.ls.backends) - with patch_normalize_token(*typs, f=self.normalize_backend): + with patch_normalize_token(*typs, f=cls.normalize_backend): with patch_normalize_token( ops.DatabaseTable, - f=self.normalize_databasetable, + f=cls.normalize_databasetable, ): with patch_normalize_token( Read, - f=self.cached_normalize_read, + f=cls.cached_normalize_read, ): yield @@ -188,16 +189,18 @@ def rename_remote_table(node, _, **kwargs): return op.replace(rename_remote_table) - def replace_remote_table(self, expr): + @classmethod + def replace_remote_table(cls, expr): """replace remote table with deterministic name ***strictly for key calculation***""" if expr.op().find(RemoteTable): - expr = self.cached_replace_remote_table(expr.op()).to_expr() + expr = cls.cached_replace_remote_table(expr.op()).to_expr() return expr - def get_key(self, expr: ir.Expr): + @classmethod + def get_key(cls, expr: ir.Expr): # can we cache this? - with self.normalization_context(expr): - replaced = self.replace_remote_table(expr) + with cls.normalization_context(expr): + replaced = cls.replace_remote_table(expr) tokenized = replaced.ls.tokenized return "-".join(("snapshot", tokenized)) diff --git a/python/xorq/tests/test_ls_accessor.py b/python/xorq/tests/test_ls_accessor.py index aaa4dd361..883ec464b 100644 --- a/python/xorq/tests/test_ls_accessor.py +++ b/python/xorq/tests/test_ls_accessor.py @@ -1,5 +1,6 @@ from pathlib import Path +import dask import pytest import xorq.api as xo @@ -172,3 +173,34 @@ def test_cache_properties(parquet_dir, tmp_path): assert not psn1.to_expr().ls.cache_path.exists() assert not psn2.to_expr().ls.cache_path.exists() assert ssn.to_expr().ls.cache_path is None + + +def test_tokenized_matches(): + expr = xo.examples.batting.fetch()[lambda t: t.stint == 0] + actual = expr.ls.tokenized + expected = dask.base.tokenize(expr) + assert actual == expected + + +def test_tokenized_changes(): + expr = xo.examples.batting.fetch()[lambda t: t.stint == 0] + (path,) = ( + Path(dict(op.read_kwargs)["path"]) + for op in expr.op().find(xo.expr.relations.Read) + ) + before = expr.ls.tokenized + path.touch() + after = expr.ls.tokenized + assert before != after + + +def test_tokenized_snapshot(): + expr = xo.examples.batting.fetch()[lambda t: t.stint == 0] + (path,) = ( + Path(dict(op.read_kwargs)["path"]) + for op in expr.op().find(xo.expr.relations.Read) + ) + before = expr.ls.tokenized_snapshot + path.touch() + after = expr.ls.tokenized_snapshot + assert before == after diff --git a/python/xorq/vendor/ibis/expr/types/core.py b/python/xorq/vendor/ibis/expr/types/core.py index 9ea06b334..506cbcf80 100644 --- a/python/xorq/vendor/ibis/expr/types/core.py +++ b/python/xorq/vendor/ibis/expr/types/core.py @@ -816,6 +816,13 @@ def tokenized(self): # NOTE: this should almost certainly not be functools.cache'd: it can obscure filesystem / source table changes within the same process run return patched_tokenize(self.expr) + @property + def tokenized_snapshot(self): + from xorq.caching import SnapshotStrategy + + with SnapshotStrategy.normalization_context(self.expr): + return self.tokenized + def get_cache_path(self): from xorq.caching import ParquetStorage