66 changes: 65 additions & 1 deletion data/src/main.py
@@ -40,7 +40,56 @@
vacant_properties,
)
from new_etl.database import to_postgis_with_schema

from new_etl.validation import (
CommunityGardensValidator,
KDEValidator,
LIViolationsValidator,
OwnerTypeValidator,
TreeCanopyValidator,
VacantValidator,
)
from new_etl.validation.access_process import AccessProcessValidator
from new_etl.validation.city_owned_properties import CityOwnedPropertiesValidator
from new_etl.validation.council_dists import CouncilDistrictsValidator
from new_etl.validation.nbhoods import NeighborhoodsValidator
from new_etl.validation.phs_properties import PHSPropertiesValidator
from new_etl.validation.ppr_properties import PPRPropertiesValidator
from new_etl.validation.rco_geoms import RCOGeomsValidator

# Map services to their validators
SERVICE_VALIDATORS = {
"community_gardens": CommunityGardensValidator(),
"drug_crime": KDEValidator().configure(
density_column="drug_crimes_density",
zscore_column="drug_crimes_density_zscore",
label_column="drug_crimes_density_label",
percentile_column="drug_crimes_density_percentile",
),
"gun_crime": KDEValidator().configure(
density_column="gun_crimes_density",
zscore_column="gun_crimes_density_zscore",
label_column="gun_crimes_density_label",
percentile_column="gun_crimes_density_percentile",
),
"li_complaints": KDEValidator().configure(
density_column="l_and_i_complaints_density",
zscore_column="l_and_i_complaints_density_zscore",
label_column="l_and_i_complaints_density_label",
percentile_column="l_and_i_complaints_density_percentile",
),
"li_violations": LIViolationsValidator(),
"owner_type": OwnerTypeValidator(),
"vacant": VacantValidator(),
"council_dists": CouncilDistrictsValidator(),
"nbhoods": NeighborhoodsValidator(),
"rco_geoms": RCOGeomsValidator(),
"city_owned_properties": CityOwnedPropertiesValidator(),
"phs_properties": PHSPropertiesValidator(),
"ppr_properties": PPRPropertiesValidator(),
"tree_canopy": TreeCanopyValidator(),
"access_process": AccessProcessValidator(),
# Add other service validators as they are created
}

try:
print("Starting ETL process.")
@@ -79,6 +128,21 @@
print(f"Running service: {service.__name__}")
dataset = service(dataset)

# Run validation if a validator exists for this service
if service.__name__ in SERVICE_VALIDATORS:
validator = SERVICE_VALIDATORS[service.__name__]
is_valid, errors = validator.validate(dataset.gdf)

if not is_valid:
error_message = (
f"Data validation failed for {service.__name__}:\n"
+ "\n".join(errors)
)
send_error_to_slack(error_message)
raise ValueError(error_message)

print(f"Validation passed for {service.__name__}")

print("Applying final dataset transformations.")
dataset = priority_level(dataset)
dataset = access_process(dataset)
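The loop above dispatches validators by each service function's `__name__`. A minimal, self-contained sketch of the same lookup-and-validate pattern (the `DummyValidator`, `dummy_service`, and sample frame are hypothetical stand-ins, not code from this PR):

import geopandas as gpd
from shapely.geometry import Point


class DummyValidator:
    """Hypothetical stand-in mirroring the validate() contract."""

    def validate(self, gdf):
        errors = []
        if "opa_id" not in gdf.columns:
            errors.append("Missing required columns: opa_id")
        return len(errors) == 0, errors


def dummy_service(dataset):
    return dataset


SERVICE_VALIDATORS = {"dummy_service": DummyValidator()}

gdf = gpd.GeoDataFrame({"opa_id": ["000000001"]}, geometry=[Point(0.0, 0.0)])

# Same lookup-by-__name__ pattern as the ETL loop above.
service = dummy_service
if service.__name__ in SERVICE_VALIDATORS:
    is_valid, errors = SERVICE_VALIDATORS[service.__name__].validate(gdf)
    if not is_valid:
        raise ValueError("\n".join(errors))
    print(f"Validation passed for {service.__name__}")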
36 changes: 6 additions & 30 deletions data/src/new_etl/data_utils/community_gardens.py
@@ -31,59 +31,35 @@ def community_gardens(primary_featurelayer: FeatureLayer) -> FeatureLayer:
Source:
https://services2.arcgis.com/qjOOiLCYeUtwT7x7/arcgis/rest/services/PHS_NGT_Supported_Current_view/FeatureServer/0/
"""
if "vacant" not in primary_featurelayer.gdf.columns:
raise ValueError("The 'vacant' column is missing in the primary feature layer.")

# Load community gardens
community_gardens = FeatureLayer(
name="Community Gardens", esri_rest_urls=COMMUNITY_GARDENS_TO_LOAD
)

# Ensure both layers are in the same CRS
if community_gardens.gdf.crs != USE_CRS:
print(
f"Transforming community gardens from {community_gardens.gdf.crs} to {USE_CRS}"
)
community_gardens.gdf = community_gardens.gdf.to_crs(USE_CRS)

# Identify problematic gardens
geom_types = community_gardens.gdf.geometry.geom_type.value_counts()

if len(geom_types) > 1:
# Convert any non-point geometries to points using centroid
community_gardens.gdf.loc[
community_gardens.gdf.geometry.geom_type != "Point", "geometry"
] = community_gardens.gdf[
community_gardens.gdf.geometry.geom_type != "Point"
].geometry.centroid

# Verify all geometries are now points
if not all(community_gardens.gdf.geometry.geom_type == "Point"):
raise ValueError("Failed to convert all geometries to points")
# Convert any non-point geometries to points using centroid
community_gardens.gdf.loc[
community_gardens.gdf.geometry.geom_type != "Point", "geometry"
] = community_gardens.gdf[
community_gardens.gdf.geometry.geom_type != "Point"
].geometry.centroid

# Limit the community gardens data to relevant columns
community_gardens.gdf = community_gardens.gdf[["site_name", "geometry"]]

print(f"\nTotal community gardens: {len(community_gardens.gdf)}")

# Use 'contains' predicate since we want the parcel that contains each point
joined_gdf = primary_featurelayer.gdf.sjoin(
community_gardens.gdf, predicate="contains", how="inner"
)

# Get unique parcels that contain garden points
garden_parcels = set(joined_gdf["opa_id"])
print(f"\nUnique parcels containing gardens: {len(garden_parcels)}")

if len(garden_parcels) > len(community_gardens.gdf):
print(
"\nWARNING: More matching parcels than gardens. This suggests possible data issues."
)

# Update vacant status for parcels containing gardens
mask = primary_featurelayer.gdf["opa_id"].isin(garden_parcels)
primary_featurelayer.gdf.loc[mask, "vacant"] = False

print(f"\nTotal parcels updated: {mask.sum()}")

return primary_featurelayer
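The simplified block above always reduces non-point geometries to centroids instead of branching on the mix of geometry types. A toy illustration of that masked assignment (sample data invented; EPSG:2272 stands in for USE_CRS):

import geopandas as gpd
from shapely.geometry import Point, Polygon

gdf = gpd.GeoDataFrame(
    {"site_name": ["point garden", "polygon garden"]},
    geometry=[Point(0, 0), Polygon([(0, 0), (1, 0), (1, 1)])],
    crs="EPSG:2272",
)

# Replace every non-point geometry with its centroid, in place.
non_point = gdf.geometry.geom_type != "Point"
gdf.loc[non_point, "geometry"] = gdf.loc[non_point].geometry.centroid

assert (gdf.geometry.geom_type == "Point").all()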
31 changes: 31 additions & 0 deletions data/src/new_etl/validation/__init__.py
@@ -0,0 +1,31 @@
from .access_process import AccessProcessValidator
from .base import ServiceValidator
from .city_owned_properties import CityOwnedPropertiesValidator
from .community_gardens import CommunityGardensValidator
from .council_dists import CouncilDistrictsValidator
from .kde import KDEValidator
from .li_violations import LIViolationsValidator
from .nbhoods import NeighborhoodsValidator
from .owner_type import OwnerTypeValidator
from .phs_properties import PHSPropertiesValidator
from .ppr_properties import PPRPropertiesValidator
from .rco_geoms import RCOGeomsValidator
from .tree_canopy import TreeCanopyValidator
from .vacant_properties import VacantValidator

__all__ = [
"AccessProcessValidator",
"ServiceValidator",
"CityOwnedPropertiesValidator",
"CommunityGardensValidator",
"CouncilDistrictsValidator",
"KDEValidator",
"LIViolationsValidator",
"NeighborhoodsValidator",
"OwnerTypeValidator",
"PHSPropertiesValidator",
"PPRPropertiesValidator",
"RCOGeomsValidator",
"TreeCanopyValidator",
"VacantValidator",
]
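With these re-exports, call sites can import every validator from the package root rather than from individual modules. For example, mirroring the configure() call used in main.py (assuming, as there, that configure() returns the validator itself):

from new_etl.validation import KDEValidator

# One fluent configuration per KDE-derived dataset, as in SERVICE_VALIDATORS.
gun_crime_validator = KDEValidator().configure(
    density_column="gun_crimes_density",
    zscore_column="gun_crimes_density_zscore",
    label_column="gun_crimes_density_label",
    percentile_column="gun_crimes_density_percentile",
)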
76 changes: 76 additions & 0 deletions data/src/new_etl/validation/access_process.py
@@ -0,0 +1,76 @@
from typing import List, Tuple

import geopandas as gpd

from .base import ServiceValidator


class AccessProcessValidator(ServiceValidator):
"""Validator for access process service."""

def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]:
"""
Validate access process data.

Critical checks:
- Required fields present (opa_id, access_process)
- No duplicate opa_ids
- Valid geometries
- Valid access process values

Returns:
Tuple of (is_valid, list of error messages)
"""
errors = []

# Check required columns
errors.extend(self.check_required_columns(data, ["opa_id", "access_process"]))

# Check for duplicate opa_ids
errors.extend(self.check_duplicates(data, "opa_id"))

        # Check data types
        if "opa_id" in data.columns and data["opa_id"].dtype != "object":
            errors.append("opa_id must be string type")
        if (
            "access_process" in data.columns
            and data["access_process"].dtype != "object"
        ):
            errors.append("access_process must be string type")

# Check null values in critical fields
errors.extend(
self.check_null_percentage(data, "opa_id", threshold=0.0)
) # No nulls allowed
errors.extend(
self.check_null_percentage(data, "access_process", threshold=0.0)
) # No nulls allowed

# Check geometry validity
if not data.geometry.is_valid.all():
errors.append("Found invalid geometries")

        total_count = len(data)

        # Check for valid access process values
        valid_processes = {
            "Go through Land Bank",
            "Do Nothing",
            "Private Land Use Agreement",
            "Buy Property",
        }
        if "access_process" in data.columns:
            # Nulls are reported by the null-percentage check above, so drop
            # them here rather than flagging NaN as an invalid process.
            invalid_processes = (
                set(data["access_process"].dropna().unique()) - valid_processes
            )
            if invalid_processes:
                errors.append(
                    f"Found invalid access processes: {', '.join(sorted(invalid_processes))}"
                )

            # Log statistics about access processes
            if total_count > 0:
                print("\nAccess Process Statistics:")
                print(f"- Total properties: {total_count}")

                for process in sorted(valid_processes):
                    count = len(data[data["access_process"] == process])
                    print(f"- {process}: {count} ({count / total_count:.1%})")

return len(errors) == 0, errors
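Run standalone against a toy frame, the validator prints its statistics and returns a clean result. A quick sketch (assumes data/src is on PYTHONPATH; the opa_id values are made up):

import geopandas as gpd
from shapely.geometry import Point

from new_etl.validation.access_process import AccessProcessValidator

# Two made-up parcels, both with allowed access_process values.
gdf = gpd.GeoDataFrame(
    {
        "opa_id": ["000000001", "000000002"],
        "access_process": ["Buy Property", "Do Nothing"],
    },
    geometry=[Point(0, 0), Point(1, 1)],
)

is_valid, errors = AccessProcessValidator().validate(gdf)
print(is_valid, errors)  # Expected: True []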
115 changes: 115 additions & 0 deletions data/src/new_etl/validation/base.py
@@ -0,0 +1,115 @@
import logging
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple

import geopandas as gpd


class ServiceValidator(ABC):
"""Base class for service-specific data validation."""

def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)

@abstractmethod
def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]:
"""
Validate the data after a service runs.

Args:
data: The GeoDataFrame to validate

Returns:
Tuple of (is_valid, list of error messages)
"""
pass

def _run_base_validation(self, data: gpd.GeoDataFrame) -> List[str]:
"""
Run base validation checks that should be performed for all services.
Currently checks for:
- Duplicate OPA IDs
- Duplicate geometries
- Invalid geometries

Args:
data: The GeoDataFrame to validate

Returns:
List of error messages
"""
errors = []

# Check for duplicate OPA IDs
if "opa_id" in data.columns:
duplicates = data[data["opa_id"].duplicated()]
if not duplicates.empty:
errors.append(f"Found {len(duplicates)} duplicate OPA IDs")

        # Check for duplicate and invalid geometries
        if "geometry" in data.columns:
            duplicates = data[data["geometry"].duplicated()]
            if not duplicates.empty:
                errors.append(f"Found {len(duplicates)} duplicate geometries")

            invalid_geoms = data[~data["geometry"].is_valid]
            if not invalid_geoms.empty:
                errors.append(f"Found {len(invalid_geoms)} invalid geometries")

return errors

def check_required_columns(
self, data: gpd.GeoDataFrame, required_columns: List[str]
) -> List[str]:
"""Check if all required columns are present."""
missing_columns = [col for col in required_columns if col not in data.columns]
if missing_columns:
return [f"Missing required columns: {', '.join(missing_columns)}"]
return []

    def check_null_percentage(
        self, data: gpd.GeoDataFrame, column: str, threshold: float = 0.1
    ) -> List[str]:
        """Check if null percentage in a column exceeds threshold."""
        if column not in data.columns:
            # A missing column is reported by check_required_columns
            return []
        null_pct = data[column].isna().mean()
        if null_pct > threshold:
            return [
                f"Column {column} has {null_pct:.1%} null values (threshold: {threshold:.1%})"
            ]
        return []

    def check_duplicates(self, data: gpd.GeoDataFrame, column: str) -> List[str]:
        """Check for duplicate values in a column."""
        if column not in data.columns:
            # A missing column is reported by check_required_columns
            return []
        duplicates = data[data[column].duplicated()]
        if not duplicates.empty:
            return [f"Found {len(duplicates)} duplicate values in column {column}"]
        return []

def check_count_threshold(
self, data: gpd.GeoDataFrame, min_count: int, max_count: Optional[int] = None
) -> List[str]:
"""
Check if row count is within expected range.
This is a utility method intended for use by validator subclasses.

Args:
data: The GeoDataFrame to check
min_count: Minimum number of rows required
max_count: Optional maximum number of rows allowed

Returns:
List of error messages if thresholds are exceeded
"""
count = len(data)
errors = []
if count < min_count:
errors.append(
f"Row count ({count}) is below minimum threshold ({min_count})"
)
        if max_count is not None and count > max_count:
errors.append(
f"Row count ({count}) exceeds maximum threshold ({max_count})"
)
return errors
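A concrete validator only has to implement validate() and can compose the helper checks above. A minimal sketch of a hypothetical subclass (ExampleValidator is not part of this PR):

from typing import List, Tuple

import geopandas as gpd

from new_etl.validation.base import ServiceValidator


class ExampleValidator(ServiceValidator):
    """Hypothetical validator composing the base-class helpers."""

    def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]:
        errors: List[str] = []
        errors.extend(self._run_base_validation(data))
        errors.extend(self.check_required_columns(data, ["opa_id"]))
        # Defensive guard: check_required_columns reports a missing column
        # but does not stop execution.
        if "opa_id" in data.columns:
            errors.extend(self.check_null_percentage(data, "opa_id", threshold=0.0))
        errors.extend(self.check_count_threshold(data, min_count=1))
        return len(errors) == 0, errors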