diff --git a/.github/workflows/pr_checks_backend.yml b/.github/workflows/pr_checks_backend.yml
index 7801f73b..c6420a60 100644
--- a/.github/workflows/pr_checks_backend.yml
+++ b/.github/workflows/pr_checks_backend.yml
@@ -88,7 +88,7 @@ jobs:
       - name: Run Ruff Formatter in Docker
         run: |
           cd data
-          docker compose run --rm formatter
+          docker compose run --rm formatter sh -c "pip install ruff && ruff format --check --exclude '/usr/src/app/awkde/'"
 
   run-linter:
     runs-on: ubuntu-latest
diff --git a/data/src/classes/backup_archive_database.py b/data/src/classes/backup_archive_database.py
index 60f5ef4f..94302971 100644
--- a/data/src/classes/backup_archive_database.py
+++ b/data/src/classes/backup_archive_database.py
@@ -4,7 +4,12 @@
 from datetime import datetime, timedelta
 
 import sqlalchemy as sa
-from config.config import log_level, max_backup_schema_days, tiles_file_id_prefix, tile_file_backup_directory
+from config.config import (
+    log_level,
+    max_backup_schema_days,
+    tiles_file_id_prefix,
+    tile_file_backup_directory,
+)
 from config.psql import conn, local_engine, url
 from data_utils.utils import mask_password
 from sqlalchemy import inspect
@@ -39,13 +44,15 @@ def backup_schema(self):
             "pg_dump "
             + url
             + " -s --schema public | "
-            + " sed 's/public/" + backup_schema_name + "/g'"
+            + " sed 's/public/"
+            + backup_schema_name
+            + "/g'"
             + " | sed 's/"
             + backup_schema_name
             + ".geometry/public.geometry/' | sed 's/"
             + backup_schema_name
             + ".spatial_ref_sys/public.spatial_ref_sys/'"
-            + " | sed 's/backup__/public_/g'" # ppr_properties.public_name column needs to be restored.
+            + " | sed 's/backup__/public_/g'"  # ppr_properties.public_name column needs to be restored.
             + " | psql -v ON_ERROR_STOP=1 "
             + url
             + " > /dev/null "
@@ -109,24 +116,25 @@ def prune_old_archives(self):
         conn.execute(sa.DDL(sql))
 
     def is_backup_schema_exists(self) -> bool:
-        """ whether the backup schema exists
+        """whether the backup schema exists
 
         Returns:
             bool: whether true
-        """ 
+        """
         return backup_schema_name in inspect(local_engine).get_schema_names()
-    
+
     def backup_tiles_file(self):
-        """backup the main tiles file to a timestamped copy in the backup/ folder in GCP
-        """
+        """backup the main tiles file to a timestamped copy in the backup/ folder in GCP"""
         bucket = google_cloud_bucket()
         count: int = 0
         for blob in bucket.list_blobs(prefix=tiles_file_id_prefix):
-            suffix: str = '_' + self.timestamp_string
+            suffix: str = "_" + self.timestamp_string
             name, ext = os.path.splitext(blob.name)
-            backup_file_name: str = tile_file_backup_directory + "/" + name + suffix + ext
+            backup_file_name: str = (
+                tile_file_backup_directory + "/" + name + suffix + ext
+            )
             log.debug(backup_file_name)
-            bucket.copy_blob(blob,destination_bucket=bucket,new_name=backup_file_name)
+            bucket.copy_blob(blob, destination_bucket=bucket, new_name=backup_file_name)
             count += 1
         if count == 0:
             log.warning("No files were found to back up.")
diff --git a/data/src/classes/diff_report.py b/data/src/classes/diff_report.py
index 2ac7395f..102d0352 100644
--- a/data/src/classes/diff_report.py
+++ b/data/src/classes/diff_report.py
@@ -20,9 +20,10 @@
 
 log.basicConfig(level=log_level)
 
+
 class DiffTable:
-    """Metadata about a table to be run through data-diff
-    """
+    """Metadata about a table to be run through data-diff"""
+
     def __init__(self, table: str, pk_cols: list[str], where: str = None):
         """constructor
 
@@ -35,6 +36,7 @@ def __init__(self, table: str, pk_cols: list[str], where: str = None):
         self.pk_cols = pk_cols
         self.where = where
 
+
 class DiffReport:
     """
     Class to manage computing data differences for all tables between the newly imported schema and the last schema. Build a report of summary differences for all tables. Log detailed differences to a table in the old backed-up schema. Post difference summary to Slack and or email.
