diff --git a/data/src/main.py b/data/src/main.py index 25176201..21c9d7d2 100644 --- a/data/src/main.py +++ b/data/src/main.py @@ -40,7 +40,56 @@ vacant_properties, ) from new_etl.database import to_postgis_with_schema - +from new_etl.validation import ( + CommunityGardensValidator, + KDEValidator, + LIViolationsValidator, + OwnerTypeValidator, + TreeCanopyValidator, + VacantValidator, +) +from new_etl.validation.access_process import AccessProcessValidator +from new_etl.validation.city_owned_properties import CityOwnedPropertiesValidator +from new_etl.validation.council_dists import CouncilDistrictsValidator +from new_etl.validation.nbhoods import NeighborhoodsValidator +from new_etl.validation.phs_properties import PHSPropertiesValidator +from new_etl.validation.ppr_properties import PPRPropertiesValidator +from new_etl.validation.rco_geoms import RCOGeomsValidator + +# Map services to their validators +SERVICE_VALIDATORS = { + "community_gardens": CommunityGardensValidator(), + "drug_crime": KDEValidator().configure( + density_column="drug_crimes_density", + zscore_column="drug_crimes_density_zscore", + label_column="drug_crimes_density_label", + percentile_column="drug_crimes_density_percentile", + ), + "gun_crime": KDEValidator().configure( + density_column="gun_crimes_density", + zscore_column="gun_crimes_density_zscore", + label_column="gun_crimes_density_label", + percentile_column="gun_crimes_density_percentile", + ), + "li_complaints": KDEValidator().configure( + density_column="l_and_i_complaints_density", + zscore_column="l_and_i_complaints_density_zscore", + label_column="l_and_i_complaints_density_label", + percentile_column="l_and_i_complaints_density_percentile", + ), + "li_violations": LIViolationsValidator(), + "owner_type": OwnerTypeValidator(), + "vacant": VacantValidator(), + "council_dists": CouncilDistrictsValidator(), + "nbhoods": NeighborhoodsValidator(), + "rco_geoms": RCOGeomsValidator(), + "city_owned_properties": CityOwnedPropertiesValidator(), + "phs_properties": PHSPropertiesValidator(), + "ppr_properties": PPRPropertiesValidator(), + "tree_canopy": TreeCanopyValidator(), + "access_process": AccessProcessValidator(), + # Add other service validators as they are created +} try: print("Starting ETL process.") @@ -79,6 +128,21 @@ print(f"Running service: {service.__name__}") dataset = service(dataset) + # Run validation if a validator exists for this service + if service.__name__ in SERVICE_VALIDATORS: + validator = SERVICE_VALIDATORS[service.__name__] + is_valid, errors = validator.validate(dataset.gdf) + + if not is_valid: + error_message = ( + f"Data validation failed for {service.__name__}:\n" + + "\n".join(errors) + ) + send_error_to_slack(error_message) + raise ValueError(error_message) + + print(f"Validation passed for {service.__name__}") + print("Applying final dataset transformations.") dataset = priority_level(dataset) dataset = access_process(dataset) diff --git a/data/src/new_etl/data_utils/community_gardens.py b/data/src/new_etl/data_utils/community_gardens.py index ba72d9b3..b133e194 100644 --- a/data/src/new_etl/data_utils/community_gardens.py +++ b/data/src/new_etl/data_utils/community_gardens.py @@ -31,9 +31,6 @@ def community_gardens(primary_featurelayer: FeatureLayer) -> FeatureLayer: Source: https://services2.arcgis.com/qjOOiLCYeUtwT7x7/arcgis/rest/services/PHS_NGT_Supported_Current_view/FeatureServer/0/ """ - if "vacant" not in primary_featurelayer.gdf.columns: - raise ValueError("The 'vacant' column is missing in the primary feature 
layer.") - # Load community gardens community_gardens = FeatureLayer( name="Community Gardens", esri_rest_urls=COMMUNITY_GARDENS_TO_LOAD @@ -41,31 +38,18 @@ def community_gardens(primary_featurelayer: FeatureLayer) -> FeatureLayer: # Ensure both layers are in the same CRS if community_gardens.gdf.crs != USE_CRS: - print( - f"Transforming community gardens from {community_gardens.gdf.crs} to {USE_CRS}" - ) community_gardens.gdf = community_gardens.gdf.to_crs(USE_CRS) - # Identify problematic gardens - geom_types = community_gardens.gdf.geometry.geom_type.value_counts() - - if len(geom_types) > 1: - # Convert any non-point geometries to points using centroid - community_gardens.gdf.loc[ - community_gardens.gdf.geometry.geom_type != "Point", "geometry" - ] = community_gardens.gdf[ - community_gardens.gdf.geometry.geom_type != "Point" - ].geometry.centroid - - # Verify all geometries are now points - if not all(community_gardens.gdf.geometry.geom_type == "Point"): - raise ValueError("Failed to convert all geometries to points") + # Convert any non-point geometries to points using centroid + community_gardens.gdf.loc[ + community_gardens.gdf.geometry.geom_type != "Point", "geometry" + ] = community_gardens.gdf[ + community_gardens.gdf.geometry.geom_type != "Point" + ].geometry.centroid # Limit the community gardens data to relevant columns community_gardens.gdf = community_gardens.gdf[["site_name", "geometry"]] - print(f"\nTotal community gardens: {len(community_gardens.gdf)}") - # Use 'contains' predicate since we want the parcel that contains each point joined_gdf = primary_featurelayer.gdf.sjoin( community_gardens.gdf, predicate="contains", how="inner" @@ -73,17 +57,9 @@ def community_gardens(primary_featurelayer: FeatureLayer) -> FeatureLayer: # Get unique parcels that contain garden points garden_parcels = set(joined_gdf["opa_id"]) - print(f"\nUnique parcels containing gardens: {len(garden_parcels)}") - - if len(garden_parcels) > len(community_gardens.gdf): - print( - "\nWARNING: More matching parcels than gardens. This suggests possible data issues." 
- ) # Update vacant status for parcels containing gardens mask = primary_featurelayer.gdf["opa_id"].isin(garden_parcels) primary_featurelayer.gdf.loc[mask, "vacant"] = False - print(f"\nTotal parcels updated: {mask.sum()}") - return primary_featurelayer diff --git a/data/src/new_etl/validation/__init__.py b/data/src/new_etl/validation/__init__.py new file mode 100644 index 00000000..bacee329 --- /dev/null +++ b/data/src/new_etl/validation/__init__.py @@ -0,0 +1,31 @@ +from .access_process import AccessProcessValidator +from .base import ServiceValidator +from .city_owned_properties import CityOwnedPropertiesValidator +from .community_gardens import CommunityGardensValidator +from .council_dists import CouncilDistrictsValidator +from .kde import KDEValidator +from .li_violations import LIViolationsValidator +from .nbhoods import NeighborhoodsValidator +from .owner_type import OwnerTypeValidator +from .phs_properties import PHSPropertiesValidator +from .ppr_properties import PPRPropertiesValidator +from .rco_geoms import RCOGeomsValidator +from .tree_canopy import TreeCanopyValidator +from .vacant_properties import VacantValidator + +__all__ = [ + "AccessProcessValidator", + "ServiceValidator", + "CityOwnedPropertiesValidator", + "CommunityGardensValidator", + "CouncilDistrictsValidator", + "KDEValidator", + "LIViolationsValidator", + "NeighborhoodsValidator", + "OwnerTypeValidator", + "PHSPropertiesValidator", + "PPRPropertiesValidator", + "RCOGeomsValidator", + "TreeCanopyValidator", + "VacantValidator", +] diff --git a/data/src/new_etl/validation/access_process.py b/data/src/new_etl/validation/access_process.py new file mode 100644 index 00000000..0f4eaafc --- /dev/null +++ b/data/src/new_etl/validation/access_process.py @@ -0,0 +1,76 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base import ServiceValidator + + +class AccessProcessValidator(ServiceValidator): + """Validator for access process service.""" + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate access process data. 
+ + Critical checks: + - Required fields present (opa_id, access_process) + - No duplicate opa_ids + - Valid geometries + - Valid access process values + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + errors.extend(self.check_required_columns(data, ["opa_id", "access_process"])) + + # Check for duplicate opa_ids + errors.extend(self.check_duplicates(data, "opa_id")) + + # Check data types + if "opa_id" in data.columns and not data["opa_id"].dtype == "object": + errors.append("opa_id must be string type") + if ( + "access_process" in data.columns + and not data["access_process"].dtype == "object" + ): + errors.append("access_process must be string type") + + # Check null values in critical fields + errors.extend( + self.check_null_percentage(data, "opa_id", threshold=0.0) + ) # No nulls allowed + errors.extend( + self.check_null_percentage(data, "access_process", threshold=0.0) + ) # No nulls allowed + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + total_count = len(data) + + # Check for valid access process values + valid_processes = { + "Go through Land Bank", + "Do Nothing", + "Private Land Use Agreement", + "Buy Property", + } + invalid_processes = set(data["access_process"].unique()) - valid_processes + if invalid_processes: + errors.append( + f"Found invalid access processes: {', '.join(invalid_processes)}" + ) + + # Log statistics about access processes + print("\nAccess Process Statistics:") + print(f"- Total properties: {total_count}") + + for process in sorted(valid_processes): + count = len(data[data["access_process"] == process]) + print(f"- {process}: {count} ({count / total_count:.1%})") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/base.py b/data/src/new_etl/validation/base.py new file mode 100644 index 00000000..1a5521eb --- /dev/null +++ b/data/src/new_etl/validation/base.py @@ -0,0 +1,115 @@ +import logging +from abc import ABC, abstractmethod +from typing import List, Optional, Tuple + +import geopandas as gpd + + +class ServiceValidator(ABC): + """Base class for service-specific data validation.""" + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + + @abstractmethod + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate the data after a service runs. + + Args: + data: The GeoDataFrame to validate + + Returns: + Tuple of (is_valid, list of error messages) + """ + pass + + def _run_base_validation(self, data: gpd.GeoDataFrame) -> List[str]: + """ + Run base validation checks that should be performed for all services. 
+ Currently checks for: + - Duplicate OPA IDs + - Duplicate geometries + - Invalid geometries + + Args: + data: The GeoDataFrame to validate + + Returns: + List of error messages + """ + errors = [] + + # Check for duplicate OPA IDs + if "opa_id" in data.columns: + duplicates = data[data["opa_id"].duplicated()] + if not duplicates.empty: + errors.append(f"Found {len(duplicates)} duplicate OPA IDs") + + # Check for duplicate geometries + if "geometry" in data.columns: + duplicates = data[data["geometry"].duplicated()] + if not duplicates.empty: + errors.append(f"Found {len(duplicates)} duplicate geometries") + + # Check for invalid geometries + if "geometry" in data.columns: + invalid_geoms = data[~data["geometry"].is_valid] + if not invalid_geoms.empty: + errors.append(f"Found {len(invalid_geoms)} invalid geometries") + + return errors + + def check_required_columns( + self, data: gpd.GeoDataFrame, required_columns: List[str] + ) -> List[str]: + """Check if all required columns are present.""" + missing_columns = [col for col in required_columns if col not in data.columns] + if missing_columns: + return [f"Missing required columns: {', '.join(missing_columns)}"] + return [] + + def check_null_percentage( + self, data: gpd.GeoDataFrame, column: str, threshold: float = 0.1 + ) -> List[str]: + """Check if null percentage in a column exceeds threshold.""" + null_pct = data[column].isna().mean() + if null_pct > threshold: + return [ + f"Column {column} has {null_pct:.1%} null values (threshold: {threshold:.1%})" + ] + return [] + + def check_duplicates(self, data: gpd.GeoDataFrame, column: str) -> List[str]: + """Check for duplicate values in a column.""" + duplicates = data[data[column].duplicated()] + if not duplicates.empty: + return [f"Found {len(duplicates)} duplicate values in column {column}"] + return [] + + def check_count_threshold( + self, data: gpd.GeoDataFrame, min_count: int, max_count: Optional[int] = None + ) -> List[str]: + """ + Check if row count is within expected range. + This is a utility method intended for use by validator subclasses. + + Args: + data: The GeoDataFrame to check + min_count: Minimum number of rows required + max_count: Optional maximum number of rows allowed + + Returns: + List of error messages if thresholds are exceeded + """ + count = len(data) + errors = [] + if count < min_count: + errors.append( + f"Row count ({count}) is below minimum threshold ({min_count})" + ) + if max_count and count > max_count: + errors.append( + f"Row count ({count}) exceeds maximum threshold ({max_count})" + ) + return errors diff --git a/data/src/new_etl/validation/city_owned_properties.py b/data/src/new_etl/validation/city_owned_properties.py new file mode 100644 index 00000000..dd3c03f3 --- /dev/null +++ b/data/src/new_etl/validation/city_owned_properties.py @@ -0,0 +1,108 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base import ServiceValidator + + +class CityOwnedPropertiesValidator(ServiceValidator): + """Validator for city-owned properties service.""" + + # Known valid city agencies + KNOWN_AGENCIES = { + "Land Bank (PHDC)", + "PRA", + "DPP", + "PHA", + "City of Philadelphia", + } + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate city-owned properties data. 
+ + Critical checks: + - Required fields present (city_owner_agency, side_yard_eligible) + - city_owner_agency is string or NA + - side_yard_eligible is "Yes" or "No" (no NAs) + - city_owner_agency values match known agencies + - Valid geometries + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + errors.extend( + self.check_required_columns( + data, ["city_owner_agency", "side_yard_eligible"] + ) + ) + + # Check data types and values + if "city_owner_agency" in data.columns: + # Check type for non-null values + non_null_agencies = data[data["city_owner_agency"].notna()] + if ( + len(non_null_agencies) > 0 + and non_null_agencies["city_owner_agency"].dtype != "object" + ): + errors.append("city_owner_agency must be string type") + + # Check for unknown agency values + unknown_agencies = ( + set( + data[data["city_owner_agency"].notna()][ + "city_owner_agency" + ].unique() + ) + - self.KNOWN_AGENCIES + ) + if unknown_agencies: + errors.append( + f"Found unknown city_owner_agency values: {sorted(unknown_agencies)}" + ) + + # Check side_yard_eligible values + if "side_yard_eligible" in data.columns: + invalid_values = data[~data["side_yard_eligible"].isin(["Yes", "No"])][ + "side_yard_eligible" + ].unique() + if len(invalid_values) > 0: + errors.append( + f"side_yard_eligible must be 'Yes' or 'No', found: {sorted(invalid_values)}" + ) + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + # Log statistics about city ownership and side yard eligibility + if all( + col in data.columns for col in ["city_owner_agency", "side_yard_eligible"] + ): + total_properties = len(data) + city_owned = len(data[data["city_owner_agency"].notna()]) + side_yard_eligible = len(data[data["side_yard_eligible"] == "Yes"]) + + print("\nCity Ownership Statistics:") + print(f"- Total properties: {total_properties}") + print( + f"- City-owned properties: {city_owned} ({city_owned / total_properties:.1%})" + ) + print( + f"- Side yard eligible: {side_yard_eligible} ({side_yard_eligible / total_properties:.1%})" + ) + + if city_owned > 0: + print("\nCity Owner Agency Distribution:") + agency_counts = ( + data[data["city_owner_agency"].notna()]["city_owner_agency"] + .value_counts() + .to_dict() + ) + for agency, count in agency_counts.items(): + print(f" - {agency}: {count} ({count / city_owned:.1%})") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/community_gardens.py b/data/src/new_etl/validation/community_gardens.py new file mode 100644 index 00000000..60308085 --- /dev/null +++ b/data/src/new_etl/validation/community_gardens.py @@ -0,0 +1,110 @@ +from typing import List, Tuple + +import geopandas as gpd +import pandas as pd + +from config.config import USE_CRS + +from ..classes.featurelayer import FeatureLayer +from ..classes.service_validator import ServiceValidator +from ..constants.services import COMMUNITY_GARDENS_TO_LOAD + + +class CommunityGardensValidator(ServiceValidator): + """Validator for community gardens data quality and processing.""" + + def validate(self, gdf: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate community gardens data and processing. 
+ + Args: + gdf: GeoDataFrame containing the processed data + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + required_cols = {"geometry", "vacant", "opa_id"} + missing_cols = required_cols - set(gdf.columns) + if missing_cols: + errors.append(f"Missing required columns: {missing_cols}") + + # Check data types + if "vacant" in gdf.columns and not pd.api.types.is_bool_dtype(gdf["vacant"]): + errors.append("'vacant' column must be boolean type") + + # Check for null geometries + null_geoms = gdf.geometry.isna().sum() + if null_geoms > 0: + errors.append(f"Found {null_geoms} null geometries") + + # Check for invalid geometries + invalid_geoms = ~gdf.geometry.is_valid + if invalid_geoms.any(): + errors.append(f"Found {invalid_geoms.sum()} invalid geometries") + + # Load and validate community gardens data + try: + community_gardens = FeatureLayer( + name="Community Gardens", esri_rest_urls=COMMUNITY_GARDENS_TO_LOAD + ) + + # Check CRS + if community_gardens.gdf.crs != USE_CRS: + errors.append( + f"Community gardens data has incorrect CRS: {community_gardens.gdf.crs}, expected {USE_CRS}" + ) + + # Check geometry types + geom_types = community_gardens.gdf.geometry.geom_type.value_counts() + if len(geom_types) > 1: + errors.append( + f"Community gardens data contains multiple geometry types: {geom_types.to_dict()}" + ) + + # Check for null geometries in community gardens + null_garden_geoms = community_gardens.gdf.geometry.isna().sum() + if null_garden_geoms > 0: + errors.append( + f"Found {null_garden_geoms} null geometries in community gardens data" + ) + + # Check for invalid geometries in community gardens + invalid_garden_geoms = ~community_gardens.gdf.geometry.is_valid + if invalid_garden_geoms.any(): + errors.append( + f"Found {invalid_garden_geoms.sum()} invalid geometries in community gardens data" + ) + + # Check total number of properties being masked + if "vacant" in gdf.columns: + masked_count = (~gdf["vacant"]).sum() + if masked_count > 5000: + errors.append( + f"Too many properties being masked ({masked_count} > 5000). This may indicate a data issue." + ) + + # Check if more parcels are being masked than there are gardens + if masked_count > len(community_gardens.gdf): + errors.append( + f"More parcels being masked ({masked_count}) than there are community gardens ({len(community_gardens.gdf)}). This may indicate a data issue." 
+ ) + + # Log statistics + if "vacant" in gdf.columns: + total_props = len(gdf) + masked_props = (~gdf["vacant"]).sum() + print("\nCommunity Gardens Statistics:") + print(f"Total properties: {total_props}") + print(f"Properties masked as non-vacant: {masked_props}") + print(f"Percentage masked: {(masked_props / total_props) * 100:.2f}%") + print(f"Total community gardens: {len(community_gardens.gdf)}") + + except Exception as e: + errors.append( + f"Error loading or validating community gardens data: {str(e)}" + ) + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/council_dists.py b/data/src/new_etl/validation/council_dists.py new file mode 100644 index 00000000..96902d53 --- /dev/null +++ b/data/src/new_etl/validation/council_dists.py @@ -0,0 +1,75 @@ +from typing import List, Tuple + +import geopandas as gpd +import pandas as pd + +from .base import ServiceValidator + + +class CouncilDistrictsValidator(ServiceValidator): + """Validator for council districts service.""" + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate council districts data. + + Critical checks: + - Required fields present (district, geometry) + - District numbers are valid (1-10) as strings + - Valid geometries + - No duplicate districts + - All observations have a district + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + errors.extend(self.check_required_columns(data, ["district", "geometry"])) + + # Check data types + if "district" in data.columns and data["district"].dtype != "object": + errors.append("district must be string type") + + # Check district number ranges + if "district" in data.columns: + try: + # Convert to numeric for range checking + districts = pd.to_numeric(data["district"]) + valid_districts = set( + range(1, 11) + ) # Philadelphia has 10 council districts + invalid_districts = set(districts.unique()) - valid_districts + if invalid_districts: + errors.append( + f"Found invalid district numbers: {sorted(invalid_districts)}" + ) + except ValueError: + errors.append( + "district values must be numeric strings between 1 and 10" + ) + + # Check for duplicate districts + errors.extend(self.check_duplicates(data, "district")) + + # Check null values in critical fields + errors.extend( + self.check_null_percentage(data, "district", threshold=0.0) + ) # No nulls allowed + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + # Check record count (should be exactly 10 districts) + if len(data) != 10: + errors.append(f"Expected exactly 10 council districts, found {len(data)}") + + # Check that all observations have a district + if "district" in data.columns: + null_districts = data["district"].isnull().sum() + if null_districts > 0: + errors.append(f"Found {null_districts} observations without a district") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/kde.py b/data/src/new_etl/validation/kde.py new file mode 100644 index 00000000..c0a046dd --- /dev/null +++ b/data/src/new_etl/validation/kde.py @@ -0,0 +1,178 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base_validator import BaseValidator + + +class KDEValidator(BaseValidator): + """ + Validator for Kernel Density Estimation (KDE) calculations. + Ensures proper density calculations and data quality across all services that use KDE. 
+ """ + + # Valid density labels + VALID_DENSITY_LABELS = {"Low", "Medium", "High"} + + def __init__(self): + """Initialize the validator with default column names.""" + self.density_column = None + self.zscore_column = None + self.label_column = None + self.percentile_column = None + + def configure( + self, + density_column: str, + zscore_column: str, + label_column: str, + percentile_column: str, + ) -> "KDEValidator": + """ + Configure the validator with the column names for a specific service. + + Args: + density_column (str): Name of the density column + zscore_column (str): Name of the z-score column + label_column (str): Name of the density label column + percentile_column (str): Name of the percentile column + + Returns: + KDEValidator: The configured validator instance + """ + self.density_column = density_column + self.zscore_column = zscore_column + self.label_column = label_column + self.percentile_column = percentile_column + return self + + def validate(self, gdf: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate the KDE calculations for a specific service. + + Args: + gdf (gpd.GeoDataFrame): The GeoDataFrame to validate + + Returns: + Tuple[bool, List[str]]: A tuple containing: + - bool: Whether the validation passed + - List[str]: List of error messages if validation failed + """ + if not all( + [ + self.density_column, + self.zscore_column, + self.label_column, + self.percentile_column, + ] + ): + return False, [ + "Validator not configured. Call configure() before validate()." + ] + + errors = [] + + # Check required columns + required_columns = [ + self.density_column, + self.zscore_column, + self.label_column, + self.percentile_column, + ] + missing_columns = [col for col in required_columns if col not in gdf.columns] + if missing_columns: + errors.append(f"Missing required columns: {', '.join(missing_columns)}") + + # Check density bounds (0 to 1) + if self.density_column in gdf.columns: + # Check for null values + null_density = gdf[gdf[self.density_column].isna()] + if not null_density.empty: + errors.append( + f"Found {len(null_density)} properties with null {self.density_column}" + ) + + # Check bounds + out_of_bounds = gdf[ + (gdf[self.density_column] < 0) | (gdf[self.density_column] > 1) + ] + if not out_of_bounds.empty: + errors.append( + f"Found {len(out_of_bounds)} properties with density values outside [0,1] range" + ) + + # Check z-score bounds (-10 to 10) + if self.zscore_column in gdf.columns: + # Check for null values + null_zscore = gdf[gdf[self.zscore_column].isna()] + if not null_zscore.empty: + errors.append( + f"Found {len(null_zscore)} properties with null {self.zscore_column}" + ) + + # Check bounds + out_of_bounds = gdf[ + (gdf[self.zscore_column] < -10) | (gdf[self.zscore_column] > 10) + ] + if not out_of_bounds.empty: + errors.append( + f"Found {len(out_of_bounds)} properties with z-score values outside [-10,10] range" + ) + + # Check density label + if self.label_column in gdf.columns: + # Check for null values + null_labels = gdf[gdf[self.label_column].isna()] + if not null_labels.empty: + errors.append( + f"Found {len(null_labels)} properties with null {self.label_column}" + ) + + # Check valid values + invalid_labels = gdf[ + ~gdf[self.label_column].isin(self.VALID_DENSITY_LABELS) + ] + if not invalid_labels.empty: + errors.append( + f"Found {len(invalid_labels)} properties with invalid density labels. 
Valid labels are: {', '.join(self.VALID_DENSITY_LABELS)}" + ) + + # Check percentile bounds (0 to 100) + if self.percentile_column in gdf.columns: + # Check for null values + null_percentile = gdf[gdf[self.percentile_column].isna()] + if not null_percentile.empty: + errors.append( + f"Found {len(null_percentile)} properties with null {self.percentile_column}" + ) + + # Check bounds + out_of_bounds = gdf[ + (gdf[self.percentile_column] < 0) | (gdf[self.percentile_column] > 100) + ] + if not out_of_bounds.empty: + errors.append( + f"Found {len(out_of_bounds)} properties with percentile values outside [0,100] range" + ) + + # Log statistics about the density calculations + if all(col in gdf.columns for col in [self.density_column, self.label_column]): + total_properties = len(gdf) + print(f"\n{self.density_column} Statistics:") + print(f"- Total properties: {total_properties}") + + # Density label distribution + for label in self.VALID_DENSITY_LABELS: + count = len(gdf[gdf[self.label_column] == label]) + percentage = (count / total_properties) * 100 + print(f"- {label} density: {count} ({percentage:.1f}%)") + + # Density value statistics + if self.density_column in gdf.columns: + print("\nDensity Value Statistics:") + print(f"- Mean: {gdf[self.density_column].mean():.3f}") + print(f"- Median: {gdf[self.density_column].median():.3f}") + print(f"- Min: {gdf[self.density_column].min():.3f}") + print(f"- Max: {gdf[self.density_column].max():.3f}") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/li_violations.py b/data/src/new_etl/validation/li_violations.py new file mode 100644 index 00000000..e83421a5 --- /dev/null +++ b/data/src/new_etl/validation/li_violations.py @@ -0,0 +1,175 @@ +from typing import List, Tuple + +import geopandas as gpd +import pandas as pd + +from .base_validator import BaseValidator + + +class LIViolationsValidator(BaseValidator): + """ + Validator for L&I violations data. + Ensures proper counting and categorization of violations. + """ + + # Keywords used to filter violations + VIOLATION_KEYWORDS = { + "dumping", + "blight", + "rubbish", + "weeds", + "graffiti", + "abandoned", + "sanitation", + "litter", + "vacant", + "trash", + "unsafe", + } + + def validate(self, gdf: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate the L&I violations data. + + Args: + gdf (gpd.GeoDataFrame): The GeoDataFrame to validate. 
+ + Returns: + Tuple[bool, List[str]]: A tuple containing: + - bool: Whether the validation passed + - List[str]: List of error messages if validation failed + """ + errors = [] + + # Check required columns + required_columns = [ + "all_violations_past_year", + "open_violations_past_year", + "li_code_violations", + "opa_id", # Required for checking duplicates + ] + missing_columns = [col for col in required_columns if col not in gdf.columns] + if missing_columns: + errors.append(f"Missing required columns: {', '.join(missing_columns)}") + + # Check for duplicate OPA IDs + if "opa_id" in gdf.columns: + duplicate_opa_ids = gdf[gdf.duplicated(subset=["opa_id"], keep=False)] + if not duplicate_opa_ids.empty: + errors.append( + f"Found {len(duplicate_opa_ids)} duplicate OPA IDs in the violations data" + ) + # Log some examples of duplicates + example_duplicates = duplicate_opa_ids["opa_id"].head(5).tolist() + errors.append( + f"Example duplicate OPA IDs: {', '.join(map(str, example_duplicates))}" + ) + + if "all_violations_past_year" in gdf.columns: + # Check for null values + null_violations = gdf[gdf["all_violations_past_year"].isna()] + if not null_violations.empty: + errors.append( + f"Found {len(null_violations)} properties with null all_violations_past_year" + ) + + # Check for negative values + negative_violations = gdf[gdf["all_violations_past_year"] < 0] + if not negative_violations.empty: + errors.append( + f"Found {len(negative_violations)} properties with negative all_violations_past_year" + ) + + # Check for non-integer values + non_integer_violations = gdf[ + ~gdf["all_violations_past_year"].apply(lambda x: float(x).is_integer()) + ] + if not non_integer_violations.empty: + errors.append( + f"Found {len(non_integer_violations)} properties with non-integer all_violations_past_year" + ) + + if "open_violations_past_year" in gdf.columns: + # Check for null values + null_open = gdf[gdf["open_violations_past_year"].isna()] + if not null_open.empty: + errors.append( + f"Found {len(null_open)} properties with null open_violations_past_year" + ) + + # Check for negative values + negative_open = gdf[gdf["open_violations_past_year"] < 0] + if not negative_open.empty: + errors.append( + f"Found {len(negative_open)} properties with negative open_violations_past_year" + ) + + # Check for non-integer values + non_integer_open = gdf[ + ~gdf["open_violations_past_year"].apply(lambda x: float(x).is_integer()) + ] + if not non_integer_open.empty: + errors.append( + f"Found {len(non_integer_open)} properties with non-integer open_violations_past_year" + ) + + # Check that open violations don't exceed total violations + if all( + col in gdf.columns + for col in ["all_violations_past_year", "open_violations_past_year"] + ): + invalid_counts = gdf[ + gdf["open_violations_past_year"] > gdf["all_violations_past_year"] + ] + if not invalid_counts.empty: + errors.append( + f"Found {len(invalid_counts)} properties where open_violations_past_year exceeds all_violations_past_year" + ) + + # Check violation codes + if "li_code_violations" in gdf.columns: + # Check for null values + null_codes = gdf[gdf["li_code_violations"].isna()] + if not null_codes.empty: + errors.append( + f"Found {len(null_codes)} properties with null li_code_violations" + ) + + # Check that violation codes contain expected keywords + def check_violation_keywords(codes: str) -> bool: + if pd.isna(codes): + return True + codes_lower = codes.lower() + return any( + keyword in codes_lower for keyword in self.VIOLATION_KEYWORDS + ) + + 
invalid_codes = gdf[ + ~gdf["li_code_violations"].apply(check_violation_keywords) + ] + if not invalid_codes.empty: + errors.append( + f"Found {len(invalid_codes)} properties with violation codes not matching expected keywords" + ) + + # Log statistics about violations + if all( + col in gdf.columns + for col in ["all_violations_past_year", "open_violations_past_year"] + ): + total_properties = len(gdf) + properties_with_violations = len(gdf[gdf["all_violations_past_year"] > 0]) + properties_with_open_violations = len( + gdf[gdf["open_violations_past_year"] > 0] + ) + + print("\nL&I Violations Statistics:") + print(f"- Total properties: {total_properties}") + print( + f"- Properties with violations: {properties_with_violations} ({properties_with_violations / total_properties * 100:.1f}%)" + ) + print( + f"- Properties with open violations: {properties_with_open_violations} ({properties_with_open_violations / total_properties * 100:.1f}%)" + ) + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/nbhoods.py b/data/src/new_etl/validation/nbhoods.py new file mode 100644 index 00000000..705cdbde --- /dev/null +++ b/data/src/new_etl/validation/nbhoods.py @@ -0,0 +1,63 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base import ServiceValidator + + +class NeighborhoodsValidator(ServiceValidator): + """Validator for neighborhoods service.""" + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate neighborhoods data. + + Critical checks: + - Required fields present (nbhood, geometry) + - Neighborhood names are strings + - Valid geometries + - No duplicate neighborhoods + - All observations have a neighborhood + - Expected number of unique neighborhoods (~160) + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + errors.extend(self.check_required_columns(data, ["nbhood", "geometry"])) + + # Check data types + if "nbhood" in data.columns and data["nbhood"].dtype != "object": + errors.append("nbhood must be string type") + + # Check for duplicate neighborhoods + errors.extend(self.check_duplicates(data, "nbhood")) + + # Check null values in critical fields + errors.extend( + self.check_null_percentage(data, "nbhood", threshold=0.0) + ) # No nulls allowed + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + # Check that all observations have a neighborhood + if "nbhood" in data.columns: + null_nbhoods = data["nbhood"].isnull().sum() + if null_nbhoods > 0: + errors.append( + f"Found {null_nbhoods} observations without a neighborhood" + ) + + # Check number of unique neighborhoods + if "nbhood" in data.columns: + unique_nbhoods = data["nbhood"].nunique() + if unique_nbhoods < 100 or unique_nbhoods > 200: + errors.append( + f"Expected around 150 unique neighborhoods, found {unique_nbhoods}" + ) + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/owner_type.py b/data/src/new_etl/validation/owner_type.py new file mode 100644 index 00000000..f6123177 --- /dev/null +++ b/data/src/new_etl/validation/owner_type.py @@ -0,0 +1,99 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base_validator import BaseValidator + + +class OwnerTypeValidator(BaseValidator): + """ + Validator for owner type categorization. + Ensures properties are correctly categorized as Public, Business (LLC), or Individual. 
+ """ + + # Valid owner types + VALID_OWNER_TYPES = {"Public", "Business (LLC)", "Individual"} + + def validate(self, gdf: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate the owner type categorization. + + Args: + gdf (gpd.GeoDataFrame): The GeoDataFrame to validate. + + Returns: + Tuple[bool, List[str]]: A tuple containing: + - bool: Whether the validation passed + - List[str]: List of error messages if validation failed + """ + errors = [] + + # Check required columns + required_columns = ["owner_type", "owner_1", "owner_2", "city_owner_agency"] + missing_columns = [col for col in required_columns if col not in gdf.columns] + if missing_columns: + errors.append(f"Missing required columns: {', '.join(missing_columns)}") + + # Check that owner_type column exists and has valid values + if "owner_type" in gdf.columns: + # Check for null values in owner_type + null_owner_types = gdf["owner_type"].isna().sum() + if null_owner_types > 0: + errors.append( + f"Found {null_owner_types} properties with null owner_type" + ) + + # Check for invalid owner types + invalid_types = ( + set(gdf["owner_type"].dropna().unique()) - self.VALID_OWNER_TYPES + ) + if invalid_types: + errors.append(f"Found invalid owner types: {sorted(invalid_types)}") + + # Validate categorization logic + for owner_type in self.VALID_OWNER_TYPES: + subset = gdf[gdf["owner_type"] == owner_type] + + if owner_type == "Public": + # Public properties should have a non-null city_owner_agency + invalid_public = subset[subset["city_owner_agency"].isna()] + if not invalid_public.empty: + errors.append( + f"Found {len(invalid_public)} properties marked as Public with null city_owner_agency" + ) + + elif owner_type == "Business (LLC)": + # Business (LLC) properties should have "LLC" in owner_1 or owner_2 + invalid_business = subset[ + ~subset["owner_1"].str.lower().str.contains(" llc", na=False) + & ~subset["owner_2"].str.lower().str.contains(" llc", na=False) + ] + if not invalid_business.empty: + errors.append( + f"Found {len(invalid_business)} properties marked as Business (LLC) without 'LLC' in owner names" + ) + + elif owner_type == "Individual": + # Individual properties should not have a city_owner_agency and should not have "LLC" in owner names + invalid_individual = subset[ + subset["city_owner_agency"].notna() + | subset["owner_1"].str.lower().str.contains(" llc", na=False) + | subset["owner_2"].str.lower().str.contains(" llc", na=False) + ] + if not invalid_individual.empty: + errors.append( + f"Found {len(invalid_individual)} properties marked as Individual that should be Public or Business (LLC)" + ) + + # Log statistics about owner types + if "owner_type" in gdf.columns: + total_properties = len(gdf) + print("\nOwner Type Statistics:") + print(f"- Total properties: {total_properties}") + + for owner_type in self.VALID_OWNER_TYPES: + count = len(gdf[gdf["owner_type"] == owner_type]) + percentage = (count / total_properties) * 100 + print(f"- {owner_type}: {count} ({percentage:.1f}%)") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/phs_properties.py b/data/src/new_etl/validation/phs_properties.py new file mode 100644 index 00000000..8584ff3a --- /dev/null +++ b/data/src/new_etl/validation/phs_properties.py @@ -0,0 +1,89 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base import ServiceValidator + + +class PHSPropertiesValidator(ServiceValidator): + """Validator for PHS properties service.""" + + MAX_MATCHES = 30000 # Maximum reasonable number of PHS program 
matches + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate PHS properties data. + + Critical checks: + - Required fields present (phs_care_program) + - phs_care_program is string type + - Total matches is below threshold + - No null geometries + - Valid geometries + - No duplicate properties + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + required_columns = ["phs_care_program", "geometry"] + errors.extend(self.check_required_columns(data, required_columns)) + + # Check data types and values + if "phs_care_program" in data.columns: + # Check type + if data["phs_care_program"].dtype != "object": + errors.append("phs_care_program must be string type") + + # Check values + invalid_values = data[~data["phs_care_program"].isin(["Yes", "No"])][ + "phs_care_program" + ].unique() + if len(invalid_values) > 0: + errors.append( + f"phs_care_program must be 'Yes' or 'No', found: {sorted(invalid_values)}" + ) + + # Get PHS properties subset + phs_properties = data[data["phs_care_program"] == "Yes"] + total_matches = len(phs_properties) + + # Check total matches + if total_matches > self.MAX_MATCHES: + errors.append( + f"Found {total_matches} PHS program matches, which exceeds the maximum of {self.MAX_MATCHES}" + ) + + # Check for null geometries + null_geoms = phs_properties.geometry.isnull().sum() + if null_geoms > 0: + errors.append(f"Found {null_geoms} PHS properties with null geometries") + + # Check for duplicate geometries + if len(phs_properties) > 0: + # Convert geometries to WKT for comparison + wkt_geoms = phs_properties.geometry.apply( + lambda x: x.wkt if x else None + ) + duplicate_geoms = wkt_geoms.value_counts() + duplicates = duplicate_geoms[duplicate_geoms > 1] + if len(duplicates) > 0: + errors.append( + f"Found {len(duplicates)} duplicate geometries in PHS properties" + ) + + # Log statistics + print("\nPHS Properties Statistics:") + print(f"- Total properties: {len(data)}") + print( + f"- Properties in PHS program: {total_matches} ({total_matches / len(data):.1%})" + ) + + # Check geometry validity + if not data.geometry.is_valid.all(): + invalid_count = (~data.geometry.is_valid).sum() + errors.append(f"Found {invalid_count} invalid geometries") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/ppr_properties.py b/data/src/new_etl/validation/ppr_properties.py new file mode 100644 index 00000000..9be69b4f --- /dev/null +++ b/data/src/new_etl/validation/ppr_properties.py @@ -0,0 +1,72 @@ +from typing import List, Tuple + +import geopandas as gpd +import pandas as pd + +from .base_validator import BaseValidator + + +class PPRPropertiesValidator(BaseValidator): + """ + Validator for PPR (Philadelphia Parks & Recreation) properties. + Ensures data quality and proper masking of park properties. + """ + + def validate(self, gdf: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate the PPR properties data and their impact on the primary feature layer. + + Args: + gdf (gpd.GeoDataFrame): The GeoDataFrame to validate. 
+ + Returns: + Tuple[bool, List[str]]: A tuple containing: + - bool: Whether the validation passed + - List[str]: List of error messages if validation failed + """ + errors = [] + + # Check required columns + required_columns = ["geometry", "vacant", "public_name"] + missing_columns = [col for col in required_columns if col not in gdf.columns] + if missing_columns: + errors.append(f"Missing required columns: {', '.join(missing_columns)}") + + # Check that 'vacant' column is boolean + if "vacant" in gdf.columns and not pd.api.types.is_bool_dtype(gdf["vacant"]): + errors.append("'vacant' column must be of boolean type") + + # Check for null geometries + null_geoms = gdf["geometry"].isna().sum() + if null_geoms > 0: + errors.append(f"Found {null_geoms} null geometries") + + # Check for invalid geometries + invalid_geoms = ~gdf["geometry"].is_valid + if invalid_geoms.any(): + errors.append(f"Found {invalid_geoms.sum()} invalid geometries") + + # Check number of properties being masked + if "public_name" in gdf.columns: + mask = gdf["public_name"].notnull() + count_masked = mask.sum() + if count_masked < 400: + errors.append( + f"Too few PPR properties being masked: {count_masked} (expected: 400-600)" + ) + elif count_masked > 600: + errors.append( + f"Too many PPR properties being masked: {count_masked} (expected: 400-600)" + ) + + # Log statistics about masking + total_properties = len(gdf) + percent_masked = (count_masked / total_properties) * 100 + print("PPR properties masking statistics:") + print(f"- Total properties: {total_properties}") + print(f"- Properties being masked: {count_masked}") + print(f"- Percentage masked: {percent_masked:.2f}%") + if count_masked < 400 or count_masked > 600: + print(f"WARNING: Expected 400-600 PPR properties, found {count_masked}") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/rco_geoms.py b/data/src/new_etl/validation/rco_geoms.py new file mode 100644 index 00000000..9a11d9e1 --- /dev/null +++ b/data/src/new_etl/validation/rco_geoms.py @@ -0,0 +1,83 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base import ServiceValidator + + +class RCOGeomsValidator(ServiceValidator): + """Validator for RCO geoms service.""" + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate RCO geoms data. 
+ + Critical checks: + - Required fields present (rco_info, rco_names, geometry) + - RCO fields are strings + - Valid geometries + - RCO info format is correct (semicolon-separated fields) + - RCO names format is correct (pipe-separated when multiple) + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + errors.extend( + self.check_required_columns(data, ["rco_info", "rco_names", "geometry"]) + ) + + # Check data types + if "rco_info" in data.columns and data["rco_info"].dtype != "object": + errors.append("rco_info must be string type") + if "rco_names" in data.columns and data["rco_names"].dtype != "object": + errors.append("rco_names must be string type") + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + # Check RCO info format + if "rco_info" in data.columns: + # Check that non-empty rco_info contains expected fields + non_empty_info = data[data["rco_info"].notna() & (data["rco_info"] != "")] + if len(non_empty_info) > 0: + sample_info = non_empty_info["rco_info"].iloc[0] + if ";" not in sample_info: + errors.append("rco_info should contain semicolon-separated fields") + + # Check RCO names format + if "rco_names" in data.columns: + # Check that non-empty rco_names contains pipe separator when multiple + non_empty_names = data[ + data["rco_names"].notna() & (data["rco_names"] != "") + ] + if len(non_empty_names) > 0: + sample_names = non_empty_names["rco_names"].iloc[0] + if "|" not in sample_names and "," in sample_names: + errors.append( + "rco_names should use pipe (|) as separator for multiple RCOs" + ) + + # Log statistics about RCO coverage + if "rco_names" in data.columns: + total_properties = len(data) + properties_with_rco = len( + data[data["rco_names"].notna() & (data["rco_names"] != "")] + ) + properties_with_multiple_rcos = len( + data[data["rco_names"].str.contains("|", na=False)] + ) + + print("RCO Coverage Statistics:") + print(f"- Total properties: {total_properties}") + print( + f"- Properties with RCO: {properties_with_rco} ({properties_with_rco / total_properties:.1%})" + ) + print( + f"- Properties with multiple RCOs: {properties_with_multiple_rcos} ({properties_with_multiple_rcos / total_properties:.1%})" + ) + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/tree_canopy.py b/data/src/new_etl/validation/tree_canopy.py new file mode 100644 index 00000000..1e5ed8c9 --- /dev/null +++ b/data/src/new_etl/validation/tree_canopy.py @@ -0,0 +1,76 @@ +from typing import Tuple + +import pandas as pd + +from .base import BaseValidator + + +class TreeCanopyValidator(BaseValidator): + """ + Validator for tree canopy data. + + This validator ensures that: + 1. The required 'tree_canopy_gap' column exists + 2. The tree_canopy_gap values are numeric and within expected range (0 to 1) + 3. The geometry column is valid + """ + + def validate(self, data: pd.DataFrame) -> Tuple[bool, list[str]]: + """ + Validate the tree canopy data. + + Args: + data (pd.DataFrame): The DataFrame containing tree canopy data. 
+ + Returns: + Tuple[bool, list[str]]: A tuple containing: + - bool: True if validation passes, False otherwise + - list[str]: List of error messages if validation fails + """ + errors = [] + + # Check for required column + if "tree_canopy_gap" not in data.columns: + errors.append("Missing required column: tree_canopy_gap") + return False, errors + + # Check data type of tree_canopy_gap + if not pd.api.types.is_numeric_dtype(data["tree_canopy_gap"]): + errors.append("tree_canopy_gap must be numeric") + return False, errors + + # Check value range (tree canopy gap should be between 0 and 1) + if (data["tree_canopy_gap"] < 0).any() or (data["tree_canopy_gap"] > 1).any(): + errors.append("tree_canopy_gap values must be between 0 and 1") + return False, errors + + # Check for missing values + missing_values = data["tree_canopy_gap"].isna().sum() + if missing_values > 0: + errors.append( + f"Found {missing_values} missing values in tree_canopy_gap column" + ) + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + # Log statistics about tree canopy gaps + total_properties = len(data) + high_gap = len( + data[data["tree_canopy_gap"] >= 0.3] + ) # Using 0.3 as threshold for "very low tree canopy" + medium_gap = len( + data[(data["tree_canopy_gap"] >= 0.1) & (data["tree_canopy_gap"] < 0.3)] + ) + low_gap = len(data[data["tree_canopy_gap"] < 0.1]) + + print("\nTree Canopy Gap Statistics:") + print(f"- Total properties: {total_properties}") + print(f"- High gap (≥0.3): {high_gap} ({high_gap / total_properties:.1%})") + print( + f"- Medium gap (0.1-0.3): {medium_gap} ({medium_gap / total_properties:.1%})" + ) + print(f"- Low gap (<0.1): {low_gap} ({low_gap / total_properties:.1%})") + + return len(errors) == 0, errors diff --git a/data/src/new_etl/validation/vacant_properties.py b/data/src/new_etl/validation/vacant_properties.py new file mode 100644 index 00000000..45cd04fb --- /dev/null +++ b/data/src/new_etl/validation/vacant_properties.py @@ -0,0 +1,71 @@ +from typing import List, Tuple + +import geopandas as gpd + +from .base import ServiceValidator + + +class VacantPropertiesValidator(ServiceValidator): + """Validator for vacant properties service.""" + + def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]: + """ + Validate vacant properties data. 
+ + Critical checks: + - Required fields present (opa_id, parcel_type) + - No duplicate opa_ids + - Valid geometries + - Expected number of records + + Returns: + Tuple of (is_valid, list of error messages) + """ + errors = [] + + # Check required columns + errors.extend(self.check_required_columns(data, ["opa_id", "parcel_type"])) + + # Check for duplicate opa_ids + errors.extend(self.check_duplicates(data, "opa_id")) + + # Check data types + if "opa_id" in data.columns and not data["opa_id"].dtype == "object": + errors.append("opa_id must be string type") + if "parcel_type" in data.columns and not data["parcel_type"].dtype == "object": + errors.append("parcel_type must be string type") + + # Check null values in critical fields + errors.extend( + self.check_null_percentage(data, "opa_id", threshold=0.0) + ) # No nulls allowed + errors.extend( + self.check_null_percentage(data, "parcel_type", threshold=0.0) + ) # No nulls allowed + + # Check geometry validity + if not data.geometry.is_valid.all(): + errors.append("Found invalid geometries") + + # Check record counts + total_count = len(data) + if total_count < 10000: + errors.append( + f"Total vacant properties count ({total_count}) is below minimum threshold (10000)" + ) + + # Check counts by parcel type + if "parcel_type" in data.columns: + building_count = len(data[data["parcel_type"] == "Building"]) + lot_count = len(data[data["parcel_type"] == "Land"]) + + if building_count < 10000: + errors.append( + f"Vacant building count ({building_count}) is below minimum threshold (10000)" + ) + if lot_count < 20000: + errors.append( + f"Vacant lot count ({lot_count}) is below minimum threshold (20000)" + ) + + return len(errors) == 0, errors
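
Note on the pattern this diff establishes: each validator subclasses the shared base class, reuses its helper checks (check_required_columns, check_duplicates, check_null_percentage, check_count_threshold), and is registered in SERVICE_VALIDATORS in main.py under the service function's __name__, so main.py can call validator.validate(dataset.gdf) right after the service runs and raise (with a Slack alert) on failure. The sketch below shows how a new service validator could plug into that pattern. It is a minimal, hypothetical example: ExampleNewServiceValidator, example_field, and example_new_service are illustrative names only, and the import path assumes the data/src package layout used elsewhere in this diff, where most validator modules import the base class as ServiceValidator from .base.

from typing import List, Tuple

import geopandas as gpd

from new_etl.validation.base import ServiceValidator


class ExampleNewServiceValidator(ServiceValidator):
    """Illustrative validator for a hypothetical new service (not part of this diff)."""

    def validate(self, data: gpd.GeoDataFrame) -> Tuple[bool, List[str]]:
        errors: List[str] = []

        # Reuse the shared helpers defined on ServiceValidator in this diff.
        errors.extend(self.check_required_columns(data, ["opa_id", "example_field"]))
        errors.extend(self.check_duplicates(data, "opa_id"))
        errors.extend(self.check_null_percentage(data, "example_field", threshold=0.05))
        errors.extend(self.check_count_threshold(data, min_count=1000))

        # Service-specific check: all geometries must be valid.
        if not data.geometry.is_valid.all():
            errors.append("Found invalid geometries")

        return len(errors) == 0, errors


# Hypothetical registration in main.py, keyed by the service function's
# __name__, alongside the existing SERVICE_VALIDATORS entries:
#   SERVICE_VALIDATORS["example_new_service"] = ExampleNewServiceValidator()

With such an entry in place, the loop in main.py would run the validator immediately after the example_new_service step, join any returned error messages into the failure message sent to Slack, and raise a ValueError to stop the ETL run, exactly as it does for the validators registered in this diff.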