Skip to content

Commit 1733a75

Browse files
authored
Merge pull request #1193 from CodeForPhilly/lebovits/add-data-validation
Add data validation framework with vacant properties validator
2 parents 9440f7a + 07a15c2 commit 1733a75

17 files changed

+1492
-31
lines changed

data/src/main.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,56 @@
4040
vacant_properties,
4141
)
4242
from new_etl.database import to_postgis_with_schema
43-
43+
from new_etl.validation import (
44+
CommunityGardensValidator,
45+
KDEValidator,
46+
LIViolationsValidator,
47+
OwnerTypeValidator,
48+
TreeCanopyValidator,
49+
VacantValidator,
50+
)
51+
from new_etl.validation.access_process import AccessProcessValidator
52+
from new_etl.validation.city_owned_properties import CityOwnedPropertiesValidator
53+
from new_etl.validation.council_dists import CouncilDistrictsValidator
54+
from new_etl.validation.nbhoods import NeighborhoodsValidator
55+
from new_etl.validation.phs_properties import PHSPropertiesValidator
56+
from new_etl.validation.ppr_properties import PPRPropertiesValidator
57+
from new_etl.validation.rco_geoms import RCOGeomsValidator
58+
59+
# Map services to their validators
SERVICE_VALIDATORS = {
    "community_gardens": CommunityGardensValidator(),
}

# The three KDE-backed services differ only in their column-name prefix,
# so build their validators from a single template.
for _service, _prefix in (
    ("drug_crime", "drug_crimes"),
    ("gun_crime", "gun_crimes"),
    ("li_complaints", "l_and_i_complaints"),
):
    SERVICE_VALIDATORS[_service] = KDEValidator().configure(
        density_column=f"{_prefix}_density",
        zscore_column=f"{_prefix}_density_zscore",
        label_column=f"{_prefix}_density_label",
        percentile_column=f"{_prefix}_density_percentile",
    )
del _service, _prefix  # keep module namespace clean

SERVICE_VALIDATORS.update(
    {
        "li_violations": LIViolationsValidator(),
        "owner_type": OwnerTypeValidator(),
        "vacant": VacantValidator(),
        "council_dists": CouncilDistrictsValidator(),
        "nbhoods": NeighborhoodsValidator(),
        "rco_geoms": RCOGeomsValidator(),
        "city_owned_properties": CityOwnedPropertiesValidator(),
        "phs_properties": PHSPropertiesValidator(),
        "ppr_properties": PPRPropertiesValidator(),
        "tree_canopy": TreeCanopyValidator(),
        "access_process": AccessProcessValidator(),
        # Add other service validators as they are created
    }
)
4493

4594
try:
4695
print("Starting ETL process.")
@@ -79,6 +128,21 @@
79128
print(f"Running service: {service.__name__}")
80129
dataset = service(dataset)
81130

131+
# Run validation if a validator exists for this service
132+
if service.__name__ in SERVICE_VALIDATORS:
133+
validator = SERVICE_VALIDATORS[service.__name__]
134+
is_valid, errors = validator.validate(dataset.gdf)
135+
136+
if not is_valid:
137+
error_message = (
138+
f"Data validation failed for {service.__name__}:\n"
139+
+ "\n".join(errors)
140+
)
141+
send_error_to_slack(error_message)
142+
raise ValueError(error_message)
143+
144+
print(f"Validation passed for {service.__name__}")
145+
82146
print("Applying final dataset transformations.")
83147
dataset = priority_level(dataset)
84148
dataset = access_process(dataset)