@@ -48,7 +50,11 @@ def __init__(self, timestamp_string: str = None):
         """
         self.diff_tables = self._list_diff_tables()
         self.timestamp_string = timestamp_string
-        self.report: str = "The back-end data has been fully refreshed. Here is the difference report on " + str(len(self.diff_tables)) + " key tables.\nLegend: table A = new data, table B = old data.\n\n"
+        self.report: str = (
+            "The back-end data has been fully refreshed. Here is the difference report on "
+            + str(len(self.diff_tables))
+            + " key tables.\nLegend: table A = new data, table B = old data.\n\n"
+        )
 
     def run(self):
         """
@@ -56,7 +62,11 @@ def run(self):
         """
 
         for diff_table in self.diff_tables:
-            log.debug("Process table %s with pks %s", diff_table.table, str(diff_table.pk_cols))
+            log.debug(
+                "Process table %s with pks %s",
+                diff_table.table,
+                str(diff_table.pk_cols),
+            )
             summary = diff_table.table + "\n" + self.compare_table(diff_table)
             # if no differences, do not report.
             if self._summary_shows_differences(summary):
@@ -141,11 +151,23 @@ def _list_diff_tables(self) -> list[DiffTable]:
             list[DiffTable]: the list of metadata
         """
         return [
-            DiffTable(table="vacant_properties",pk_cols=["opa_id", "parcel_type"],where="opa_id is not null"),
-            DiffTable(table="li_complaints",pk_cols=["service_request_id"]),
-            DiffTable(table="li_violations",pk_cols=["violationnumber", "opa_account_num"],where="opa_account_num is not null"),
-            DiffTable(table="opa_properties",pk_cols=["parcel_number"]),
-            DiffTable(table="property_tax_delinquencies",pk_cols=["opa_number"],where="opa_number <> 0")
+            DiffTable(
+                table="vacant_properties",
+                pk_cols=["opa_id", "parcel_type"],
+                where="opa_id is not null",
+            ),
+            DiffTable(table="li_complaints", pk_cols=["service_request_id"]),
+            DiffTable(
+                table="li_violations",
+                pk_cols=["violationnumber", "opa_account_num"],
+                where="opa_account_num is not null",
+            ),
+            DiffTable(table="opa_properties", pk_cols=["parcel_number"]),
+            DiffTable(
+                table="property_tax_delinquencies",
+                pk_cols=["opa_number"],
+                where="opa_number <> 0",
+            ),
         ]
 
     def compare_table(self, diff_table: DiffTable) -> str:
@@ -221,5 +243,3 @@ def email_report(self):
         s = smtplib.SMTP(smtp_server)
         s.sendmail(from_email, [report_to_email], msg.as_string())
         s.quit()
-
-
diff --git a/data/src/classes/featurelayer.py b/data/src/classes/featurelayer.py
index f3eff73d..4eb30e3b 100644
--- a/data/src/classes/featurelayer.py
+++ b/data/src/classes/featurelayer.py
@@ -42,8 +42,6 @@ def google_cloud_bucket() -> Bucket:
     return storage_client.bucket(bucket_name)
 
 
