Skip to content

Commit ecf8ad9

Browse files
initial commit
1 parent ecbe298 commit ecf8ad9

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import dagster as dg
2+
from dagster import RetryPolicy, Backoff
3+
from sqlalchemy.exc import OperationalError
4+
from ..extract.extract_pricing_data import build_dataframe
5+
from ...utils.secret_retriever import fetch_secret
6+
from termcolor import colored
7+
8+
9+
@dg.asset(
10+
deps=[build_dataframe],
11+
kinds={"Supabase", "Postgres"},
12+
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL),
13+
)
14+
def load_pricing_data() -> None:
15+
database_url: str = fetch_secret()
16+
table_name: str = "staging.pricing_data"
17+
18+
df = build_dataframe()
19+
try:
20+
df.write_database(
21+
table_name=table_name, connection=database_url, if_table_exists="replace"
22+
)
23+
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
24+
except OperationalError as e:
25+
print(colored(" ✖", "red"), "Connection error in load_series_data():", e)
26+
raise

0 commit comments

Comments
 (0)