Skip to content
Merged

1.7.1 #189

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7d70610
adding retry policy and raising exceptions on error
digitalghost-dev Oct 8, 2025
547dbff
initial commit
digitalghost-dev Oct 8, 2025
eb9e699
successfully returning dataframe in Dagster
digitalghost-dev Oct 8, 2025
b69eaaa
dropping constraints and adding run_query
digitalghost-dev Oct 8, 2025
8979621
updating selected columns
digitalghost-dev Oct 8, 2025
90ab475
adding name mapping for dbt models
digitalghost-dev Oct 9, 2025
ecbe298
adding name to dagster asset
digitalghost-dev Oct 9, 2025
ecf8ad9
initial commit
digitalghost-dev Oct 9, 2025
519a3cf
adding profile and run instructions
digitalghost-dev Oct 9, 2025
dc4387b
adding pricing_data table to dbt
digitalghost-dev Oct 10, 2025
ca2fdb5
updating soda checks
digitalghost-dev Oct 10, 2025
c6545e3
updating models
digitalghost-dev Oct 10, 2025
1fc64c0
updating test outputs
digitalghost-dev Oct 11, 2025
0bf8339
updating asset names
digitalghost-dev Oct 11, 2025
ba6f999
updating soda data check asset
digitalghost-dev Oct 11, 2025
b03deb6
adding egg group(s) to pokemon command (#188)
digitalghost-dev Oct 11, 2025
e72daa2
updating struct to include egg groups (#188)
digitalghost-dev Oct 11, 2025
3ea8ee2
initial commit - supabase
digitalghost-dev Oct 11, 2025
be66e64
updating RDS instructions
digitalghost-dev Oct 11, 2025
03e647b
adding missed step in RDS instructions
digitalghost-dev Oct 11, 2025
a8a569b
fixing typo, updating aws credentials description
digitalghost-dev Oct 12, 2025
91c59e2
raising error if any DataFrame from a set is empty
digitalghost-dev Oct 12, 2025
2338167
referencing correct function name in error text
digitalghost-dev Oct 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions card_data/pipelines/defs/extract/extract_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ def extract_series_data() -> pl.DataFrame:
print(e)
raise

filtered = [s.model_dump(mode="json") for s in validated if s.id in ["swsh", "sv"]]
filtered = [s.model_dump(mode="json") for s in validated if s.id in ["swsh", "sv", "me"]]
return pl.DataFrame(filtered)


@dg.asset(kinds={"API", "Polars", "Pydantic"})
@dg.asset(kinds={"API", "Polars", "Pydantic"}, name="extract_set_data")
def extract_set_data() -> pl.DataFrame:
url_list = [
"https://api.tcgdex.net/v2/en/series/swsh",
"https://api.tcgdex.net/v2/en/series/sv"
"https://api.tcgdex.net/v2/en/series/sv",
"https://api.tcgdex.net/v2/en/series/me",
]

flat: list[dict] = []
Expand Down Expand Up @@ -86,11 +87,10 @@ def extract_set_data() -> pl.DataFrame:
return pl.DataFrame([s.model_dump(mode="json") for s in validated])


@dg.asset(kinds={"API"})
@dg.asset(kinds={"API"}, name="extract_card_url_from_set_data")
def extract_card_url_from_set() -> list:
urls = [
"https://api.tcgdex.net/v2/en/sets/sv01",
"https://api.tcgdex.net/v2/en/sets/sv02",
"https://api.tcgdex.net/v2/en/sets/swsh3"
]

all_card_urls = [] # Initialize empty list to collect all URLs
Expand All @@ -113,7 +113,7 @@ def extract_card_url_from_set() -> list:
return all_card_urls


@dg.asset(deps=[extract_card_url_from_set], kinds={"API"})
@dg.asset(deps=[extract_card_url_from_set], kinds={"API"}, name="extract_card_info")
def extract_card_info() -> list:
card_url_list = extract_card_url_from_set()
cards_list = []
Expand All @@ -124,14 +124,15 @@ def extract_card_info() -> list:
r.raise_for_status()
data = r.json()
cards_list.append(data)
# print(f"Retrieved card: {data['id']} - {data.get('name', 'Unknown')}")
time.sleep(0.1)
except requests.RequestException as e:
print(f"Failed to fetch {url}: {e}")

return cards_list


@dg.asset(deps=[extract_card_info], kinds={"Polars"})
@dg.asset(deps=[extract_card_info], kinds={"Polars"}, name="create_card_dataframe")
def create_card_dataframe() -> pl.DataFrame:
cards_list = extract_card_info()

Expand Down
110 changes: 110 additions & 0 deletions card_data/pipelines/defs/extract/extract_pricing_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from typing import Optional

import dagster as dg
import polars as pl
import requests
from pydantic import BaseModel, ValidationError
from termcolor import colored


# Maps TCG set codes to the tcgcsv.com/TCGplayer group IDs queried by
# pull_product_information().
# NOTE(review): values appear to be tcgcsv "group" IDs under category 3
# (Pokemon) — confirm against tcgcsv.com before adding new sets.
SET_PRODUCT_MATCHING = {
    "sv01": "22873",
    "sv02": "23120",
}


class CardPricing(BaseModel):
    """Validated pricing record for a single card product."""

    product_id: int  # TCGplayer product ID
    name: str  # card name with the variant suffix stripped by extract_card_name()
    card_number: str  # value of the "Number" extendedData field
    market_price: Optional[float] = None  # may be missing from the prices feed


def is_card(item: dict) -> bool:
    """Return True when *item* has an extendedData entry named 'Number'.

    Items without such an entry (sealed products, boxes, etc.) are not cards.
    """
    for entry in item.get("extendedData", []):
        if entry.get("name") == "Number":
            return True
    return False


def get_card_number(card: dict) -> Optional[str]:
    """Return the value of the 'Number' extendedData entry, or None if absent."""
    return next(
        (
            entry.get("value")
            for entry in card.get("extendedData", [])
            if entry.get("name") == "Number"
        ),
        None,
    )


def extract_card_name(full_name: str) -> str:
    """Return the clean card name, dropping variant info after the ' - ' separator.

    TCGplayer product names look like "Pikachu - 025/165", with the card number
    or variant after a spaced dash. Partitioning on " - " (rather than a bare
    "-") keeps hyphenated Pokemon names such as "Ho-Oh" or "Porygon-Z" intact;
    the previous bare-dash split truncated them.
    """
    name, sep, _variant = full_name.partition(" - ")
    # When no separator is present, return the input unchanged (no stripping),
    # matching the original behavior for plain names.
    return name.strip() if sep else full_name


def pull_product_information(set_number: str) -> pl.DataFrame:
    """Pull product and pricing information for a given set number.

    Fetches the product list and price list for the TCGplayer group mapped to
    *set_number*, joins them on productId, keeps only real cards (items with a
    'Number' extendedData entry), validates each row with Pydantic, and returns
    the result as a Polars DataFrame.

    Raises:
        KeyError: if *set_number* is not in SET_PRODUCT_MATCHING.
        requests.HTTPError: if either API call returns a non-2xx status.
        ValidationError: if any assembled card record fails validation.
    """

    print(colored(" →", "blue"), f"Processing set: {set_number}")

    product_id = SET_PRODUCT_MATCHING[set_number]

    # Fetch product data; fail fast on HTTP errors instead of trying to parse
    # an error body as JSON (the original called .json() unconditionally).
    products_response = requests.get(
        f"https://tcgcsv.com/tcgplayer/3/{product_id}/products", timeout=30
    )
    products_response.raise_for_status()
    products_data = products_response.json()

    # Fetch pricing data
    prices_response = requests.get(
        f"https://tcgcsv.com/tcgplayer/3/{product_id}/prices", timeout=30
    )
    prices_response.raise_for_status()
    prices_data = prices_response.json()

    # productId -> marketPrice lookup, built once so the join below is O(1) per card.
    price_dict = {
        price["productId"]: price.get("marketPrice")
        for price in prices_data.get("results", [])
    }

    cards_data = []
    for card in products_data.get("results", []):
        if not is_card(card):  # skip non-card products (boxes, sealed, etc.)
            continue

        card_info = {
            "product_id": card["productId"],
            "name": extract_card_name(card["name"]),
            "card_number": get_card_number(card),
            "market_price": price_dict.get(card["productId"]),
        }
        cards_data.append(card_info)

    # Pydantic validation
    try:
        validated: list[CardPricing] = [CardPricing(**card) for card in cards_data]
        print(
            colored(" ✓", "green"),
            f"Pydantic validation passed for {len(validated)} cards.",
        )
    except ValidationError as e:
        print(colored(" ✖", "red"), "Pydantic validation failed.")
        print(e)
        raise

    df_data = [card.model_dump(mode="json") for card in validated]
    return pl.DataFrame(df_data)


@dg.asset(kinds={"API", "Polars", "Pydantic"}, name="build_pricing_dataframe")
def build_dataframe() -> pl.DataFrame:
    """Concatenate pricing DataFrames for every configured set.

    Pulls one DataFrame per set in SET_PRODUCT_MATCHING and aborts with
    ValueError if any pull comes back empty, so a downstream drop+replace
    load cannot silently wipe existing data.
    """
    frames = []
    for set_number in SET_PRODUCT_MATCHING:
        frame = pull_product_information(set_number)

        # Guard the downstream drop+replace load against an empty pull.
        if frame is None or frame.shape[1] == 0 or frame.is_empty():
            error_msg = (
                f"Empty DataFrame returned for set '{set_number}'. "
                f"Cannot proceed with drop+replace operation to avoid data loss."
            )
            print(colored(" ✖", "red"), error_msg)
            raise ValueError(error_msg)

        frames.append(frame)

    combined = pl.concat(frames)
    print(combined)
    return combined
41 changes: 33 additions & 8 deletions card_data/pipelines/defs/load/load_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dagster as dg
from dagster import RetryPolicy, Backoff
from sqlalchemy.exc import OperationalError
from ..extract.extract_data import (
extract_series_data,
Expand All @@ -11,7 +12,12 @@
from pathlib import Path


@dg.asset(deps=[extract_series_data], kinds={"Supabase", "Postgres"})
@dg.asset(
deps=[extract_series_data],
kinds={"Supabase", "Postgres"},
name="load_series_data",
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL)
)
def load_series_data() -> None:
database_url: str = fetch_secret()
table_name: str = "staging.series"
Expand All @@ -23,12 +29,16 @@ def load_series_data() -> None:
)
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
except OperationalError as e:
print(colored(" ✖", "red"), "Error:", e)
print(colored(" ✖", "red"), "Connection error in load_series_data():", e)
raise


@dg.asset(deps=[load_series_data], kinds={"Soda"}, key_prefix=["staging"], name="series")
@dg.asset(
deps=[load_series_data],
kinds={"Soda"},
name="quality_checks_series"
)
def data_quality_check_on_series() -> None:
# Set working directory to where this file is located
current_file_dir = Path(__file__).parent
print(f"Setting cwd to: {current_file_dir}")

Expand All @@ -53,8 +63,16 @@ def data_quality_check_on_series() -> None:
if result.stderr:
print(result.stderr)

if result.returncode != 0:
raise Exception(f"Soda data quality checks failed with return code {result.returncode}")


@dg.asset(deps=[extract_set_data], kinds={"Supabase", "Postgres"}, key_prefix=["staging"], name="sets")
@dg.asset(
deps=[extract_set_data],
kinds={"Supabase", "Postgres"},
name="load_set_data",
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL)
)
def load_set_data() -> None:
database_url: str = fetch_secret()
table_name: str = "staging.sets"
Expand All @@ -66,10 +84,16 @@ def load_set_data() -> None:
)
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
except OperationalError as e:
print(colored(" ✖", "red"), "Error:", e)
print(colored(" ✖", "red"), "Connection error in load_set_data():", e)
raise