-
-
 class FeatureLayer:
     """
     FeatureLayer is a class to represent a GIS dataset. It can be initialized with a URL to an Esri Feature Service, a SQL query to Carto, or a GeoDataFrame.
@@ -60,7 +58,7 @@ def __init__(
         from_xy=False,
         use_wkb_geom_field=None,
         cols: list[str] = None,
-        bucket: Bucket = None
+        bucket: Bucket = None,
     ):
         self.name = name
         self.esri_rest_urls = (
@@ -406,4 +404,4 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
                 blob.upload_from_filename(temp_merged_pmtiles)
                 print(f"PMTiles upload successful for {file}!")
             except Exception as e:
-                print(f"PMTiles upload failed for {file}: {e}")
\ No newline at end of file
+                print(f"PMTiles upload failed for {file}: {e}")
diff --git a/data/src/constants/services.py b/data/src/constants/services.py
index dcc4daac..e08a38fb 100644
--- a/data/src/constants/services.py
+++ b/data/src/constants/services.py
@@ -75,4 +75,4 @@
 
 CENSUS_BGS_URL = (
     "https://opendata.arcgis.com/datasets/2f982bada233478ea0100528227febce_0.geojson"
-)
\ No newline at end of file
+)
diff --git a/data/src/data_utils/access_process.py b/data/src/data_utils/access_process.py
index 7c8e79de..ae3af8e6 100644
--- a/data/src/data_utils/access_process.py
+++ b/data/src/data_utils/access_process.py
@@ -39,5 +39,5 @@ def access_process(dataset: Any) -> Any:
         access_processes.append(access_process)
 
     dataset.gdf["access_process"] = access_processes
-    
+
     return dataset
diff --git a/data/src/data_utils/community_gardens.py b/data/src/data_utils/community_gardens.py
index 4bed0284..1e69873d 100644
--- a/data/src/data_utils/community_gardens.py
+++ b/data/src/data_utils/community_gardens.py
@@ -9,7 +9,7 @@ def community_gardens(primary_featurelayer):
     )
 
     community_gardens.gdf = community_gardens.gdf[["Site_Name", "geometry"]]
-    
+
     primary_featurelayer.spatial_join(community_gardens)
 
     # Create a boolean mask where 'site_Name' is not null
diff --git a/data/src/data_utils/l_and_i.py b/data/src/data_utils/l_and_i.py
index 27f28147..6cf277ba 100644
--- a/data/src/data_utils/l_and_i.py
+++ b/data/src/data_utils/l_and_i.py
@@ -4,6 +4,7 @@
 from classes.featurelayer import FeatureLayer
 from constants.services import COMPLAINTS_SQL_QUERY, VIOLATIONS_SQL_QUERY
 
+
 def l_and_i(primary_featurelayer: FeatureLayer) -> FeatureLayer:
     """
     Process L&I (Licenses and Inspections) data for complaints and violations.
