|
1 | | -import logging |
2 | 1 | import typing as t |
| 2 | +from pathlib import Path |
| 3 | +from traceback import print_exception |
3 | 4 |
|
4 | | -from dagster import AssetKey, ResourceParam |
5 | | -from dagster_sqlmesh import DagsterSQLMeshController, SQLMeshContextConfig |
| 5 | +import structlog |
| 6 | +from dagster import AssetKey, AssetsDefinition, ResourceParam |
| 7 | +from dagster_sqlmesh import ( |
| 8 | + DagsterSQLMeshCacheOptions, |
| 9 | + DagsterSQLMeshController, |
| 10 | + SQLMeshContextConfig, |
| 11 | +) |
6 | 12 | from dagster_sqlmesh.controller.base import DEFAULT_CONTEXT_FACTORY |
| 13 | +from oso_dagster.resources.sqlmesh import SQLMeshExportedAssetDefinition |
7 | 14 | from sqlmesh.core.model import Model |
8 | 15 |
|
9 | 16 | from ..factories import AssetFactoryResponse, early_resources_asset_factory |
10 | 17 | from ..resources import PrefixedSQLMeshTranslator, SQLMeshExporter |
11 | 18 |
|
12 | | -logger = logging.getLogger(__name__) |
| 19 | +logger = structlog.get_logger(__name__) |
13 | 20 |
|
14 | 21 |
|
15 | 22 | @early_resources_asset_factory() |
16 | 23 | def sqlmesh_export_factory( |
17 | 24 | sqlmesh_infra_config: dict, |
18 | | - sqlmesh_config: SQLMeshContextConfig, |
| 25 | + sqlmesh_context_config: SQLMeshContextConfig, |
19 | 26 | sqlmesh_translator: PrefixedSQLMeshTranslator, |
20 | 27 | sqlmesh_exporters: ResourceParam[t.List[SQLMeshExporter]], |
| 28 | + sqlmesh_cache_options: DagsterSQLMeshCacheOptions, |
21 | 29 | ): |
22 | 30 | environment = sqlmesh_infra_config["environment"] |
23 | 31 |
|
24 | 32 | controller = DagsterSQLMeshController.setup_with_config( |
25 | | - config=sqlmesh_config, |
| 33 | + config=sqlmesh_context_config, |
26 | 34 | context_factory=DEFAULT_CONTEXT_FACTORY, |
27 | 35 | ) |
28 | | - assets = [] |
29 | | - |
30 | | - with controller.instance(environment) as mesh: |
31 | | - models = mesh.models() |
32 | | - models_to_export: t.List[t.Tuple[Model, AssetKey]] = [] |
33 | | - for name, model in models.items(): |
34 | | - if "export" not in model.tags: |
35 | | - continue |
36 | | - models_to_export.append( |
37 | | - ( |
38 | | - model, |
39 | | - sqlmesh_translator.get_asset_key(mesh.context, model.fqn), |
40 | | - ) |
41 | | - ) |
| 36 | + assets: list[AssetsDefinition] = [] |
| 37 | + |
| 38 | + missing_exporters = set() |
| 39 | + |
| 40 | + # Hack for now to cache all of the sqlmesh export assets |
| 41 | + if sqlmesh_cache_options.enabled: |
| 42 | + logger.info("SQLMesh export cache is enabled, caching assets") |
| 43 | + |
| 44 | + cache_dir = Path(sqlmesh_cache_options.cache_dir) |
| 45 | + exporter_assets_cache_dir = cache_dir.joinpath("sqlmesh_export_assets") |
| 46 | + # Ensure the cache directory exists |
| 47 | + exporter_assets_cache_dir.mkdir(parents=True, exist_ok=True) |
42 | 48 |
|
43 | | - # Create a export assets for this |
44 | 49 | for exporter in sqlmesh_exporters: |
45 | | - asset_def = exporter.create_export_asset( |
46 | | - mesh, |
47 | | - sqlmesh_translator, |
48 | | - to_export=models_to_export, |
| 50 | + # Load the definition from the exporter |
| 51 | + cache_file = exporter_assets_cache_dir / f"{exporter.name()}.json" |
| 52 | + if cache_file.exists(): |
| 53 | + logger.debug(f"Loading cached asset definition for {exporter.name()}") |
| 54 | + exporter_asset_def = SQLMeshExportedAssetDefinition.model_validate_json( |
| 55 | + cache_file.read_text() |
| 56 | + ) |
| 57 | + assets.append(exporter.asset_from_definition(exporter_asset_def)) |
| 58 | + else: |
| 59 | + missing_exporters.add(exporter.name()) |
| 60 | + else: |
| 61 | + missing_exporters = set(exporter.name() for exporter in sqlmesh_exporters) |
| 62 | + |
| 63 | + if len(missing_exporters) > 0: |
| 64 | + try: |
| 65 | + with controller.instance(environment) as mesh: |
| 66 | + models = mesh.models() |
| 67 | + models_to_export: t.List[t.Tuple[Model, AssetKey]] = [] |
| 68 | + for name, model in models.items(): |
| 69 | + if "export" not in model.tags: |
| 70 | + continue |
| 71 | + models_to_export.append( |
| 72 | + ( |
| 73 | + model, |
| 74 | + sqlmesh_translator.get_asset_key(mesh.context, model.fqn), |
| 75 | + ) |
| 76 | + ) |
| 77 | + |
| 78 | + # Create a export assets for this |
| 79 | + for exporter in sqlmesh_exporters: |
| 80 | + asset_def = exporter.create_export_asset( |
| 81 | + mesh, |
| 82 | + sqlmesh_translator, |
| 83 | + to_export=models_to_export, |
| 84 | + ) |
| 85 | + if sqlmesh_cache_options.enabled: |
| 86 | + cache_dir = Path(sqlmesh_cache_options.cache_dir) |
| 87 | + exporter_assets_cache_dir = cache_dir.joinpath( |
| 88 | + "sqlmesh_export_assets" |
| 89 | + ) |
| 90 | + |
| 91 | + # Save the asset definition to the cache |
| 92 | + cache_file = ( |
| 93 | + exporter_assets_cache_dir / f"{exporter.name()}.json" |
| 94 | + ) |
| 95 | + logger.debug( |
| 96 | + f"Caching asset definition for {exporter.name()} at {cache_file}" |
| 97 | + ) |
| 98 | + cache_file.write_text(asset_def.model_dump_json()) |
| 99 | + |
| 100 | + assets.append(exporter.asset_from_definition(asset_def)) |
| 101 | + logger.debug(f"exporting for {exporter.name()}") |
| 102 | + except Exception as e: |
| 103 | + logger.exception( |
| 104 | + "Failed to create SQLMesh export assets", |
| 105 | + error=str(e), |
49 | 106 | ) |
50 | | - assets.append(asset_def) |
51 | | - logger.debug(f"exporting for {exporter.__class__.__name__}") |
| 107 | + print_exception(e) |
| 108 | + raise e |
52 | 109 |
|
53 | 110 | return AssetFactoryResponse(assets=assets) |
0 commit comments