@dg.asset(deps=[create_card_dataframe], kinds={"Supabase", "Postgres"}, key_prefix=["staging"], name="cards")
@dg.asset(
deps=[create_card_dataframe],
kinds={"Supabase", "Postgres"},
name="load_card_data",
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL)
)
def load_card_data() -> None:
database_url: str = fetch_secret()
table_name: str = "staging.cards"
Expand All @@ -81,4 +105,5 @@ def load_card_data() -> None:
)
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
except OperationalError as e:
print(colored(" ✖", "red"), "Error:", e)
print(colored(" ✖", "red"), "Connection error in load_card_data():", e)
raise
26 changes: 26 additions & 0 deletions card_data/pipelines/defs/load/load_pricing_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import dagster as dg
from dagster import RetryPolicy, Backoff
from sqlalchemy.exc import OperationalError
from ..extract.extract_pricing_data import build_dataframe
from ...utils.secret_retriever import fetch_secret
from termcolor import colored


@dg.asset(
    deps=[build_dataframe],
    kinds={"Supabase", "Postgres"},
    retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL),
)
def load_pricing_data() -> None:
    """Load the pricing DataFrame into staging.pricing_data, replacing the table.

    Connection failures are logged and re-raised so Dagster's retry policy
    can kick in.
    """
    table_name: str = "staging.pricing_data"
    database_url: str = fetch_secret()

    pricing_df = build_dataframe()
    try:
        pricing_df.write_database(
            table_name=table_name,
            connection=database_url,
            if_table_exists="replace",
        )
    except OperationalError as e:
        print(colored(" ✖", "red"), "Connection error in load_pricing_data():", e)
        raise
    else:
        print(colored(" ✓", "green"), f"Data loaded into {table_name}")