@@ -19,20 +20,27 @@
         FeatureLayer: The primary feature layer updated with L&I data.
     """
     keywords: List[str] = [
-        'dumping', 'blight', 'rubbish', 'weeds', 'graffiti',
-        'abandoned', 'sanitation', 'litter', 'vacant', 'trash',
-        'unsafe'
+        "dumping",
+        "blight",
+        "rubbish",
+        "weeds",
+        "graffiti",
+        "abandoned",
+        "sanitation",
+        "litter",
+        "vacant",
+        "trash",
+        "unsafe",
     ]
 
     # Load complaints data from L&I
     l_and_i_complaints: FeatureLayer = FeatureLayer(
-        name="LI Complaints",
-        carto_sql_queries=COMPLAINTS_SQL_QUERY
+        name="LI Complaints", carto_sql_queries=COMPLAINTS_SQL_QUERY
     )
 
     # Filter for rows where 'subject' contains any of the keywords
     l_and_i_complaints.gdf = l_and_i_complaints.gdf[
-        l_and_i_complaints.gdf["subject"].str.lower().str.contains('|'.join(keywords))
+        l_and_i_complaints.gdf["subject"].str.lower().str.contains("|".join(keywords))
     ]
 
     # Filter for only Status = 'Open'
@@ -56,14 +64,15 @@
 
     # Load data for violations from L&I
     l_and_i_violations: FeatureLayer = FeatureLayer(
-        name="LI Violations",
-        carto_sql_queries=VIOLATIONS_SQL_QUERY,
-        from_xy=True
+        name="LI Violations", carto_sql_queries=VIOLATIONS_SQL_QUERY, from_xy=True
    )
 
     # Filter for rows where 'casetype' contains any of the keywords, handling NaN values
     l_and_i_violations.gdf = l_and_i_violations.gdf[
-        l_and_i_violations.gdf["violationcodetitle"].fillna('').str.lower().str.contains('|'.join(keywords))
+        l_and_i_violations.gdf["violationcodetitle"]
+        .fillna("")
+        .str.lower()
+        .str.contains("|".join(keywords))
     ]
 
     all_violations_count_df: pd.DataFrame = (
@@ -175,4 +184,4 @@ def remove_nan_strings(x: str) -> str | None:
         .astype(int)
     )
 
-    return primary_featurelayer
\ No newline at end of file
+    return primary_featurelayer
diff --git a/data/src/data_utils/nbhoods.py b/data/src/data_utils/nbhoods.py
index 6fde4bd0..d0de302b 100644
--- a/data/src/data_utils/nbhoods.py
+++ b/data/src/data_utils/nbhoods.py
@@ -7,19 +7,19 @@
 
 def nbhoods(primary_featurelayer):
     phl_nbhoods = gpd.read_file(NBHOODS_URL)
-    
+
     # Correct the column name to uppercase if needed
-    if 'MAPNAME' in phl_nbhoods.columns:
+    if "MAPNAME" in phl_nbhoods.columns:
         phl_nbhoods.rename(columns={"MAPNAME": "neighborhood"}, inplace=True)
-    
+
     phl_nbhoods = phl_nbhoods.to_crs(USE_CRS)
-    
+
     nbhoods = FeatureLayer("Neighborhoods")
     nbhoods.gdf = phl_nbhoods
-    
+
     red_cols_to_keep = ["neighborhood", "geometry"]
     nbhoods.gdf = nbhoods.gdf[red_cols_to_keep]
-    
+
     primary_featurelayer.spatial_join(nbhoods)
-    
+
     return primary_featurelayer
diff --git a/data/src/data_utils/negligent_devs.py b/data/src/data_utils/negligent_devs.py
index aa95532c..422009e6 100644
--- a/data/src/data_utils/negligent_devs.py
+++ b/data/src/data_utils/negligent_devs.py
@@ -62,23 +62,29 @@ def negligent_devs(primary_featurelayer):
 
     print("Columns in 'devs' DataFrame:", devs.columns)
     print("Initial properties data:")
-    print(devs[['opa_id', 'city_owner_agency', 'mailing_street']].head(10))
+    print(devs[["opa_id", "city_owner_agency", "mailing_street"]].head(10))
 
-    city_owners = devs.loc[~devs["city_owner_agency"].isna() & (devs["city_owner_agency"] != "")].copy()
-    non_city_owners = devs.loc[devs["city_owner_agency"].isna() | (devs["city_owner_agency"] == "")].copy()
+    city_owners = devs.loc[
+        ~devs["city_owner_agency"].isna() & (devs["city_owner_agency"] != "")
+    ].copy()
+    non_city_owners = devs.loc[
+        devs["city_owner_agency"].isna() | (devs["city_owner_agency"] == "")
+    ].copy()
 
-    print(f"City owners shape: {city_owners.shape}, Non-city owners shape: {non_city_owners.shape}")
+    print(
+        f"City owners shape: {city_owners.shape}, Non-city owners shape: {non_city_owners.shape}"
+    )
 
     # Log before standardizing addresses
     print("Non-city owners mailing streets before standardization:")
-    print(non_city_owners[['opa_id', 'mailing_street']].head(10))
+    print(non_city_owners[["opa_id", "mailing_street"]].head(10))
 
     non_city_owners.loc[:, "mailing_street"] = (
         non_city_owners["mailing_street"].astype(str).apply(standardize_street)
     )
 
     print("Non-city owners mailing streets after standardization:")
-    print(non_city_owners[['opa_id', 'mailing_street']].head(10))
+    print(non_city_owners[["opa_id", "mailing_street"]].head(10))
 
     for term in ["ST", "AVE", "RD", "BLVD"]:
         non_city_owners.loc[:, "mailing_street"] = non_city_owners[
@@ -87,7 +93,7 @@
 
     # Log after applying term replacement
     print("Non-city owners mailing streets after term replacement:")
-    print(non_city_owners[['opa_id', 'mailing_street']].head(10))
+    print(non_city_owners[["opa_id", "mailing_street"]].head(10))
 
     # Fill missing address components
     non_city_owners.loc[:, "mailing_address_1"] = non_city_owners[
@@ -106,7 +112,11 @@
 
     # Log addresses before creating standardized address
     print("Non-city owners mailing details before creating standardized address:")
-    print(non_city_owners[['opa_id', 'mailing_street', 'mailing_city_state', 'mailing_zip']].head(10))
+    print(
+        non_city_owners[
+            ["opa_id", "mailing_street", "mailing_city_state", "mailing_zip"]
+        ].head(10)
+    )
 
     non_city_owners.loc[:, "standardized_address"] = non_city_owners.apply(
         create_standardized_address, axis=1
@@ -145,10 +155,10 @@
     )
 
     devs_combined = pd.concat([city_owners, non_city_owners], axis=0)
-    
+
     # Final check on the merged data before updating primary_featurelayer
     print("Combined data with property counts:")
-    print(devs_combined[['opa_id', 'property_count']].head(10))
+    print(devs_combined[["opa_id", "property_count"]].head(10))
 
     primary_featurelayer.gdf = primary_featurelayer.gdf.merge(
         devs_combined[["opa_id", "property_count"]], on="opa_id", how="left"
@@ -158,9 +168,16 @@
     )
     primary_featurelayer.gdf.loc[:, "negligent_dev"] = (
         primary_featurelayer.gdf["n_properties_owned"] > 5
-    ) & (primary_featurelayer.gdf["city_owner_agency"].isna() | (primary_featurelayer.gdf["city_owner_agency"] == ""))
+    ) & (
+        primary_featurelayer.gdf["city_owner_agency"].isna()
+        | (primary_featurelayer.gdf["city_owner_agency"] == "")
+    )
 
     print("Final feature layer data with negligent_dev flag:")
-    print(primary_featurelayer.gdf[['opa_id', 'n_properties_owned', 'negligent_dev']].head(10))
+    print(
+        primary_featurelayer.gdf[
+            ["opa_id", "n_properties_owned", "negligent_dev"]
+        ].head(10)
+    )
 
     return primary_featurelayer
diff --git a/data/src/data_utils/opa_properties.py b/data/src/data_utils/opa_properties.py
index 2d02f42f..77c838ba 100644
--- a/data/src/data_utils/opa_properties.py
+++ b/data/src/data_utils/opa_properties.py
@@ -12,13 +12,13 @@ def opa_properties(primary_featurelayer):
             "sale_date",
             "sale_price",
             "parcel_number",
-            "mailing_address_1", 
-            "mailing_address_2", 
-            "mailing_care_of", 
+            "mailing_address_1",
+            "mailing_address_2",
+            "mailing_care_of",
             "mailing_city_state",
             "mailing_street",
-            "mailing_zip" 
-        ]
+            "mailing_zip",
+        ],
     )
 
     primary_featurelayer.opa_join(
diff --git a/data/src/data_utils/owner_type.py b/data/src/data_utils/owner_type.py
index 291364df..bd2aa7fd 100644
--- a/data/src/data_utils/owner_type.py
+++ b/data/src/data_utils/owner_type.py
@@ -1,9 +1,10 @@
 import pandas as pd
 from classes.featurelayer import FeatureLayer
 
+
 def owner_type(primary_featurelayer: FeatureLayer) -> FeatureLayer:
     """
