
Commit 47884e8

committed: wip
1 parent fd9f032 commit 47884e8


3 files changed (267 additions, 0 deletions)

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
# ruff: noqa: F403
from .generic import *
Lines changed: 144 additions & 0 deletions
@@ -0,0 +1,144 @@
"""Generic caching utilities for pydantic models

This is a generic set of caching utilities that can be used with any pydantic
model. This allows us to cache many python objects. Rehydration should be done
by the caller, but this module provides the tools to store and retrieve the
objects in the cache.

We have two kinds of cacheable objects:

1. A single object that can be cached and retrieved
2. A streamable object
"""

import abc
import hashlib
import typing as t
from pathlib import Path

import arrow
from pydantic import BaseModel

T = t.TypeVar("T", bound=BaseModel)


class CacheOptions(BaseModel):
    ttl: int = 0  # Time to live for the cache in seconds. 0 means no expiration.


def key_from_model(model: BaseModel) -> str:
    """Generate a cache key from a pydantic model"""
    return hashlib.sha256(model.model_dump_json().encode()).hexdigest()


class CacheMetadata(BaseModel):
    """Metadata for the cache entry

    This metadata is used to store information about the cache entry such as
    when it was created, when it expires, etc.

    Attributes:
        created_at: The time when the cache entry was created in ISO format.
        valid_until: The time when the cache entry expires in ISO format. If
            None, the entry only expires when the ttl in the passed-in options
            is exceeded. If it is set, it is always checked against the
            current time.
    """

    created_at: str  # ISO format timestamp
    valid_until: str | None = None  # ISO format timestamp

    def is_valid(self, options: CacheOptions | None = None) -> bool:
        """Check if the cache entry is valid based on the ttl"""
        created_at = arrow.get(self.created_at)
        now = arrow.now()
        # created_at is in the past, so subtract it from now to get the entry's age
        age = now - created_at

        if self.valid_until:
            valid_until = arrow.get(self.valid_until)
            if valid_until < now:
                return False

        if options:
            if options.ttl == 0:
                return True

            if age.total_seconds() > options.ttl:
                return False

        # No expiration constraint was violated, so the entry is still valid
        return True


class CacheBackend(abc.ABC):
    """A generic cache backend interface

    This is a generic interface for a cache backend that can be used to store
    and retrieve pydantic models. The actual implementation of the cache backend
    should inherit from this class and implement the methods.
    """

    @abc.abstractmethod
    def store_object(self, key: BaseModel, value: BaseModel) -> None:
        """Store a single object in the cache"""
        ...

    @abc.abstractmethod
    def retrieve_object(
        self,
        key: BaseModel,
        model_type: type[T],
        override_options: CacheOptions | None = None,
    ) -> T:
        """Retrieve a single object from the cache"""
        ...


class FileCacheBackend(CacheBackend):
    """A generic cache for pydantic models"""

    def __init__(self, cache_dir: str, default_options: CacheOptions):
        self.cache_dir = cache_dir
        self.default_options = default_options

    def store_object(self, key: BaseModel, value: BaseModel) -> None:
        """Store a single object in the cache"""
        # Ensure the cache directory exists
        self._ensure_cache_dir()
        # Create a file path based on the key
        file_path = self._cache_key_path(key)

        # Write the value to the file
        with open(file_path, "w") as f:
            f.write(value.model_dump_json())

    def retrieve_object(
        self,
        key: BaseModel,
        model_type: type[T],
        override_options: CacheOptions | None = None,
    ) -> T:
        """Retrieve a single object from the cache"""
        self._ensure_cache_dir()
        file_path = self._cache_key_path(key)
        if not file_path.exists():
            raise FileNotFoundError(f"Cache file not found: {file_path}")

        with open(file_path, "r") as f:
            return model_type.model_validate_json(f.read())

    def _cache_dir_path(self):
        """Get the cache directory path"""
        return Path(self.cache_dir)

    def _ensure_cache_dir(self):
        """Ensure the cache directory exists"""
        self._cache_dir_path().mkdir(parents=True, exist_ok=True)

    def _cache_key(self, key: BaseModel) -> str:
        """Generate a cache key from the pydantic model"""
        key_str = hashlib.sha256(key.model_dump_json().encode()).hexdigest()
        return f"{key_str}.json"

    def _cache_key_path(self, key: BaseModel) -> Path:
        """Get the cache file path for a given key"""
        return self._cache_dir_path() / self._cache_key(key)
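
A minimal usage sketch for the file backend. The RepoKey and RepoStats models and the cache directory are hypothetical, introduced only for illustration; the point is that any pydantic model can act as the key and any pydantic model as the value, and rehydration is the caller's responsibility.

from pydantic import BaseModel


class RepoKey(BaseModel):
    owner: str
    name: str


class RepoStats(BaseModel):
    stars: int
    forks: int


backend = FileCacheBackend(
    cache_dir="/tmp/pydantic-cache",  # hypothetical location
    default_options=CacheOptions(ttl=0),
)

key = RepoKey(owner="example", name="example-repo")
backend.store_object(key, RepoStats(stars=10, forks=2))

# Rehydration is the caller's job: the expected model type is passed back in.
stats = backend.retrieve_object(key, RepoStats)
assert stats.stars == 10
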
Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
"""Provide a way to cache dagster assets that are loaded with our factory utilities"""

import typing as t
from inspect import signature

import dagster as dg
from pydantic import BaseModel


class CacheableAssetOptions(BaseModel):
    """Derived from dg.asset.__annotations__

    For options that are not supported by the cache, we set the type to None so
    that attempts to use them will fail at static type checking time.
    """

    name: str | None = None
    key_prefix: t.Union[str, t.Sequence[str]] | None = None
    ins: t.Mapping[str, dg.AssetIn] | None = None
    deps: t.Iterable[t.Union[dg.AssetKey, str, t.Sequence[str], dg.AssetDep]] | None = (
        None
    )
    metadata: t.Mapping[str, t.Any] | None = None
    tags: t.Mapping[str, str] | None = None
    description: str | None = None
    config_schema: t.Type[dg.Config] | None = None
    required_resource_keys: t.AbstractSet[str] | None = None
    resource_defs: t.Mapping[str, object] | None = None
    # hooks currently not supported. We set the type to None to force static type failure if used
    hooks: None = None
    io_manager_def: object | None = None
    io_manager_key: str | None = None
    # dagster_type currently not supported
    dagster_type: None = None
    # partitions_def currently not supported
    op_tags: t.Mapping[str, t.Any] | None = None
    group_name: str | None = None
    output_required: bool = False
    automation_condition: None = None
    freshness_policy: None = None
    backfill_policy: None = None
    retry_policy: None = None
    code_version: str | None = None
    key: t.Union[dg.AssetKey, str, t.Sequence[str]] | None = None
    check_specs: None = None
    owners: t.Sequence[str] | None = None
    kinds: t.AbstractSet[str] | None = None
    pool: str | None = None


class CacheableMultiAssetOptions(BaseModel):
    """Derived from dg.multi_asset.__annotations__

    For options that are not supported by the cache, we set the type to None so
    that attempts to use them will fail at static type checking time.
    """

    name: str | None = None
    ins: t.Mapping[str, str] | None = None
    deps: t.Iterable[t.Union[str, t.Sequence[str]]] | None = None
    description: str | None = None
    config_schema: dg.Config | None = None
    required_resource_keys: t.AbstractSet[str] | None = None
    internal_asset_deps: t.Mapping[str, set[str]] | None = None
    backfill_policy: dg.BackfillPolicy | None = None
    op_tags: t.Mapping[str, t.Any] | None = None
    can_subset: bool = False
    resource_defs: t.Mapping[str, object] | None = None
    group_name: str | None = None
    retry_policy: dg.RetryPolicy | None = None
    code_version: str | None = None
    specs: t.Sequence[dg.AssetSpec] | None = None
    check_specs: None = None
    pool: str | None = None


class CachedAssetOut(BaseModel):
    type: t.Literal["asset_out"] = "asset_out"

    model_key: str
    asset_key: str
    tags: t.Mapping[str, str]
    is_required: bool
    group_name: str
    kinds: set[str] | None

    def to_asset_out(self) -> dg.AssetOut:
        """Convert to a Dagster AssetOut object"""
        if "kinds" in signature(dg.AssetOut).parameters:
            return dg.AssetOut(
                key=dg.AssetKey.from_user_string(self.asset_key),
                tags=self.tags,
                is_required=self.is_required,
                group_name=self.group_name,
                kinds=self.kinds,
            )
        return dg.AssetOut(
            key=dg.AssetKey.from_user_string(self.asset_key),
            tags=self.tags,
            is_required=self.is_required,
            group_name=self.group_name,
        )

    @classmethod
    def from_asset_out(cls, model_key: str, asset_out: dg.AssetOut) -> "CachedAssetOut":
        """Create from a Dagster AssetOut object"""
        assert asset_out.key is not None, "AssetOut key must not be None"

        return cls(
            model_key=model_key,
            asset_key=asset_out.key.to_user_string(),
            tags=asset_out.tags or {},
            is_required=asset_out.is_required,
            group_name=asset_out.group_name or "",
            kinds=asset_out.kinds,
        )


def cacheable_asset(asset_out: dg.AssetOut) -> CachedAssetOut:
    """Create an asset that can be loaded from cache"""
    return CachedAssetOut.from_asset_out("model_key", asset_out)
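
A rough round-trip sketch of how CachedAssetOut is intended to be used. The asset key, group name, and model key below are made up for illustration, and it assumes a dagster version where AssetOut exposes a kinds attribute (from_asset_out reads it unconditionally).

import dagster as dg

original = dg.AssetOut(
    key=dg.AssetKey(["staging", "events"]),
    group_name="staging",
)

# Flatten the AssetOut into a cache-friendly pydantic model ...
cached = CachedAssetOut.from_asset_out("events_model", original)
payload = cached.model_dump_json()  # this JSON is what a CacheBackend would store

# ... and later rebuild an equivalent AssetOut from the cached payload.
restored = CachedAssetOut.model_validate_json(payload).to_asset_out()
assert restored.key == original.key
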
