|
1 | 1 | from pathlib import Path |
2 | 2 |
|
3 | 3 | from dagster import definitions, load_from_defs_folder |
4 | | -from dagster_dbt import DbtCliResource, DbtProject, dbt_assets |
5 | 4 |
|
6 | 5 | import dagster as dg |
7 | 6 |
|
|
11 | 10 |
|
12 | 11 | @definitions |
13 | 12 | def defs(): |
| 13 | + # load_from_defs_folder discovers dbt assets from transform_data.py |
14 | 14 | folder_defs = load_from_defs_folder(project_root=Path(__file__).parent.parent) |
15 | 15 | return dg.Definitions.merge(folder_defs, defs_pricing) |
16 | 16 |
|
17 | | -dbt_project_directory = Path(__file__).absolute().parent / "poke_cli_dbt" |
18 | | -dbt_project = DbtProject(project_dir=dbt_project_directory) |
19 | | - |
20 | | -dbt_resource = DbtCliResource(project_dir=dbt_project) |
21 | | - |
22 | | -# Compiles the dbt project & allow Dagster to build an asset graph |
23 | | -dbt_project.prepare_if_dev() |
24 | | - |
25 | | -# Yields Dagster events streamed from the dbt CLI |
26 | | -@dbt_assets(manifest=dbt_project.manifest_path) |
27 | | -def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource): |
28 | | - yield from dbt.cli(["build"], context=context).stream() |
29 | | - |
30 | | -# Define the pricing pipeline job that materializes both assets |
| 17 | +# Define the pricing pipeline job that materializes the assets and downstream dbt model |
31 | 18 | pricing_pipeline_job = dg.define_asset_job( |
32 | 19 | name="pricing_pipeline_job", |
33 | | - selection=dg.AssetSelection.assets(build_dataframe, load_pricing_data), |
| 20 | + selection=dg.AssetSelection.assets(build_dataframe, load_pricing_data).downstream(include_self=True), |
34 | 21 | ) |
35 | 22 |
|
36 | 23 | price_schedule = dg.ScheduleDefinition( |
37 | 24 | name="price_schedule", |
38 | | - cron_schedule="10 10 * * *", |
| 25 | + cron_schedule="31 21 * * *", |
39 | 26 | target=pricing_pipeline_job, |
40 | 27 | execution_timezone="America/Los_Angeles", |
41 | 28 | ) |
|
0 commit comments