data/src/new_etl/data_utils/community_gardens.py

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
def community_gardens(primary_featurelayer: FeatureLayer) -> FeatureLayer:
    """
    Mark parcels that contain a PHS-supported community garden as not vacant.

    Loads the community-gardens layer, reprojects it to the pipeline CRS,
    collapses any non-point geometries to their centroids, spatially joins
    garden points into parcels, and clears the ``vacant`` flag on every
    parcel containing a garden.

    Args:
        primary_featurelayer: Parcel layer; assumed to carry ``opa_id`` and
            ``vacant`` columns — TODO confirm upstream validator guarantees.

    Returns:
        The same FeatureLayer with ``vacant`` set to False for parcels
        containing gardens.

    Source:
        https://services2.arcgis.com/qjOOiLCYeUtwT7x7/arcgis/rest/services/PHS_NGT_Supported_Current_view/FeatureServer/0/
    """
    # Load community gardens. Named `gardens` rather than reusing the
    # function's own name, which would shadow `community_gardens` locally.
    gardens = FeatureLayer(
        name="Community Gardens", esri_rest_urls=COMMUNITY_GARDENS_TO_LOAD
    )

    # Ensure both layers are in the same CRS.
    if gardens.gdf.crs != USE_CRS:
        gardens.gdf = gardens.gdf.to_crs(USE_CRS)

    # Convert any non-point geometries to points using their centroid.
    # Compute the mask once instead of re-deriving it on both sides of
    # the assignment.
    non_point = gardens.gdf.geometry.geom_type != "Point"
    if non_point.any():
        gardens.gdf.loc[non_point, "geometry"] = gardens.gdf.loc[
            non_point
        ].geometry.centroid

    # Limit the community gardens data to relevant columns.
    gardens.gdf = gardens.gdf[["site_name", "geometry"]]

    # Use 'contains' predicate since we want the parcel that contains each point.
    joined_gdf = primary_featurelayer.gdf.sjoin(
        gardens.gdf, predicate="contains", how="inner"
    )

    # Get unique parcels that contain garden points.
    garden_parcels = set(joined_gdf["opa_id"])

    # Update vacant status for parcels containing gardens.
    mask = primary_featurelayer.gdf["opa_id"].isin(garden_parcels)
    primary_featurelayer.gdf.loc[mask, "vacant"] = False

    return primary_featurelayer
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from .access_process import AccessProcessValidator
2+
from .base import ServiceValidator
3+
from .city_owned_properties import CityOwnedPropertiesValidator
4+
from .community_gardens import CommunityGardensValidator
5+
from .council_dists import CouncilDistrictsValidator
6+
from .kde import KDEValidator
7+
from .li_violations import LIViolationsValidator
8+
from .nbhoods import NeighborhoodsValidator
9+
from .owner_type import OwnerTypeValidator
10+
from .phs_properties import PHSPropertiesValidator
11+
from .ppr_properties import PPRPropertiesValidator
12+
from .rco_geoms import RCOGeomsValidator
13+
from .tree_canopy import TreeCanopyValidator
14+
from .vacant_properties import VacantValidator
15+
16+
# Public names re-exported by the validation package.
__all__ = [
    "AccessProcessValidator",
    "ServiceValidator",
    "CityOwnedPropertiesValidator",
    "CommunityGardensValidator",
    "CouncilDistrictsValidator",
    "KDEValidator",
    "LIViolationsValidator",
    "NeighborhoodsValidator",
    "OwnerTypeValidator",
    "PHSPropertiesValidator",
    "PPRPropertiesValidator",
    "RCOGeomsValidator",
    "TreeCanopyValidator",
    "VacantValidator",
]
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from typing import List, Tuple
2+
3+
import geopandas as gpd
4+
5+
from .base import ServiceValidator
6+
7+
8+
class AccessProcessValidator(ServiceValidator):
    """Validator for access process service."""

    # Closed set of values the access_process column may contain.
    VALID_PROCESSES = {
        "Go through Land Bank",
        "Do Nothing",
        "Private Land Use Agreement",
        "Buy Property",
    }

    def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]:
        """
        Validate access process data.

        Critical checks:
        - Required fields present (opa_id, access_process)
        - No duplicate opa_ids
        - Valid geometries
        - Valid access process values

        Args:
            data: GeoDataFrame produced by the access_process service.

        Returns:
            Tuple of (is_valid, list of error messages)
        """
        errors: List[str] = []

        # Report missing required columns first; the checks below guard on
        # column presence so a missing column produces one clear error
        # instead of a KeyError mid-validation.
        errors.extend(self.check_required_columns(data, ["opa_id", "access_process"]))

        if "opa_id" in data.columns:
            # Check for duplicate opa_ids.
            errors.extend(self.check_duplicates(data, "opa_id"))
            # Check data type.
            if data["opa_id"].dtype != "object":
                errors.append("opa_id must be string type")
            # No nulls allowed in opa_id.
            errors.extend(self.check_null_percentage(data, "opa_id", threshold=0.0))

        if "access_process" in data.columns:
            # Check data type.
            if data["access_process"].dtype != "object":
                errors.append("access_process must be string type")
            # No nulls allowed in access_process.
            errors.extend(
                self.check_null_percentage(data, "access_process", threshold=0.0)
            )

            # Check for valid access process values. Drop nulls first: NaN
            # is already reported by the null check above, and joining a
            # float NaN into the message would raise TypeError. Sort so the
            # error message is deterministic.
            invalid_processes = (
                set(data["access_process"].dropna().unique()) - self.VALID_PROCESSES
            )
            if invalid_processes:
                errors.append(
                    f"Found invalid access processes: {', '.join(sorted(invalid_processes))}"
                )

        # Check geometry validity (guarded: upstream failures may leave the
        # frame without a geometry column).
        if "geometry" in data.columns and not data.geometry.is_valid.all():
            errors.append("Found invalid geometries")

        total_count = len(data)

        # Log statistics about access processes. Skip the per-process
        # breakdown on empty data to avoid dividing by zero.
        print("\nAccess Process Statistics:")
        print(f"- Total properties: {total_count}")
        if total_count and "access_process" in data.columns:
            for process in sorted(self.VALID_PROCESSES):
                count = len(data[data["access_process"] == process])
                print(f"- {process}: {count} ({count / total_count:.1%})")

        return len(errors) == 0, errors
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import logging
2+
from abc import ABC, abstractmethod
3+
from typing import List, Optional, Tuple
4+
5+
import geopandas as gpd
6+
7+
8+
class ServiceValidator(ABC):
    """Base class for service-specific data validation.

    Subclasses implement :meth:`validate` and may reuse the shared
    ``check_*`` helpers. Every helper returns a (possibly empty) list of
    human-readable error messages rather than raising, so validators can
    accumulate all problems in one pass.

    Annotations referencing ``gpd`` are quoted (lazy) so importing this
    module does not require geopandas to be resolvable at class-creation
    time; the real pipeline imports it at module level.
    """

    def __init__(self):
        # One logger per concrete validator class, so log records are
        # attributable to the validator that produced them.
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def validate(self, data: "gpd.GeoDataFrame") -> Tuple[bool, List[str]]:
        """
        Validate the data after a service runs.

        Args:
            data: The GeoDataFrame to validate

        Returns:
            Tuple of (is_valid, list of error messages)
        """

    def _run_base_validation(self, data: "gpd.GeoDataFrame") -> List[str]:
        """
        Run base validation checks that should be performed for all services.

        Currently checks for:
        - Duplicate OPA IDs
        - Duplicate geometries
        - Invalid geometries

        Args:
            data: The GeoDataFrame to validate

        Returns:
            List of error messages
        """
        errors: List[str] = []

        # Check for duplicate OPA IDs.
        if "opa_id" in data.columns:
            dup_ids = data[data["opa_id"].duplicated()]
            if not dup_ids.empty:
                errors.append(f"Found {len(dup_ids)} duplicate OPA IDs")

        # Both geometry checks share a single column-presence guard.
        if "geometry" in data.columns:
            dup_geoms = data[data["geometry"].duplicated()]
            if not dup_geoms.empty:
                errors.append(f"Found {len(dup_geoms)} duplicate geometries")

            invalid_geoms = data[~data["geometry"].is_valid]
            if not invalid_geoms.empty:
                errors.append(f"Found {len(invalid_geoms)} invalid geometries")

        return errors

    def check_required_columns(
        self, data: "gpd.GeoDataFrame", required_columns: List[str]
    ) -> List[str]:
        """Check if all required columns are present."""
        missing_columns = [col for col in required_columns if col not in data.columns]
        if missing_columns:
            return [f"Missing required columns: {', '.join(missing_columns)}"]
        return []

    def check_null_percentage(
        self, data: "gpd.GeoDataFrame", column: str, threshold: float = 0.1
    ) -> List[str]:
        """Check if null percentage in a column exceeds threshold.

        A missing column is a graceful no-op (returns []) instead of a
        KeyError; column presence is reported by check_required_columns.
        """
        if column not in data.columns:
            return []
        null_pct = data[column].isna().mean()
        if null_pct > threshold:
            return [
                f"Column {column} has {null_pct:.1%} null values (threshold: {threshold:.1%})"
            ]
        return []

    def check_duplicates(self, data: "gpd.GeoDataFrame", column: str) -> List[str]:
        """Check for duplicate values in a column.

        A missing column is a graceful no-op; see check_null_percentage.
        """
        if column not in data.columns:
            return []
        duplicates = data[data[column].duplicated()]
        if not duplicates.empty:
            return [f"Found {len(duplicates)} duplicate values in column {column}"]
        return []

    def check_count_threshold(
        self, data: "gpd.GeoDataFrame", min_count: int, max_count: Optional[int] = None
    ) -> List[str]:
        """
        Check if row count is within expected range.
        This is a utility method intended for use by validator subclasses.

        Args:
            data: The GeoDataFrame to check
            min_count: Minimum number of rows required
            max_count: Optional maximum number of rows allowed

        Returns:
            List of error messages if thresholds are exceeded
        """
        count = len(data)
        errors = []
        if count < min_count:
            errors.append(
                f"Row count ({count}) is below minimum threshold ({min_count})"
            )
        # Compare against None explicitly: `if max_count` would silently
        # skip enforcement when max_count == 0.
        if max_count is not None and count > max_count:
            errors.append(
                f"Row count ({count}) exceeds maximum threshold ({max_count})"
            )
        return errors

0 commit comments

Comments
 (0)