Skip to content

Commit b603977

Browse files
adding soda data quality checks to pricing data pipeline
1 parent 4e4ede4 commit b603977

File tree

4 files changed

+115
-4
lines changed

4 files changed

+115
-4
lines changed

card_data/pipelines/definitions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import dagster as dg
66

77
from .defs.extract.extract_pricing_data import build_dataframe
8-
from .defs.load.load_pricing_data import load_pricing_data
8+
from .defs.load.load_pricing_data import load_pricing_data, data_quality_checks_on_pricing
99

1010

1111
@definitions
@@ -17,7 +17,7 @@ def defs():
1717
# Define the pricing pipeline job that materializes the assets and downstream dbt model
1818
pricing_pipeline_job = dg.define_asset_job(
1919
name="pricing_pipeline_job",
20-
selection=dg.AssetSelection.assets(build_dataframe, load_pricing_data).downstream(include_self=True),
20+
selection=dg.AssetSelection.assets(build_dataframe).downstream(include_self=True),
2121
)
2222

2323
price_schedule = dg.ScheduleDefinition(
@@ -28,7 +28,7 @@ def defs():
2828
)
2929

3030
defs_pricing = dg.Definitions(
31-
assets=[build_dataframe, load_pricing_data],
31+
assets=[build_dataframe, load_pricing_data, data_quality_checks_on_pricing],
3232
jobs=[pricing_pipeline_job],
3333
schedules=[price_schedule],
3434
)

card_data/pipelines/defs/load/load_pricing_data.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import subprocess
2+
from pathlib import Path
3+
14
import dagster as dg
25
import polars as pl
36
from dagster import RetryPolicy, Backoff
@@ -23,3 +26,36 @@ def load_pricing_data(build_pricing_dataframe: pl.DataFrame) -> None:
2326
except OperationalError as e:
2427
print(colored(" ✖", "red"), "Connection error in load_pricing_data():", e)
2528
raise
29+
30+
31+
@dg.asset(
32+
deps=[load_pricing_data],
33+
kinds={"Soda"},
34+
name="data_quality_checks_on_pricing",
35+
)
36+
def data_quality_checks_on_pricing() -> None:
37+
current_file_dir = Path(__file__).parent
38+
print(f"Setting cwd to: {current_file_dir}")
39+
40+
result = subprocess.run(
41+
[
42+
"soda",
43+
"scan",
44+
"-d",
45+
"supabase",
46+
"-c",
47+
"../../soda/configuration.yml",
48+
"../../soda/checks_pricing.yml",
49+
],
50+
capture_output=True,
51+
text=True,
52+
cwd=current_file_dir,
53+
)
54+
55+
if result.stdout:
56+
print(result.stdout)
57+
if result.stderr:
58+
print(result.stderr)
59+
60+
if result.returncode != 0:
61+
raise Exception(f"Soda data quality checks failed with return code {result.returncode}")

card_data/pipelines/defs/transformation/transform_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def get_asset_key(self, dbt_resource_props):
1616
"series": "quality_checks_series",
1717
"sets": "load_set_data",
1818
"cards": "load_card_data",
19-
"pricing_data": "load_pricing_data",
19+
"pricing_data": "data_quality_checks_on_pricing",
2020
}
2121
if name in source_mapping:
2222
return dg.AssetKey([source_mapping[name]])
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
checks for pricing_data:
2+
# Row count validation - currently have 4216 rows
3+
# Expect at least 4000 cards
4+
- row_count > 4000:
5+
name: Minimum row count check
6+
7+
# Warn if row count drops significantly
8+
- row_count > 4200:
9+
warn:
10+
when fail
11+
name: Row count sanity check (warn if below expected)
12+
13+
# Schema validation checks
14+
- schema:
15+
fail:
16+
when required column missing: [product_id, name, card_number, market_price]
17+
when wrong column type:
18+
product_id: bigint
19+
name: text
20+
card_number: text
21+
market_price: double precision
22+
23+
# Completeness checks - product_id, name, card_number should always be present
24+
- missing_count(product_id) = 0:
25+
name: Product ID completeness
26+
27+
- missing_count(name) = 0:
28+
name: Card name completeness
29+
30+
- missing_count(card_number) = 0:
31+
name: Card number completeness
32+
33+
# Data uniqueness checks
34+
- duplicate_count(product_id) = 0:
35+
name: Product ID uniqueness
36+
37+
# Data format validation
38+
# Card numbers should be alphanumeric with slashes (e.g., "013/198", "4", "005/086")
39+
- invalid_count(card_number) = 0:
40+
valid regex: '^[A-Za-z0-9/]+$'
41+
name: Card number format validation
42+
43+
# Card names should not be empty and should be reasonable length (<100 chars)
44+
- invalid_count(name) = 0:
45+
valid min length: 1
46+
valid max length: 100
47+
name: Card name length validation
48+
49+
# Data range validation
50+
# Product IDs should be positive 6-digit numbers (observed range: 475k-642k)
51+
- invalid_count(product_id) = 0:
52+
valid min: 100000
53+
valid max: 999999999
54+
name: Product ID range validation
55+
56+
# Market prices (when present) should be positive and reasonable
57+
# Current range: $0.02 to $1119.08
58+
- invalid_percent(market_price) < 1%:
59+
valid min: 0.01
60+
valid max: 10000
61+
name: Market price range validation ($0.01-$10,000)
62+
63+
# Statistical validation - average price should be reasonable
64+
# Current average is ~$6.01, allow range of $2-$20 for sanity
65+
- avg(market_price):
66+
warn:
67+
when < 2
68+
when > 20
69+
name: Average market price sanity check
70+
71+
# Anomaly detection - check for extreme outliers
72+
- max(market_price) < 5000:
73+
warn:
74+
when fail
75+
name: Maximum price outlier detection (warn if >$5000)

0 commit comments

Comments
 (0)