96 changes: 96 additions & 0 deletions apps/docs/docs/contribute-data/dagster.md
@@ -150,3 +150,99 @@ gitcoin_passport_scores = interval_gcs_import_asset(
),
)
```

## Creating an asset factory that needs resources

If your asset factory needs resources in order to generate its assets, you will
need to use the `EarlyResourcesAssetFactory` class. The simplest way to do this
is with the `@early_resources_asset_factory` decorator, which automatically
sets up the underlying `EarlyResourcesAssetFactory` instance based on your
decorated function. The decorated function declares the resources it needs by
annotating its arguments with the resource types. For example, to get the
`global_config` resource (an instance of the `DagsterConfig` class) in your
asset factory:

```python
import dagster as dg
from oso_dagster.factories import early_resources_asset_factory

# AssetFactoryResponse and DagsterConfig are assumed to be importable from the
# relevant oso_dagster modules.

@early_resources_asset_factory()
def my_asset_factory(
    global_config: DagsterConfig,
):
    @dg.asset
    def my_asset(context: dg.AssetExecutionContext):
        # Use the global_config resource to create the asset
        some_value = global_config.some_setting

        ...

    return AssetFactoryResponse(
        assets=[my_asset],
    )
```

This is useful whenever you need to do some preprocessing before the assets
are loaded, as in the sketch below.
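
As a minimal sketch (the `enable_expensive_assets` setting is hypothetical and
used only for illustration), a factory might consult the config to decide
whether to generate any assets at all:

```python
import dagster as dg
from oso_dagster.factories import early_resources_asset_factory

@early_resources_asset_factory()
def conditional_factory(
    global_config: DagsterConfig,
):
    # Hypothetical setting; substitute a real field from DagsterConfig.
    if not getattr(global_config, "enable_expensive_assets", False):
        return AssetFactoryResponse(assets=[])

    @dg.asset
    def expensive_asset(context: dg.AssetExecutionContext):
        ...

    return AssetFactoryResponse(assets=[expensive_asset])
```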

## Defining cacheable asset factories

Many of the assets in the system are created using an asset factory. However,
we have discovered that these factories can add significant overhead to
dagster's start time. To mitigate this, we cache the results of these
factories, some at build time and some at runtime. To support this, we define
a new way to create assets whose results can be cached.

To do this, use the `cacheable_asset_factory` decorator. This decorator
expects the decorated function to take at least a `cache_context` argument,
along with any other resources it might need (resources are resolved with the
`ResourceResolver`).

`CacheableDagsterContext` allows us to create _mostly_ cacheable
`AssetFactoryResponse` objects. We say _mostly_ because, due to the complexity
of the dagster object model, we don't actually cache the dagster objects
themselves; instead we cache objects that allow for fast hydration of dagster
objects. To accomplish this, we separate the generation of cacheable objects
(which must inherit from `pydantic.BaseModel`) from the hydration of those
cached objects into an `AssetFactoryResponse`. You can have any number of
cacheable object generation functions, but only a single hydration function.

This would generally look like the following:

```python
import dagster as dg
from pydantic import BaseModel

# cacheable_asset_factory, CacheableDagsterContext, and AssetFactoryResponse
# are assumed to be importable from the oso_dagster factories module.
# SomeResource is a placeholder for any resource with a slow iterator.

class SomeCacheableResponse(BaseModel):
    responses: list[str]

@cacheable_asset_factory
def my_asset_factory(cache_context: CacheableDagsterContext):
    @cache_context.register_generator()
    def cacheable_response(some_resource: SomeResource) -> SomeCacheableResponse:
        # This slow work runs only on a cache miss; the pydantic result is
        # what actually gets cached.
        responses = []
        for response in some_resource.some_slow_iterator():
            responses.append(response)
        return SomeCacheableResponse(responses=responses)

    @cache_context.hydrator()
    def hydrate_cacheable_response(
        cacheable_response: SomeCacheableResponse,
    ) -> AssetFactoryResponse:
        # Convert the cacheable response to a set of dagster objects
        assets: list[dg.AssetsDefinition] = []

        for response in cacheable_response.responses:
            # Create dagster assets, sensors, jobs, etc. based on the response
            @dg.asset(name=response)
            def my_asset(context: dg.AssetExecutionContext):
                # Do something that creates an asset
                pass

            assets.append(my_asset)

        return AssetFactoryResponse(
            assets=assets,
        )
```
9 changes: 8 additions & 1 deletion docker/images/oso/Dockerfile
@@ -13,4 +13,11 @@ ENV UV_PROJECT_ENVIRONMENT=/usr/local/

RUN uv sync --all-packages --locked

ENTRYPOINT []
# Build the dagster asset cache for production
ENV DAGSTER_ASSET_CACHE_DIR=/dagster_assets_cache
ENV DAGSTER_ASSET_CACHE_ENABLED=1
ENV DAGSTER_SQLMESH_GATEWAY=trino
ENV GOOGLE_PROJECT_ID=opensource-observer
RUN uv run oso_dagster build

ENTRYPOINT []
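
For context, the env vars above enable the build-time asset cache introduced
in this PR. A rough sketch of how they might be consumed at runtime follows;
the actual wiring lives in `oso_dagster` and may differ:

```python
import os

from oso_core.cache import FileCacheBackend

# Hypothetical sketch only: connect the Dockerfile's env vars to the file
# cache added in this PR. The real lookup logic may differ.
if os.environ.get("DAGSTER_ASSET_CACHE_ENABLED") == "1":
    cache_backend = FileCacheBackend(
        cache_dir=os.environ.get("DAGSTER_ASSET_CACHE_DIR", "/dagster_assets_cache")
    )
```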
3 changes: 3 additions & 0 deletions lib/oso-core/oso_core/cache/__init__.py
@@ -0,0 +1,3 @@
# ruff: noqa: F403
from .file import *
from .types import *
100 changes: 100 additions & 0 deletions lib/oso-core/oso_core/cache/file.py
@@ -0,0 +1,100 @@
import hashlib
import logging
import typing as t
from pathlib import Path

import arrow
from pydantic import BaseModel

from .types import (
CacheBackend,
CacheInvalidError,
CacheKey,
CacheMetadata,
CacheMetadataOptions,
)

T = t.TypeVar("T", bound=BaseModel)
K = t.TypeVar("K", bound=BaseModel)

logger = logging.getLogger(__name__)


class FileCacheBackend(CacheBackend):
"""A generic cache for pydantic models"""

def __init__(self, cache_dir: str):
self.cache_dir = cache_dir

def store_object(
self, key: CacheKey, value: BaseModel, options: CacheMetadataOptions
) -> None:
"""Store a single object in the cache

The file cache stores everything as a jsonl file. The first object in
the file is the metadata, which contains the creation time and
expiration time of the cache entry.
"""
# Ensure the cache directory exists
self._ensure_cache_dir()
# Create a file path based on the key
file_path = self._cache_key_path(key)

metadata = CacheMetadata(
created_at=arrow.now().isoformat(),
valid_until=(
arrow.now().shift(seconds=options.ttl_seconds).isoformat()
if options.ttl_seconds > 0
else None
),
)

# Write the value to the file
with open(file_path, "w") as f:
f.write(metadata.model_dump_json() + "\n")
f.write(value.model_dump_json())

def retrieve_object(
self,
key: CacheKey,
model_type: type[T],
options: CacheMetadataOptions,
) -> T:
"""Retrieve a single object from the cache"""
self._ensure_cache_dir()
file_path = self._cache_key_path(key)

if not file_path.exists():
logger.debug(
f"Cache file not found: {file_path}", extra={"file_path": file_path}
)
raise CacheInvalidError(f"Cache file not found: {file_path}")

with open(file_path, "r") as f:
# Read the metadata and check if it is valid
metadata = CacheMetadata.model_validate_json(f.readline().strip())

if not metadata.is_valid(options):
logger.debug(
f"Cache entry is invalid: {metadata}", extra={"metadata": metadata}
)
raise CacheInvalidError(f"Cache entry is invalid: {metadata}")

return model_type.model_validate_json(f.read())

def _cache_dir_path(self):
"""Get the cache directory path"""
return Path(self.cache_dir)

def _ensure_cache_dir(self):
"""Ensure the cache directory exists"""
self._cache_dir_path().mkdir(parents=True, exist_ok=True)

def _cache_key(self, key: CacheKey) -> str:
"""Generate a cache key from the pydantic model"""
key_str = hashlib.sha256(str(key).encode()).hexdigest()
return f"{key_str}.json"

def _cache_key_path(self, key: CacheKey) -> Path:
"""Get the cache file path for a given key"""
return self._cache_dir_path() / self._cache_key(key)
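
For reference, a minimal usage sketch of `FileCacheBackend` based on the
signatures above (the cache directory path is illustrative):

```python
from pydantic import BaseModel

from oso_core.cache import CacheInvalidError, CacheMetadataOptions, FileCacheBackend


class Example(BaseModel):
    name: str


backend = FileCacheBackend(cache_dir="/tmp/oso_cache")  # illustrative path
options = CacheMetadataOptions(ttl_seconds=3600)

backend.store_object(key="example", value=Example(name="hello"), options=options)

try:
    cached = backend.retrieve_object(
        key="example", model_type=Example, options=options
    )
except CacheInvalidError:
    # Missing or expired entries raise CacheInvalidError rather than
    # returning None, so callers must be ready to regenerate.
    cached = None
```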
51 changes: 51 additions & 0 deletions lib/oso-core/oso_core/cache/test_file.py
@@ -0,0 +1,51 @@
import tempfile

import pytest
from pydantic import BaseModel

from .file import FileCacheBackend
from .types import CacheMetadataOptions


class FakeModel(BaseModel):
"""A simple Pydantic model for testing purposes."""

name: str
value: int


class FakeNestedModel(BaseModel):
"""A nested Pydantic model for testing purposes."""

name: str
nested: FakeModel


@pytest.fixture
def temp_dir():
"""Create a temporary directory for file system tests."""
with tempfile.TemporaryDirectory() as tmpdirname:
yield tmpdirname


def test_write_file_to_temp_dir(temp_dir):
"""Test writing a file to the temporary directory."""
cache_backend = FileCacheBackend(cache_dir=temp_dir)

test0 = FakeModel(name="test", value=42)

options = CacheMetadataOptions(ttl_seconds=60)

cache_backend.store_object(key="test_key0", value=test0, options=options)
stored_test0 = cache_backend.retrieve_object(
key="test_key0", model_type=FakeModel, options=options
)

assert test0 == stored_test0, "Stored and retrieved objects should match"

test1 = FakeNestedModel(name="nested_test", nested=test0)
cache_backend.store_object(key="test_key1", value=test1, options=options)
stored_test1 = cache_backend.retrieve_object(
key="test_key1", model_type=FakeNestedModel, options=options
)
assert test1 == stored_test1, "Nested stored and retrieved objects should match"
15 changes: 15 additions & 0 deletions lib/oso-core/oso_core/cache/test_types.py
@@ -0,0 +1,15 @@
import arrow

from .types import CacheMetadata


def test_metadata_is_valid():
"""Test that metadata is valid when created and not expired."""
now = arrow.now()

created_at = now.shift(seconds=-60)
valid_until = now.shift(seconds=60)
metadata = CacheMetadata(
created_at=created_at.isoformat(), valid_until=valid_until.isoformat()
)
assert metadata.is_valid() is True