File tree Expand file tree Collapse file tree 1 file changed +21
-1
lines changed
Expand file tree Collapse file tree 1 file changed +21
-1
lines changed Original file line number Diff line number Diff line change 11from pathlib import Path
2+
23from dagster import definitions , load_from_defs_folder
4+ from dagster_dbt import DbtCliResource , DbtProject , dbt_assets
5+
6+ import dagster as dg
37
48
59@definitions
610def defs ():
7- return load_from_defs_folder (project_root = Path (__file__ ).parent .parent )
11+ return load_from_defs_folder (project_root = Path (__file__ ).parent .parent )
12+
13+ dbt_project_directory = Path (__file__ ).absolute ().parent / "poke_cli_dbt"
14+ dbt_project = DbtProject (project_dir = dbt_project_directory )
15+
16+ dbt_resource = DbtCliResource (project_dir = dbt_project )
17+
18+ # Compiles the dbt project & allow Dagster to build an asset graph
19+ dbt_project .prepare_if_dev ()
20+
21+ # Yields Dagster events streamed from the dbt CLI
22+ @dbt_assets (manifest = dbt_project .manifest_path )
23+ def dbt_models (context : dg .AssetExecutionContext , dbt : DbtCliResource ):
24+ yield from dbt .cli (["build" ], context = context ).stream ()
25+
26+ # Dagster object that contains the dbt assets and resource
27+ defs_dbt = dg .Definitions (assets = [dbt_models ], resources = {"dbt" : dbt_resource })
You can’t perform that action at this time.
0 commit comments