-    Determines the ownership type for each property in the primary feature layer based on 
+    Determines the ownership type for each property in the primary feature layer based on
     the 'owner_1', 'owner_2', and 'city_owner_agency' columns. The ownership type is set as:
     - "Public" if 'city_owner_agency' is not NA.
     - "Business (LLC)" if 'city_owner_agency' is NA and "LLC" is found in 'owner_1' or 'owner_2'.
diff --git a/data/src/data_utils/phs_properties.py b/data/src/data_utils/phs_properties.py
index c906c2d1..aeeac757 100644
--- a/data/src/data_utils/phs_properties.py
+++ b/data/src/data_utils/phs_properties.py
@@ -1,6 +1,7 @@
 from classes.featurelayer import FeatureLayer
 from constants.services import PHS_LAYERS_TO_LOAD
 
+
 def phs_properties(primary_featurelayer: FeatureLayer) -> FeatureLayer:
     """
     Perform a spatial join between the primary feature layer and the PHS properties layer,
@@ -13,7 +14,7 @@ def phs_properties(primary_featurelayer: FeatureLayer) -> FeatureLayer:
     Returns:
         FeatureLayer: The updated primary feature layer with the 'phs_care_program' column.
     """
-    
+
     phs_properties = FeatureLayer(
         name="PHS Properties", esri_rest_urls=PHS_LAYERS_TO_LOAD, cols=["program"]
     )
@@ -23,9 +24,11 @@ def phs_properties(primary_featurelayer: FeatureLayer) -> FeatureLayer:
 
     # Initialize 'phs_care_program' column with default "no" for all rows
     primary_featurelayer.gdf["phs_care_program"] = "No"
-    
+
     # Set 'phs_care_program' to "yes" for matched rows
-    primary_featurelayer.gdf.loc[primary_featurelayer.gdf["program"].notna(), "phs_care_program"] = "Yes"
+    primary_featurelayer.gdf.loc[
+        primary_featurelayer.gdf["program"].notna(), "phs_care_program"
+    ] = "Yes"
 
     # Rebuild the GeoDataFrame after updates
     primary_featurelayer.rebuild_gdf()
