Skip to content

Commit 295c42f

Browse files
committed
Add another example for data quality profiling using v2 API
1 parent bd53212 commit 295c42f

File tree

2 files changed

+293
-0
lines changed

2 files changed

+293
-0
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ spark.stop()
184184
$SPARK_HOME/sbin/stop-connect-server.sh
185185
```
186186

187+
### Full Example
188+
189+
For a comprehensive example covering data analysis, constraint verification, column profiling, and constraint suggestions, see [tutorials/data_quality_example_v2.py](tutorials/data_quality_example_v2.py).
190+
187191
---
188192

189193
## PyDeequ 2.0 API Reference
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Testing Data Quality at Scale with PyDeequ V2
4+
5+
This example demonstrates the PyDeequ 2.0 API using Spark Connect.
6+
It covers data analysis, constraint verification, column profiling,
7+
and constraint suggestions.
8+
9+
Prerequisites:
10+
1. Start the Spark Connect server with the Deequ plugin:
11+
12+
$SPARK_HOME/sbin/start-connect-server.sh \
13+
--packages org.apache.spark:spark-connect_2.12:3.5.0 \
14+
--jars /path/to/deequ_2.12-2.1.0b-spark-3.5.jar \
15+
--conf spark.connect.extensions.relation.classes=com.amazon.deequ.connect.DeequRelationPlugin
16+
17+
2. Run this script:
18+
SPARK_REMOTE=sc://localhost:15002 python data_quality_example_v2.py
19+
"""
20+
21+
import os
22+
from pyspark.sql import SparkSession, Row
23+
24+
# PyDeequ V2 imports
25+
from pydeequ.v2.analyzers import (
26+
Size,
27+
Completeness,
28+
Distinctness,
29+
Mean,
30+
Minimum,
31+
Maximum,
32+
StandardDeviation,
33+
Correlation,
34+
)
35+
from pydeequ.v2.checks import Check, CheckLevel
36+
from pydeequ.v2.verification import AnalysisRunner, VerificationSuite
37+
from pydeequ.v2.predicates import eq, gte, lte, between
38+
from pydeequ.v2.profiles import ColumnProfilerRunner
39+
from pydeequ.v2.suggestions import ConstraintSuggestionRunner, Rules
40+
41+
42+
def create_sample_data(spark: SparkSession):
    """Build a small product-reviews DataFrame used by every example below.

    The final two rows intentionally carry data quality problems (a NULL
    marketplace and a duplicated review_id) so that the analysis, checks,
    and suggestions have something real to surface.
    """
    def review(rid, cid, pid, market, rating, helpful, total, year, title, ins):
        # Field names must match the schema the rest of the examples expect.
        return Row(review_id=rid, customer_id=cid, product_id=pid,
                   marketplace=market, star_rating=rating,
                   helpful_votes=helpful, total_votes=total,
                   review_year=year, product_title=title, insight=ins)

    rows = [
        # Clean, well-formed reviews
        review("R001", "C100", "P001", "US", 5, 10, 12, 2023, "Great Product", "Y"),
        review("R002", "C101", "P002", "US", 4, 8, 10, 2023, "Good Value", "Y"),
        review("R003", "C102", "P001", "UK", 5, 15, 18, 2022, "Great Product", "N"),
        review("R004", "C103", "P003", "DE", 3, 5, 8, 2022, "Decent Item", "Y"),
        review("R005", "C104", "P002", "FR", 4, 12, 15, 2021, "Good Value", "N"),
        review("R006", "C105", "P004", "JP", 5, 20, 22, 2023, "Excellent!", "Y"),
        review("R007", "C106", "P001", "US", 2, 3, 10, 2020, "Great Product", "N"),
        review("R008", "C107", "P005", "UK", 1, 25, 30, 2021, "Disappointing", "Y"),
        # Data quality issue: missing marketplace
        review("R009", "C108", "P002", None, 4, 7, 9, 2023, "Good Value", "Y"),
        # Data quality issue: duplicate review_id
        review("R001", "C109", "P003", "US", 3, 4, 6, 2022, "Decent Item", "N"),
    ]
    return spark.createDataFrame(rows)
80+
81+
82+
def run_data_analysis(spark: SparkSession, df):
    """Compute and print a battery of metrics over *df* via AnalysisRunner.

    Metrics covered:
      - Size: total row count
      - Completeness: ratio of non-null values per column
      - Distinctness: ratio of distinct values per column
      - Mean / Minimum / Maximum / StandardDeviation: numeric statistics
      - Correlation: pairwise relationship between two columns

    Returns the result DataFrame produced by the runner.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("DATA ANALYSIS")
    print(banner)

    analyzers = [
        Size(),
        Completeness("review_id"),
        Completeness("marketplace"),
        Distinctness("review_id"),
        Mean("star_rating"),
        Minimum("star_rating"),
        Maximum("star_rating"),
        StandardDeviation("star_rating"),
        Correlation("total_votes", "helpful_votes"),
    ]
    runner = AnalysisRunner(spark).onData(df)
    for analyzer in analyzers:
        runner = runner.addAnalyzer(analyzer)
    result = runner.run()

    print("\nAnalysis Results:")
    result.show(truncate=False)

    # Index metric values by (analyzer name, column instance) for easy lookup.
    metrics = {(row["name"], row["instance"]): row["value"]
               for row in result.collect()}

    print("\nKey Insights:")
    print(f" - Dataset contains {int(metrics.get(('Size', '*'), 0))} reviews")
    print(f" - review_id completeness: {metrics.get(('Completeness', 'review_id'), 0):.1%}")
    print(f" - marketplace completeness: {metrics.get(('Completeness', 'marketplace'), 0):.1%}")
    print(f" - review_id distinctness: {metrics.get(('Distinctness', 'review_id'), 0):.1%}")
    print(f" - Average star rating: {metrics.get(('Mean', 'star_rating'), 0):.2f}")
    print(f" - Star rating range: {metrics.get(('Minimum', 'star_rating'), 0):.0f} - {metrics.get(('Maximum', 'star_rating'), 0):.0f}")

    return result
126+
127+
128+
def run_constraint_verification(spark: SparkSession, df):
    """Define a set of data quality constraints and verify *df* against them.

    Demonstrates size, completeness, uniqueness, numeric-range, and
    categorical-membership checks built with the V2 predicate helpers
    (eq / gte / lte / between) rather than V1-style lambdas.

    Returns the verification result DataFrame.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("CONSTRAINT VERIFICATION")
    print(banner)

    # Build the check incrementally; each call returns the (chainable) check.
    check = Check(CheckLevel.Warning, "Product Reviews Quality Check")
    # Size: expect at least 5 reviews.
    check = check.hasSize(gte(5))
    # Completeness: ids must be fully populated; marketplace may have gaps.
    check = (check
             .isComplete("review_id")
             .isComplete("customer_id")
             .hasCompleteness("marketplace", gte(0.8)))
    # Uniqueness: review ids should not repeat.
    check = check.isUnique("review_id")
    # Numeric ranges: star ratings span 1..5; review years are plausible.
    check = (check
             .hasMin("star_rating", eq(1.0))
             .hasMax("star_rating", eq(5.0))
             .hasMean("star_rating", between(1.0, 5.0))
             .hasMin("review_year", gte(2015))
             .hasMax("review_year", lte(2025)))
    # Categorical membership checks.
    check = (check
             .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
             .isContainedIn("insight", ["Y", "N"]))

    result = (VerificationSuite(spark)
              .onData(df)
              .addCheck(check)
              .run())

    print("\nVerification Results:")
    result.show(truncate=False)

    # Tally outcomes per constraint.
    rows = result.collect()
    statuses = [row["constraint_status"] for row in rows]
    passed = statuses.count("Success")
    failed = statuses.count("Failure")

    print(f"\nSummary: {passed} passed, {failed} failed out of {len(rows)} constraints")

    if failed > 0:
        print("\nFailed Constraints:")
        for r in rows:
            if r["constraint_status"] == "Failure":
                print(f" - {r['constraint']}")
                if r["constraint_message"]:
                    print(f" Message: {r['constraint_message']}")

    return result
190+
191+
192+
def run_column_profiling(spark: SparkSession, df):
    """Profile every column of *df* with ColumnProfilerRunner.

    The profiler automatically derives per-column statistics: completeness,
    approximate distinct counts, detected data type, and numeric summaries.
    Histograms are produced for columns whose cardinality is at or below the
    configured threshold.

    Returns the profile result DataFrame.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("COLUMN PROFILING")
    print(banner)

    profiles = (
        ColumnProfilerRunner(spark)
        .onData(df)
        # Columns with <= 10 distinct values also get a value histogram.
        .withLowCardinalityHistogramThreshold(10)
        .run()
    )

    print("\nColumn Profiles:")
    # Restrict the output to a readable subset of profile columns.
    summary_columns = [
        "column", "completeness", "approx_distinct_values",
        "data_type", "mean", "minimum", "maximum",
    ]
    profiles.select(*summary_columns).show(truncate=False)

    return profiles
220+
221+
222+
def run_constraint_suggestions(spark: SparkSession, df):
    """Ask Deequ to propose constraints for *df* automatically.

    With the default rule set, the runner typically suggests completeness
    constraints for fully-populated columns, uniqueness for key-like columns,
    categorical-range constraints for low-cardinality columns, and
    non-negativity for numeric columns.

    Returns the suggestion result DataFrame.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("CONSTRAINT SUGGESTIONS")
    print(banner)

    suggestions = (
        ConstraintSuggestionRunner(spark)
        .onData(df)
        .addConstraintRules(Rules.DEFAULT)
        .run()
    )

    print("\nSuggested Constraints:")
    display_columns = [
        "column_name", "constraint_name", "description", "code_for_constraint",
    ]
    suggestions.select(*display_columns).show(truncate=False)

    # Report how many suggestions were produced in total.
    total = len(suggestions.collect())
    print(f"\nTotal suggestions: {total}")

    return suggestions
251+
252+
253+
def main():
    """Connect to Spark Connect, build sample data, and run every example."""
    # Connection target defaults to a local Spark Connect server.
    spark_remote = os.environ.get("SPARK_REMOTE", "sc://localhost:15002")

    print("PyDeequ V2 Data Quality Example")
    print(f"Connecting to: {spark_remote}")

    spark = SparkSession.builder.remote(spark_remote).getOrCreate()

    try:
        print("\nCreating sample product reviews dataset...")
        df = create_sample_data(spark)

        print("\nDataset Schema:")
        df.printSchema()

        print("\nSample Data:")
        df.show(truncate=False)

        # Run each example section in turn against the same DataFrame.
        for example in (run_data_analysis,
                        run_constraint_verification,
                        run_column_profiling,
                        run_constraint_suggestions):
            example(spark, df)

        banner = "=" * 60
        print("\n" + banner)
        print("EXAMPLE COMPLETE")
        print(banner)
    finally:
        # Always release the Spark Connect session, even on failure.
        spark.stop()
286+
287+
288+
# Standard script entry point: run the full example only when executed
# directly (not when imported as a module).
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)