27 changes: 25 additions & 2 deletions card_data/pipelines/defs/transformation/transform_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_dbt import DbtCliResource, DagsterDbtTranslator, dbt_assets
from pathlib import Path

DBT_PROJECT_PATH = Path(__file__).joinpath("..", "..", "..", "poke_cli_dbt").resolve()

@dbt_assets(manifest=DBT_PROJECT_PATH / "target" / "manifest.json")
class CustomDbtTranslator(DagsterDbtTranslator):
    """Translate dbt manifest nodes into Dagster asset keys.

    dbt *sources* in the staging schema are remapped to the upstream Dagster
    assets that produce them, so dbt model lineage connects to the pipeline's
    load/check assets instead of standalone source keys.
    """

    def get_asset_key(self, dbt_resource_props):
        """Return the Dagster AssetKey for a dbt resource.

        Only "resource_type" and "name" are read from *dbt_resource_props*;
        anything not explicitly mapped falls through to the default translator.
        """
        resource_type = dbt_resource_props["resource_type"]
        name = dbt_resource_props["name"]

        if resource_type == "source":
            # Map staging sources to load assets
            # NOTE(review): "series" maps to the quality-check asset while the
            # other sources map to load assets — confirm this asymmetry is
            # intentional (it makes dbt depend on the series checks passing).
            source_mapping = {
                "series": "quality_checks_series",
                "sets": "load_set_data",
                "cards": "load_card_data",
                "pricing_data": "load_pricing_data",
            }
            if name in source_mapping:
                return dg.AssetKey([source_mapping[name]])

        # For models, use default behavior
        return super().get_asset_key(dbt_resource_props)

