Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions python/xorq/caching/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,18 @@ def get_key(self, expr: ir.Expr):

@frozen
class SnapshotStrategy(CacheStrategy):
@classmethod
@contextlib.contextmanager
def normalization_context(self, expr):
def normalization_context(cls, expr):
typs = map(type, expr.ls.backends)
with patch_normalize_token(*typs, f=self.normalize_backend):
with patch_normalize_token(*typs, f=cls.normalize_backend):
with patch_normalize_token(
ops.DatabaseTable,
f=self.normalize_databasetable,
f=cls.normalize_databasetable,
):
with patch_normalize_token(
Read,
f=self.cached_normalize_read,
f=cls.cached_normalize_read,
):
yield

Expand Down Expand Up @@ -188,16 +189,18 @@ def rename_remote_table(node, _, **kwargs):

return op.replace(rename_remote_table)

def replace_remote_table(self, expr):
@classmethod
def replace_remote_table(cls, expr):
"""replace remote table with deterministic name ***strictly for key calculation***"""
if expr.op().find(RemoteTable):
expr = self.cached_replace_remote_table(expr.op()).to_expr()
expr = cls.cached_replace_remote_table(expr.op()).to_expr()
return expr

def get_key(self, expr: ir.Expr):
@classmethod
def get_key(cls, expr: ir.Expr):
# can we cache this?
with self.normalization_context(expr):
replaced = self.replace_remote_table(expr)
with cls.normalization_context(expr):
replaced = cls.replace_remote_table(expr)
tokenized = replaced.ls.tokenized
return "-".join(("snapshot", tokenized))

Expand Down
32 changes: 32 additions & 0 deletions python/xorq/tests/test_ls_accessor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path

import dask
import pytest

import xorq.api as xo
Expand Down Expand Up @@ -172,3 +173,34 @@ def test_cache_properties(parquet_dir, tmp_path):
assert not psn1.to_expr().ls.cache_path.exists()
assert not psn2.to_expr().ls.cache_path.exists()
assert ssn.to_expr().ls.cache_path is None


def test_tokenized_matches():
    """``ls.tokenized`` should agree with tokenizing the expression directly."""
    expr = xo.examples.batting.fetch()[lambda t: t.stint == 0]
    # The accessor is expected to be a thin wrapper over dask's tokenizer.
    assert expr.ls.tokenized == dask.base.tokenize(expr)


def test_tokenized_changes():
    """Touching the backing file must change the default token."""
    expr = xo.examples.batting.fetch()[lambda t: t.stint == 0]
    # Locate the single Read node's source file on disk.
    read_ops = expr.op().find(xo.expr.relations.Read)
    (source_path,) = [Path(dict(node.read_kwargs)["path"]) for node in read_ops]
    original_token = expr.ls.tokenized
    # Bump the file's mtime without altering its contents.
    source_path.touch()
    assert expr.ls.tokenized != original_token


def test_tokenized_snapshot():
    """Snapshot tokens should be stable across mtime-only file changes."""
    expr = xo.examples.batting.fetch()[lambda t: t.stint == 0]
    # Locate the single Read node's source file on disk.
    read_ops = expr.op().find(xo.expr.relations.Read)
    (source_path,) = [Path(dict(node.read_kwargs)["path"]) for node in read_ops]
    original_token = expr.ls.tokenized_snapshot
    # Bump the file's mtime without altering its contents.
    source_path.touch()
    assert expr.ls.tokenized_snapshot == original_token
7 changes: 7 additions & 0 deletions python/xorq/vendor/ibis/expr/types/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,13 @@ def tokenized(self):
# NOTE: this should almost certainly not be functools.cache'd: it can obscure filesystem / source table changes within the same process run
return patched_tokenize(self.expr)

@property
def tokenized_snapshot(self):
    """Return the token of ``self.expr`` computed under snapshot normalization.

    The token is produced by the regular ``tokenized`` property, but inside
    ``SnapshotStrategy.normalization_context`` so that snapshot-specific
    normalization rules are in effect for source tables.
    """
    # NOTE(review): imported here rather than at module level — presumably
    # to avoid a circular import between this module and xorq.caching.
    from xorq.caching import SnapshotStrategy

    snapshot_ctx = SnapshotStrategy.normalization_context(self.expr)
    with snapshot_ctx:
        return self.tokenized

def get_cache_path(self):
from xorq.caching import ParquetStorage

Expand Down