Skip to content

Commit eb2519b

Browse files
initial commit
1 parent 411f1b1 commit eb2519b

File tree

1 file changed

+43
-0
lines changed

1 file changed

+43
-0
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import dagster as dg
2+
from dagster_dbt import DbtCliResource, dbt_assets
3+
from pathlib import Path
4+
5+
from ..load.load_data import (
6+
load_series_data,
7+
data_quality_check_on_series,
8+
load_set_data,
9+
load_card_data
10+
)
11+
12+
DBT_PROJECT_PATH = Path(__file__).joinpath("..", "..", "..", "poke_cli_dbt").resolve()
13+
14+
@dg.asset(deps=[load_series_data, data_quality_check_on_series, load_set_data, load_card_data], kinds=["dbt"])
15+
def dbt_transformation(context: dg.AssetExecutionContext):
16+
"""Run dbt build after all extract and load operations complete"""
17+
import subprocess
18+
import os
19+
20+
# Set environment variables for dbt
21+
env = os.environ.copy()
22+
env["SUPABASE_PASSWORD"] = os.getenv("SUPABASE_PASSWORD", "")
23+
24+
# Run dbt build
25+
result = subprocess.run(
26+
["dbt", "build"],
27+
cwd=str(DBT_PROJECT_PATH),
28+
env=env,
29+
capture_output=True,
30+
text=True
31+
)
32+
33+
if result.returncode != 0:
34+
context.log.error(f"dbt build failed: {result.stderr}")
35+
raise Exception(f"dbt build failed: {result.stderr}")
36+
37+
context.log.info(f"dbt build completed successfully: {result.stdout}")
38+
return "dbt build completed"
39+
40+
# Create definitions for this transformation
41+
defs = dg.Definitions(
42+
assets=[dbt_transformation]
43+
)

0 commit comments

Comments
 (0)