Skip to content

Commit 318e8c3

Browse files
authored
[component tree] Example of x-component access using dbt (dagster-io#31146)
## Summary Showcases the utility of x-component accesses using dbt - adds a new method to the `DbtProjectComponent` which can be invoked from other components to get an asset key from a dbt model name: ```yaml type: MyComponent attributes: assets: - key: my_asset_name deps: - "{{ load_component_at_path('jaffle_shop_dbt').get_asset_key_for_model('customers') }}" ``` ~~Extremely verbose at the moment~~, but otherwise feels pretty good to me. ## Test Plan New unit tests of using this utility from YAML and from Python. New docs.
1 parent b2b6125 commit 318e8c3

File tree

12 files changed

+177
-10
lines changed

12 files changed

+177
-10
lines changed

docs/docs/guides/build/components/integrations/dbt-component-tutorial.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,22 @@ You can customize the properties of the assets emitted by each dbt model using t
7575
<WideContent maxSize={1100}>
7676
<CliInvocationExample path="docs_snippets/docs_snippets/guides/components/integrations/dbt-component/13-list-defs.txt" />
7777
</WideContent>
78+
79+
## 7. Depending on dbt assets in other components
80+
81+
If you want to refer to assets built by the dbt component elsewhere in your Dagster project, you can use the `asset_key_for_model` method on the dbt component.
82+
This lets you refer to an asset by the model name without having to know how that model is translated to an asset key.
83+
84+
Imagine a `PythonScriptComponent` that exports the `customers` model to a CSV file:
85+
86+
<CliInvocationExample path="docs_snippets/docs_snippets/guides/components/integrations/dbt-component/14-scaffold-python-script-component.txt" />
87+
88+
<CliInvocationExample path="docs_snippets/docs_snippets/guides/components/integrations/dbt-component/15-touch-export-customers.txt" />
89+
90+
You can refer to the `customers` asset in this component by using the `asset_key_for_model` method on the dbt component:
91+
92+
<CodeExample path="docs_snippets/docs_snippets/guides/components/integrations/dbt-component/16-component.yaml" title="my_project/defs/my_python_script/defs.yaml" language="yaml" />
93+
94+
<WideContent maxSize={1100}>
95+
<CliInvocationExample path="docs_snippets/docs_snippets/guides/components/integrations/dbt-component/17-list-defs.txt" />
96+
</WideContent>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
dg scaffold defs dagster.PythonScriptComponent my_python_script
2+
3+
Creating defs at /.../my-project/src/my_project/defs/my_python_script.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
touch src/my_project/defs/my_python_script/export_customers.py
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
type: dagster.PythonScriptComponent
2+
3+
attributes:
4+
execution:
5+
path: my_script.py
6+
assets:
7+
- key: customers_export
8+
deps:
9+
- "{{ load_component_at_path('dbt_ingest').asset_key_for_model('customers') }}"
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
dg list defs
2+
3+
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
4+
┃ Section ┃ Definitions ┃
5+
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
6+
│ Assets │ ┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
7+
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
8+
│ │ ┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
9+
│ │ │ customers │ dbt_models │ stg_customers │ dbt │ Transforms data using dbt model │ │
10+
│ │ │ │ │ stg_orders │ duckdb │ customers │ │
11+
│ │ │ │ │ stg_payments │ │ │ │
12+
│ │ ├──────────────────┼────────────┼───────────────┼────────┼──────────────────────────────────────────┤ │
13+
│ │ │ customers_export │ default │ customers │ │ │ │
14+
│ │ ├──────────────────┼────────────┼───────────────┼────────┼──────────────────────────────────────────┤ │
15+
│ │ │ stg_customers │ default │ │ │ │ │
16+
│ │ ├──────────────────┼────────────┼───────────────┼────────┼──────────────────────────────────────────┤ │
17+
│ │ │ stg_orders │ default │ │ │ │ │
18+
│ │ ├──────────────────┼────────────┼───────────────┼────────┼──────────────────────────────────────────┤ │
19+
│ │ │ stg_payments │ default │ │ │ │ │
20+
│ │ └──────────────────┴────────────┴───────────────┴────────┴──────────────────────────────────────────┘ │
21+
│ Asset Checks │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
22+
│ │ ┃ Key ┃ Deps ┃ Description ┃ │
23+
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
24+
│ │ │ customers:not_null_customers_customer_id │ customers │ │ │
25+
│ │ ├──────────────────────────────────────────┼───────────┼─────────────┤ │
26+
│ │ │ customers:unique_customers_customer_id │ customers │ │ │
27+
│ │ └──────────────────────────────────────────┴───────────┴─────────────┘ │
28+
└──────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────┘

examples/docs_snippets/docs_snippets_tests/snippet_checks/guides/components/integrations/test_dbt_component.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,36 @@ def test_components_docs_dbt_project(
175175
_run_command(
176176
cmd="dg launch --assets '*'",
177177
)
178+
179+
# scaffold a PythonScriptComponent
180+
context.run_command_and_snippet_output(
181+
cmd="dg scaffold defs dagster.PythonScriptComponent my_python_script",
182+
snippet_path=f"{context.get_next_snip_number()}-scaffold-python-script-component.txt",
183+
)
184+
context.run_command_and_snippet_output(
185+
cmd="touch src/my_project/defs/my_python_script/export_customers.py",
186+
snippet_path=f"{context.get_next_snip_number()}-touch-export-customers.txt",
187+
)
188+
189+
context.create_file(
190+
Path("src") / "my_project" / "defs" / "my_python_script" / "defs.yaml",
191+
contents=textwrap.dedent(
192+
"""\
193+
type: dagster.PythonScriptComponent
194+
195+
attributes:
196+
execution:
197+
path: my_script.py
198+
assets:
199+
- key: customers_export
200+
deps:
201+
- "{{ load_component_at_path('dbt_ingest').asset_key_for_model('customers') }}"
202+
"""
203+
),
204+
snippet_path=f"{context.get_next_snip_number()}-component.yaml",
205+
)
206+
207+
context.run_command_and_snippet_output(
208+
cmd="dg list defs",
209+
snippet_path=f"{context.get_next_snip_number()}-list-defs.txt",
210+
)

python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
from typing import Annotated, Any, Callable, Optional, Union
66

77
from dagster import Resolvable
8+
from dagster._core.definitions.asset_key import AssetKey
89
from dagster._core.definitions.assets.definition.asset_spec import AssetSpec
910
from dagster._core.definitions.definitions_class import Definitions
1011
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
12+
from dagster._utils.cached_method import cached_method
1113
from dagster.components.component.component import Component
1214
from dagster.components.core.context import ComponentLoadContext
1315
from dagster.components.core.tree import ComponentTree
@@ -21,9 +23,15 @@
2123
from dagster_dbt.asset_utils import DBT_DEFAULT_EXCLUDE, DBT_DEFAULT_SELECT, get_node
2224
from dagster_dbt.components.dbt_project.scaffolder import DbtProjectComponentScaffolder
2325
from dagster_dbt.core.resource import DbtCliResource
24-
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings
26+
from dagster_dbt.dagster_dbt_translator import (
27+
DagsterDbtTranslator,
28+
DagsterDbtTranslatorSettings,
29+
validate_translator,
30+
)
31+
from dagster_dbt.dbt_manifest import validate_manifest
2532
from dagster_dbt.dbt_manifest_asset_selection import DbtManifestAssetSelection
2633
from dagster_dbt.dbt_project import DbtProject
34+
from dagster_dbt.utils import ASSET_RESOURCE_TYPES
2735

2836
TranslationFn: TypeAlias = Callable[[AssetSpec, Mapping[str, Any]], AssetSpec]
2937

@@ -236,6 +244,34 @@ def _fn(context: AssetExecutionContext):
236244
def execute(self, context: AssetExecutionContext, dbt: DbtCliResource) -> Iterator:
237245
yield from dbt.cli(["build"], context=context).stream()
238246

247+
@cached_property
248+
def _validated_manifest(self):
249+
return validate_manifest(self.project.manifest_path)
250+
251+
@cached_property
252+
def _validated_translator(self):
253+
return validate_translator(self.translator)
254+
255+
@cached_method
256+
def asset_key_for_model(self, model_name: str) -> AssetKey:
257+
dagster_dbt_translator = self._validated_translator
258+
manifest = self._validated_manifest
259+
260+
matching_model_ids = [
261+
unique_id
262+
for unique_id, value in manifest["nodes"].items()
263+
if value["name"] == model_name and value["resource_type"] in ASSET_RESOURCE_TYPES
264+
]
265+
266+
if len(matching_model_ids) == 0:
267+
raise KeyError(f"Could not find a dbt model, seed, or snapshot with name: {model_name}")
268+
269+
return dagster_dbt_translator.get_asset_spec(
270+
manifest,
271+
next(iter(matching_model_ids)),
272+
self.project,
273+
).key
274+
239275

240276
class ProxyDagsterDbtTranslator(DagsterDbtTranslator):
241277
# get_description conflicts on Component, so cant make it directly a translator

python_modules/libraries/dagster-dbt/dagster_dbt_tests/components/code_locations/dependency_on_dbt_project_location/defs/depends_on_dbt/definitions.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1+
from pathlib import Path
2+
13
import dagster as dg
2-
from dagster.components.core.context import ComponentLoadContext
3-
from dagster_dbt.asset_utils import get_asset_key_for_model
4+
from dagster.components.core.defs_module import ComponentPath
5+
from dagster_dbt import DbtProjectComponent
46

57

68
@dg.definitions
7-
def defs(context: ComponentLoadContext) -> dg.Definitions:
8-
dbt_assets = (
9-
context.component_tree.build_defs_at_path("jaffle_shop_dbt")
10-
.resolve_asset_graph()
11-
.assets_defs
12-
)
9+
def defs(context: dg.ComponentLoadContext) -> dg.Definitions:
10+
customers_asset_key = context.component_tree.load_component_at_path(
11+
ComponentPath(file_path=Path("jaffle_shop_dbt"), instance_key=0),
12+
expected_type=DbtProjectComponent,
13+
).asset_key_for_model("customers")
1314

14-
@dg.asset(deps={get_asset_key_for_model(dbt_assets, "customers")})
15+
@dg.asset(deps={customers_asset_key})
1516
def downstream_of_customers():
1617
pass
1718

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from collections.abc import Iterable
2+
from typing import Union
3+
4+
import dagster as dg
5+
from dagster.components.lib.executable_component.component import ExecutableComponent
6+
from dagster.components.resolved.core_models import OpSpec
7+
8+
9+
class MyExecutableComponent(ExecutableComponent):
10+
def invoke_execute_fn(
11+
self,
12+
context: Union[dg.AssetExecutionContext, dg.AssetCheckExecutionContext],
13+
component_load_context: dg.ComponentLoadContext,
14+
) -> Iterable:
15+
return []
16+
17+
@property
18+
def op_spec(self) -> OpSpec:
19+
return OpSpec(name="my_executable_component")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
type: dependency_on_dbt_project_location.defs.depends_on_dbt_yaml.component.MyExecutableComponent
2+
3+
attributes:
4+
assets:
5+
- key: downstream_of_customers_two
6+
deps:
7+
- "{{ load_component_at_path('jaffle_shop_dbt').asset_key_for_model('customers') }}"

0 commit comments

Comments
 (0)