diff --git a/data/src/data_utils/ppr_properties.py b/data/src/data_utils/ppr_properties.py
index 48111b35..67e7ce28 100644
--- a/data/src/data_utils/ppr_properties.py
+++ b/data/src/data_utils/ppr_properties.py
@@ -9,29 +9,30 @@
 
 
 def ppr_properties(primary_featurelayer):
-    fallback_url = 'https://opendata.arcgis.com/datasets/d52445160ab14380a673e5849203eb64_0.geojson'
+    fallback_url = "https://opendata.arcgis.com/datasets/d52445160ab14380a673e5849203eb64_0.geojson"
 
     try:
-        
         ppr_properties = FeatureLayer(
-            name="PPR Properties", 
-            esri_rest_urls=PPR_PROPERTIES_TO_LOAD, 
-            cols=["PUBLIC_NAME"]
+            name="PPR Properties",
+            esri_rest_urls=PPR_PROPERTIES_TO_LOAD,
+            cols=["PUBLIC_NAME"],
         )
 
         if ppr_properties.gdf is None or ppr_properties.gdf.empty:
-            raise ValueError("PPR properties GeoDataFrame is empty or failed to load from Esri REST URL.")
-        
+            raise ValueError(
+                "PPR properties GeoDataFrame is empty or failed to load from Esri REST URL."
+            )
+
         print("Loaded PPR properties from Esri REST URL.")
-        
+
     except Exception as e:
         print(f"Error loading PPR properties from Esri REST URL: {e}")
         print("Falling back to loading from GeoJSON URL.")
-        
+
         response = requests.get(fallback_url)
         response.raise_for_status()
         ppr_properties_gdf = gpd.read_file(io.BytesIO(response.content))
-        
+
         ppr_properties = FeatureLayer(name="PPR Properties")
         ppr_properties.gdf = ppr_properties_gdf
 
@@ -42,12 +43,14 @@ def ppr_properties(primary_featurelayer):
     primary_featurelayer.spatial_join(ppr_properties)
 
     mask = primary_featurelayer.gdf["public_name"].notnull()
-    
+
     count_dropped = mask.sum()
     print(f"Number of PPR properties being dropped: {count_dropped}")
 
-    primary_featurelayer.gdf = primary_featurelayer.gdf.drop(primary_featurelayer.gdf[mask].index)
+    primary_featurelayer.gdf = primary_featurelayer.gdf.drop(
+        primary_featurelayer.gdf[mask].index
+    )
 
     primary_featurelayer.gdf = primary_featurelayer.gdf.drop(columns=["public_name"])
 
-    return primary_featurelayer
\ No newline at end of file
+    return primary_featurelayer
diff --git a/data/src/data_utils/utils.py b/data/src/data_utils/utils.py
index b7b9ef4e..0b55b4fa 100644
--- a/data/src/data_utils/utils.py
+++ b/data/src/data_utils/utils.py
@@ -17,7 +17,7 @@ def mask_password(value: str):
 
 
 def save_stream_url(url: str) -> str:
-    """download the file from this url to the tmp/ directory by streaming in a memory-friendly way. 
+    """download the file from this url to the tmp/ directory by streaming in a memory-friendly way.
     If local file already exists, use it and don't download.
     Args:
         url (str): the url of the zip file
@@ -25,17 +25,17 @@
     Returns:
         str: the relative local path of the saved zip file
     """
-    local_filename = "tmp/" + url.split('/')[-1]
+    local_filename = "tmp/" + url.split("/")[-1]
     if os.path.exists(local_filename):
         return local_filename
 
     with requests.get(url, stream=True) as r:
         r.raise_for_status()
-        with open(local_filename, 'wb') as f:
+        with open(local_filename, "wb") as f:
             for chunk in r.iter_content(chunk_size=8192):
                 # If you have chunk encoded response uncomment if
                 # and set chunk_size parameter to None.
-                #if chunk:
+                # if chunk:
                 f.write(chunk)
             f.close()
         r.close()
diff --git a/data/src/streetview.py b/data/src/streetview.py
index 555db455..cd35763b 100644
--- a/data/src/streetview.py
+++ b/data/src/streetview.py
@@ -12,6 +12,7 @@
 key = os.environ["CLEAN_GREEN_GOOGLE_KEY"]
 bucket_name = bucket.name
 
+
 # Helper Functions
 def get_streetview_metadata(address):
     """Fetches metadata from the Street View API."""