Commit e54dc0c

feat: introduce asset caching for sqlmesh related assets (#4675)
* temp: temporarily add dagster-sqlmesh as a local editable lib
* feat(oso_dagster): asset caching for sqlmesh related assets
* chore(oso_core): allow logging to be configured via env var
* Update warehouse/oso_dagster/assets/sqlmesh.py
* wip
* wip
* fix
* additional test
* wip
* fix(oso_core): fix oso_core cache
* fix(oso_dagster): caching working
* feat(oso_dagster): add `build` cli command
* chore: upgrade dagster-sqlmesh
* chore: update uv.lock
* fix: bad import
* chore(docs): update docs
* fix: remove cache.py file
* fix: imports
* Update lib/oso-core/oso_core/cache/types.py
* Update lib/oso-core/oso_core/cache/types.py
* cr fixes
* more cr refactoring
* feat(oso_dagster): can populare dagster asset cache at build
* fix uv.lock
* feat(ops): support cache in oso-dagster helm chart
1 parent f2da24a commit e54dc0c

File tree

25 files changed: +2703 −1631 lines changed


apps/docs/docs/contribute-data/dagster.md

Lines changed: 96 additions & 0 deletions
@@ -150,3 +150,99 @@ gitcoin_passport_scores = interval_gcs_import_asset(
    ),
)
```

## Creating an asset factory that needs resources

If your asset factory needs resources to generate its assets, you will need to
use the `EarlyResourcesAssetFactory` class. To do this, you can simply use the
`@early_resources_asset_factory` decorator. This decorator automatically sets up
the underlying `EarlyResourcesAssetFactory` instance based on your decorated
function. The decorated function can also declare the resources it needs by
annotating its arguments with the resource types. For example, to get the
`global_config` resource, which uses a class called `DagsterConfig`, in your
asset factory you would just do this:

```python
import dagster as dg

from oso_dagster.factories import early_resources_asset_factory

# (assumes DagsterConfig and AssetFactoryResponse are imported from the
# oso_dagster package as well)


@early_resources_asset_factory()
def my_asset_factory(
    global_config: DagsterConfig,
):
    @dg.asset
    def my_asset(context: dg.AssetExecutionContext):
        # Use the global_config resource to create the asset
        some_value = global_config.some_setting

        ...

    return AssetFactoryResponse(
        assets=[my_asset]
    )
```

This can be useful in many cases where you need to preprocess before the assets
are loaded.

## Defining cacheable asset factories

Many of the assets in the system are created using an asset factory. However,
we have discovered that these factories can often add significant overhead to
the start time of dagster. To mitigate this, we cache the results of these
factories (some at build time and some at runtime). To do this we define a new
way to create assets that can be cached.

To do this you need to use the `cacheable_asset_factory` decorator. This
decorator expects the decorated function to take at least a `cache_context`
argument and any other resources that it might need (resources are resolved
with the `ResourceResolver`).

`CacheableDagsterContext` allows us to build _mostly_ cacheable
`AssetFactoryResponse` objects. We say _mostly_ because, due to the complexity
of the dagster object model, we don't actually cache the dagster objects but
instead cache objects that allow for fast hydration of dagster objects. To
accomplish this, we separate the generation of cacheable objects (which must
inherit from `pydantic.BaseModel`) from the hydration of those cached objects
into an `AssetFactoryResponse`. You can have any number of cacheable object
generation functions but only a single hydration function.

This would generally look like the following:

```python
from typing import List

import dagster as dg
from pydantic import BaseModel

# (assumes cacheable_asset_factory, CacheableDagsterContext,
# CacheableAssetFactoryResponse, AssetFactoryResponse, and SomeResource are
# imported from the oso_dagster package)


class SomeCacheableResponse(BaseModel):
    responses: List[str]


@cacheable_asset_factory
def my_asset_factory(cache_context: CacheableDagsterContext) -> CacheableAssetFactoryResponse:
    @cache_context.register_generator()
    def cacheable_response(some_resource: SomeResource) -> SomeCacheableResponse:
        responses = []
        for response in some_resource.some_slow_iterator():
            responses.append(response)
        return SomeCacheableResponse(responses=responses)

    @cache_context.hydrator()
    def hydrate_cacheable_response(
        cacheable_response: SomeCacheableResponse,
    ) -> AssetFactoryResponse:
        # Convert the cacheable response to a set of dagster objects
        assets: list[dg.AssetsDefinition] = []

        for response in cacheable_response.responses:
            # Create dagster assets, sensors, jobs, etc. based on the response
            @dg.asset(name=response)
            def my_asset(context: dg.AssetExecutionContext):
                # Do something that creates an asset
                pass

            assets.append(my_asset)

        return AssetFactoryResponse(
            assets=assets
        )
```

docker/images/oso/Dockerfile

Lines changed: 8 additions & 1 deletion
@@ -13,4 +13,11 @@ ENV UV_PROJECT_ENVIRONMENT=/usr/local/
RUN uv sync --all-packages --locked

# Build the dagster assets for production
ENV DAGSTER_ASSET_CACHE_DIR=/dagster_assets_cache
ENV DAGSTER_ASSET_CACHE_ENABLED=1
ENV DAGSTER_SQLMESH_GATEWAY=trino
ENV GOOGLE_PROJECT_ID=opensource-observer
RUN uv run oso_dagster build

ENTRYPOINT []
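These environment variables drive the build-time population of the asset cache
via the new `oso_dagster build` command. As a rough, hypothetical sketch (this
diff does not show how `oso_dagster` actually consumes these variables), the
build step could wire them to the file cache backend like so:

```python
import os

from oso_core.cache import CacheMetadataOptions
from oso_core.cache.file import FileCacheBackend

# Hypothetical wiring: the env var names match the Dockerfile above, but the
# real consumption of these variables happens inside oso_dagster.
cache_enabled = os.environ.get("DAGSTER_ASSET_CACHE_ENABLED") == "1"
cache_dir = os.environ.get("DAGSTER_ASSET_CACHE_DIR", "/dagster_assets_cache")

if cache_enabled:
    backend = FileCacheBackend(cache_dir=cache_dir)
    # ttl_seconds=0 writes entries without an expiration time (the cache
    # backend shown later in this diff treats 0 as "no expiration")
    options = CacheMetadataOptions(ttl_seconds=0)
    # ...generate cacheable factory responses and persist them, e.g.:
    # backend.store_object(key="my_factory", value=response, options=options)
```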
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
# ruff: noqa: F403
from .file import *
from .types import *
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
import hashlib
import logging
import typing as t
from pathlib import Path

import arrow
from pydantic import BaseModel

from .types import (
    CacheBackend,
    CacheInvalidError,
    CacheKey,
    CacheMetadata,
    CacheMetadataOptions,
)

T = t.TypeVar("T", bound=BaseModel)
K = t.TypeVar("K", bound=BaseModel)

logger = logging.getLogger(__name__)


class FileCacheBackend(CacheBackend):
    """A generic cache for pydantic models"""

    def __init__(self, cache_dir: str):
        self.cache_dir = cache_dir

    def store_object(
        self, key: CacheKey, value: BaseModel, options: CacheMetadataOptions
    ) -> None:
        """Store a single object in the cache

        The file cache stores everything as a jsonl file. The first object in
        the file is the metadata, which contains the creation time and
        expiration time of the cache entry.
        """
        # Ensure the cache directory exists
        self._ensure_cache_dir()
        # Create a file path based on the key
        file_path = self._cache_key_path(key)

        metadata = CacheMetadata(
            created_at=arrow.now().isoformat(),
            valid_until=(
                arrow.now().shift(seconds=options.ttl_seconds).isoformat()
                if options.ttl_seconds > 0
                else None
            ),
        )

        # Write the value to the file
        with open(file_path, "w") as f:
            f.write(metadata.model_dump_json() + "\n")
            f.write(value.model_dump_json())

    def retrieve_object(
        self,
        key: CacheKey,
        model_type: type[T],
        options: CacheMetadataOptions,
    ) -> T:
        """Retrieve a single object from the cache"""
        self._ensure_cache_dir()
        file_path = self._cache_key_path(key)

        if not file_path.exists():
            logger.debug(
                f"Cache file not found: {file_path}", extra={"file_path": file_path}
            )
            raise CacheInvalidError(f"Cache file not found: {file_path}")

        with open(file_path, "r") as f:
            # Read the metadata and check if it is valid
            metadata = CacheMetadata.model_validate_json(f.readline().strip())

            if not metadata.is_valid(options):
                logger.debug(
                    f"Cache entry is invalid: {metadata}", extra={"metadata": metadata}
                )
                raise CacheInvalidError(f"Cache entry is invalid: {metadata}")

            return model_type.model_validate_json(f.read())

    def _cache_dir_path(self):
        """Get the cache directory path"""
        return Path(self.cache_dir)

    def _ensure_cache_dir(self):
        """Ensure the cache directory exists"""
        self._cache_dir_path().mkdir(parents=True, exist_ok=True)

    def _cache_key(self, key: CacheKey) -> str:
        """Generate a cache key from the pydantic model"""
        key_str = hashlib.sha256(str(key).encode()).hexdigest()
        return f"{key_str}.json"

    def _cache_key_path(self, key: CacheKey) -> Path:
        """Get the cache file path for a given key"""
        return self._cache_dir_path() / self._cache_key(key)
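To illustrate the on-disk layout this backend produces, here is a minimal
sketch using only the calls shown above (the model, key, and directory names
are made up, and the import paths assume the package is importable as
`oso_core.cache`):

```python
from pydantic import BaseModel

from oso_core.cache import CacheMetadataOptions
from oso_core.cache.file import FileCacheBackend


class Example(BaseModel):
    name: str


backend = FileCacheBackend(cache_dir="/tmp/oso-cache")
options = CacheMetadataOptions(ttl_seconds=60)
backend.store_object(key="example", value=Example(name="hello"), options=options)

# The resulting <sha256-of-key>.json file contains two JSON lines:
#   {"created_at": "...", "valid_until": "..."}   <- CacheMetadata header
#   {"name": "hello"}                             <- the cached model
restored = backend.retrieve_object(
    key="example", model_type=Example, options=options
)
assert restored.name == "hello"
```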
Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
import tempfile

import pytest
from pydantic import BaseModel

from .file import FileCacheBackend
from .types import CacheMetadataOptions


class FakeModel(BaseModel):
    """A simple Pydantic model for testing purposes."""

    name: str
    value: int


class FakeNestedModel(BaseModel):
    """A nested Pydantic model for testing purposes."""

    name: str
    nested: FakeModel


@pytest.fixture
def temp_dir():
    """Create a temporary directory for file system tests."""
    with tempfile.TemporaryDirectory() as tmpdirname:
        yield tmpdirname


def test_write_file_to_temp_dir(temp_dir):
    """Test writing a file to the temporary directory."""
    cache_backend = FileCacheBackend(cache_dir=temp_dir)

    test0 = FakeModel(name="test", value=42)

    options = CacheMetadataOptions(ttl_seconds=60)

    cache_backend.store_object(key="test_key0", value=test0, options=options)
    stored_test0 = cache_backend.retrieve_object(
        key="test_key0", model_type=FakeModel, options=options
    )

    assert test0 == stored_test0, "Stored and retrieved objects should match"

    test1 = FakeNestedModel(name="nested_test", nested=test0)
    cache_backend.store_object(key="test_key1", value=test1, options=options)
    stored_test1 = cache_backend.retrieve_object(
        key="test_key1", model_type=FakeNestedModel, options=options
    )
    assert test1 == stored_test1, "Nested stored and retrieved objects should match"
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
import arrow

from .types import CacheMetadata


def test_metadata_is_valid():
    """Test that metadata is valid when created and not expired."""
    now = arrow.now()

    created_at = now.shift(seconds=-60)
    valid_until = now.shift(seconds=60)
    metadata = CacheMetadata(
        created_at=created_at.isoformat(), valid_until=valid_until.isoformat()
    )
    assert metadata.is_valid() is True
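`types.py` itself is not shown in this view. From the call sites above
(`CacheMetadata.is_valid` called both with and without an options argument,
`CacheMetadataOptions(ttl_seconds=...)`, plain string keys), a rough sketch of
its surface might look like the following; every field, default, and definition
here is an assumption rather than the actual implementation:

```python
import arrow
from pydantic import BaseModel

# Assumed: FileCacheBackend only needs str(key), and the tests pass plain
# strings, so CacheKey is sketched as a string alias.
CacheKey = str


class CacheInvalidError(Exception):
    """Raised when a cache entry is missing or no longer valid (assumed)."""


class CacheMetadataOptions(BaseModel):
    # ttl_seconds=60 appears in the tests; file.py treats 0 as "no expiration"
    ttl_seconds: int = 0


class CacheMetadata(BaseModel):
    created_at: str
    valid_until: str | None = None

    def is_valid(self, options: CacheMetadataOptions | None = None) -> bool:
        # Assumed semantics: an entry with no expiration never goes stale
        if self.valid_until is None:
            return True
        return arrow.now() < arrow.get(self.valid_until)


class CacheBackend:
    """Assumed base interface implemented by FileCacheBackend above."""

    def store_object(self, key, value, options) -> None:
        raise NotImplementedError

    def retrieve_object(self, key, model_type, options):
        raise NotImplementedError
```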