@dbt_assets(
manifest=DBT_PROJECT_PATH / "target" / "manifest.json",
dagster_dbt_translator=CustomDbtTranslator()
)
def poke_cli_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
"""
dbt assets that transform staging data into final models.
Expand Down
33 changes: 22 additions & 11 deletions card_data/pipelines/poke_cli_dbt/macros/create_relationships.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
{% macro create_relationships() %}
ALTER TABLE {{ target.schema }}.series ADD CONSTRAINT pk_series PRIMARY KEY (id);
ALTER TABLE {{ target.schema }}.sets ADD CONSTRAINT pk_sets PRIMARY KEY (set_id);
ALTER TABLE {{ target.schema }}.cards ADD CONSTRAINT pk_cards PRIMARY KEY (id);
{{ print("Dropping existing constraints...") }}

ALTER TABLE public.sets
ADD CONSTRAINT fk_sets_series
FOREIGN KEY (series_id)
REFERENCES public.series (id);
-- Drop existing constraints if they exist (in reverse dependency order)
{% do run_query("ALTER TABLE " ~ target.schema ~ ".cards DROP CONSTRAINT IF EXISTS fk_cards_sets") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".sets DROP CONSTRAINT IF EXISTS fk_sets_series") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".cards DROP CONSTRAINT IF EXISTS pk_cards") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".sets DROP CONSTRAINT IF EXISTS pk_sets") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".series DROP CONSTRAINT IF EXISTS pk_series") %}

ALTER TABLE public.cards
ADD CONSTRAINT fk_cards_sets
FOREIGN KEY (set_id)
REFERENCES public.sets (set_id);
{{ print("Adding primary keys...") }}

-- Add primary keys
{% do run_query("ALTER TABLE " ~ target.schema ~ ".series ADD CONSTRAINT pk_series PRIMARY KEY (id)") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".sets ADD CONSTRAINT pk_sets PRIMARY KEY (set_id)") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".cards ADD CONSTRAINT pk_cards PRIMARY KEY (id)") %}

{{ print("Adding foreign keys...") }}

-- Add foreign keys
{% do run_query("ALTER TABLE " ~ target.schema ~ ".sets ADD CONSTRAINT fk_sets_series FOREIGN KEY (series_id) REFERENCES " ~ target.schema ~ ".series (id)") %}
{% do run_query("ALTER TABLE " ~ target.schema ~ ".cards ADD CONSTRAINT fk_cards_sets FOREIGN KEY (set_id) REFERENCES " ~ target.schema ~ ".sets (set_id)") %}

{{ print("Relationships created successfully") }}

{% do return('') %}
{% endmacro %}
2 changes: 1 addition & 1 deletion card_data/pipelines/poke_cli_dbt/models/cards.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
post_hook="{{ enable_rls() }}"
) }}

SELECT id, image, name, "localId", category, hp
SELECT id, set_id, image, name, "localId", category, hp, "set_cardCount_official", set_name
FROM {{ source('staging', 'cards') }}
7 changes: 7 additions & 0 deletions card_data/pipelines/poke_cli_dbt/models/pricing_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- dbt model: expose validated card pricing rows from the staging source as a
-- physical table, with row-level security enabled after build.
{{ config(
    materialized='table',
    post_hook="{{ enable_rls() }}"
) }}

SELECT product_id, name, card_number, market_price
FROM {{ source('staging', 'pricing_data') }}
14 changes: 13 additions & 1 deletion card_data/pipelines/poke_cli_dbt/models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,16 @@ sources:
- name: attack_2_effect
description: "Second attack effect"
- name: attack_2_cost
description: "Second attack energy cost"
description: "Second attack energy cost"

- name: pricing_data
description: "Card pricing data"
columns:
- name: product_id
description: "Product ID"
- name: name
description: "Card name"
- name: card_number
description: "Card number"
- name: market_price
description: "Market price"
Loading
Loading