Merged
2 changes: 1 addition & 1 deletion .github/workflows/pr_checks_backend.yml
@@ -88,7 +88,7 @@ jobs:
- name: Run Ruff Formatter in Docker
run: |
cd data
docker compose run --rm formatter
docker compose run --rm formatter sh -c "pip install ruff && ruff format --check --exclude '/usr/src/app/awkde/'"

run-linter:
runs-on: ubuntu-latest
30 changes: 19 additions & 11 deletions data/src/classes/backup_archive_database.py
@@ -4,7 +4,12 @@
from datetime import datetime, timedelta

import sqlalchemy as sa
from config.config import log_level, max_backup_schema_days, tiles_file_id_prefix, tile_file_backup_directory
from config.config import (
log_level,
max_backup_schema_days,
tiles_file_id_prefix,
tile_file_backup_directory,
)
from config.psql import conn, local_engine, url
from data_utils.utils import mask_password
from sqlalchemy import inspect
@@ -39,13 +44,15 @@ def backup_schema(self):
"pg_dump "
+ url
+ " -s --schema public | "
+ " sed 's/public/" + backup_schema_name + "/g'"
+ " sed 's/public/"
+ backup_schema_name
+ "/g'"
+ " | sed 's/"
+ backup_schema_name
+ ".geometry/public.geometry/' | sed 's/"
+ backup_schema_name
+ ".spatial_ref_sys/public.spatial_ref_sys/'"
+ " | sed 's/backup__/public_/g'" # ppr_properties.public_name column needs to be restored.
+ " | sed 's/backup__/public_/g'" # ppr_properties.public_name column needs to be restored.
+ " | psql -v ON_ERROR_STOP=1 "
+ url
+ " > /dev/null "
@@ -109,24 +116,25 @@ def prune_old_archives(self):
conn.execute(sa.DDL(sql))

def is_backup_schema_exists(self) -> bool:
""" whether the backup schema exists
"""whether the backup schema exists

Returns:
bool: whether true
"""
"""
return backup_schema_name in inspect(local_engine).get_schema_names()

def backup_tiles_file(self):
"""backup the main tiles file to a timestamped copy in the backup/ folder in GCP
"""
"""backup the main tiles file to a timestamped copy in the backup/ folder in GCP"""
bucket = google_cloud_bucket()
count: int = 0
for blob in bucket.list_blobs(prefix=tiles_file_id_prefix):
suffix: str = '_' + self.timestamp_string
suffix: str = "_" + self.timestamp_string
name, ext = os.path.splitext(blob.name)
backup_file_name: str = tile_file_backup_directory + "/" + name + suffix + ext
backup_file_name: str = (
tile_file_backup_directory + "/" + name + suffix + ext
)
log.debug(backup_file_name)
bucket.copy_blob(blob,destination_bucket=bucket,new_name=backup_file_name)
bucket.copy_blob(blob, destination_bucket=bucket, new_name=backup_file_name)
count += 1
if count == 0:
log.warning("No files were found to back up.")
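Aside (editor's note, not part of this PR): the string concatenation in `backup_schema` above is easier to read as one assembled pipeline. Below is a minimal illustrative sketch that builds the same command with an f-string; the `url` and `backup_schema_name` values are hypothetical placeholders for what the class reads from config.

```python
# Illustrative sketch only: the shell pipeline that backup_schema() assembles.
# `url` and `backup_schema_name` are hypothetical placeholders.
url = "postgresql://user:pass@localhost:5432/vacantlots"
backup_schema_name = "backup_2024_11_01"

command = (
    f"pg_dump {url} -s --schema public"
    f" | sed 's/public/{backup_schema_name}/g'"
    f" | sed 's/{backup_schema_name}.geometry/public.geometry/'"
    f" | sed 's/{backup_schema_name}.spatial_ref_sys/public.spatial_ref_sys/'"
    f" | sed 's/backup__/public_/g'"  # ppr_properties.public_name column needs to be restored
    f" | psql -v ON_ERROR_STOP=1 {url} > /dev/null"
)
print(command)
```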
42 changes: 31 additions & 11 deletions data/src/classes/diff_report.py
@@ -20,9 +20,10 @@

log.basicConfig(level=log_level)


class DiffTable:
"""Metadata about a table to be run through data-diff
"""
"""Metadata about a table to be run through data-diff"""

def __init__(self, table: str, pk_cols: list[str], where: str = None):
"""constructor

@@ -35,6 +36,7 @@ def __init__(self, table: str, pk_cols: list[str], where: str = None):
self.pk_cols = pk_cols
self.where = where


class DiffReport:
"""
Class to manage computing data differences for all tables between the newly imported schema and the last schema. Build a report of summary differences for all tables. Log detailed differences to a table in the old backed-up schema. Post difference summary to Slack and or email.
@@ -48,15 +50,23 @@ def __init__(self, timestamp_string: str = None):
"""
self.diff_tables = self._list_diff_tables()
self.timestamp_string = timestamp_string
self.report: str = "The back-end data has been fully refreshed. Here is the difference report on " + str(len(self.diff_tables)) + " key tables.\nLegend: table A = new data, table B = old data.\n\n"
self.report: str = (
"The back-end data has been fully refreshed. Here is the difference report on "
+ str(len(self.diff_tables))
+ " key tables.\nLegend: table A = new data, table B = old data.\n\n"
)

def run(self):
"""
run the report and slack or email it.
"""

for diff_table in self.diff_tables:
log.debug("Process table %s with pks %s", diff_table.table, str(diff_table.pk_cols))
log.debug(
"Process table %s with pks %s",
diff_table.table,
str(diff_table.pk_cols),
)
summary = diff_table.table + "\n" + self.compare_table(diff_table)
# if no differences, do not report.
if self._summary_shows_differences(summary):
@@ -141,11 +151,23 @@ def _list_diff_tables(self) -> list[DiffTable]:
list[DiffTable]: the list of metadata
"""
return [
DiffTable(table="vacant_properties",pk_cols=["opa_id", "parcel_type"],where="opa_id is not null"),
DiffTable(table="li_complaints",pk_cols=["service_request_id"]),
DiffTable(table="li_violations",pk_cols=["violationnumber", "opa_account_num"],where="opa_account_num is not null"),
DiffTable(table="opa_properties",pk_cols=["parcel_number"]),
DiffTable(table="property_tax_delinquencies",pk_cols=["opa_number"],where="opa_number <> 0")
DiffTable(
table="vacant_properties",
pk_cols=["opa_id", "parcel_type"],
where="opa_id is not null",
),
DiffTable(table="li_complaints", pk_cols=["service_request_id"]),
DiffTable(
table="li_violations",
pk_cols=["violationnumber", "opa_account_num"],
where="opa_account_num is not null",
),
DiffTable(table="opa_properties", pk_cols=["parcel_number"]),
DiffTable(
table="property_tax_delinquencies",
pk_cols=["opa_number"],
where="opa_number <> 0",
),
]

def compare_table(self, diff_table: DiffTable) -> str:
@@ -221,5 +243,3 @@ def email_report(self):
s = smtplib.SMTP(smtp_server)
s.sendmail(from_email, [report_to_email], msg.as_string())
s.quit()


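Aside (editor's note, not part of this PR): `DiffReport` delegates the actual comparison to data-diff. A minimal sketch of how one `DiffTable` entry could be compared is below, assuming the `data_diff` Python API (`connect_to_table` / `diff_tables`); the connection URL and backup schema name are hypothetical, since the real class builds them from config.

```python
# Minimal sketch, assuming the data_diff Python API; values are hypothetical placeholders.
from data_diff import connect_to_table, diff_tables

db_url = "postgresql://user:pass@localhost:5432/vacantlots"
backup_schema = "backup_2024_11_01"
pk_cols = ("opa_id", "parcel_type")

table_a = connect_to_table(db_url, "public.vacant_properties", pk_cols)            # new data
table_b = connect_to_table(db_url, f"{backup_schema}.vacant_properties", pk_cols)  # old data

for sign, row in diff_tables(table_a, table_b):
    # '+' rows appear only in table A (new schema), '-' rows only in table B (old schema)
    print(sign, row)
```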
6 changes: 2 additions & 4 deletions data/src/classes/featurelayer.py
@@ -42,8 +42,6 @@ def google_cloud_bucket() -> Bucket:
return storage_client.bucket(bucket_name)




class FeatureLayer:
"""
FeatureLayer is a class to represent a GIS dataset. It can be initialized with a URL to an Esri Feature Service, a SQL query to Carto, or a GeoDataFrame.
@@ -60,7 +58,7 @@ def __init__(
from_xy=False,
use_wkb_geom_field=None,
cols: list[str] = None,
bucket: Bucket = None
bucket: Bucket = None,
):
self.name = name
self.esri_rest_urls = (
@@ -406,4 +404,4 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
blob.upload_from_filename(temp_merged_pmtiles)
print(f"PMTiles upload successful for {file}!")
except Exception as e:
print(f"PMTiles upload failed for {file}: {e}")
print(f"PMTiles upload failed for {file}: {e}")
2 changes: 1 addition & 1 deletion data/src/constants/services.py
@@ -75,4 +75,4 @@

CENSUS_BGS_URL = (
"https://opendata.arcgis.com/datasets/2f982bada233478ea0100528227febce_0.geojson"
)
)
2 changes: 1 addition & 1 deletion data/src/data_utils/access_process.py
@@ -39,5 +39,5 @@ def access_process(dataset: Any) -> Any:
access_processes.append(access_process)

dataset.gdf["access_process"] = access_processes

return dataset
2 changes: 1 addition & 1 deletion data/src/data_utils/community_gardens.py
@@ -9,7 +9,7 @@ def community_gardens(primary_featurelayer):
)

community_gardens.gdf = community_gardens.gdf[["Site_Name", "geometry"]]

primary_featurelayer.spatial_join(community_gardens)

# Create a boolean mask where 'site_Name' is not null
31 changes: 20 additions & 11 deletions data/src/data_utils/l_and_i.py
@@ -4,6 +4,7 @@
from classes.featurelayer import FeatureLayer
from constants.services import COMPLAINTS_SQL_QUERY, VIOLATIONS_SQL_QUERY


def l_and_i(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Process L&I (Licenses and Inspections) data for complaints and violations.
@@ -19,20 +20,27 @@ def l_and_i(primary_featurelayer: FeatureLayer) -> FeatureLayer:
FeatureLayer: The primary feature layer updated with L&I data.
"""
keywords: List[str] = [
'dumping', 'blight', 'rubbish', 'weeds', 'graffiti',
'abandoned', 'sanitation', 'litter', 'vacant', 'trash',
'unsafe'
"dumping",
"blight",
"rubbish",
"weeds",
"graffiti",
"abandoned",
"sanitation",
"litter",
"vacant",
"trash",
"unsafe",
]

# Load complaints data from L&I
l_and_i_complaints: FeatureLayer = FeatureLayer(
name="LI Complaints",
carto_sql_queries=COMPLAINTS_SQL_QUERY
name="LI Complaints", carto_sql_queries=COMPLAINTS_SQL_QUERY
)

# Filter for rows where 'subject' contains any of the keywords
l_and_i_complaints.gdf = l_and_i_complaints.gdf[
l_and_i_complaints.gdf["subject"].str.lower().str.contains('|'.join(keywords))
l_and_i_complaints.gdf["subject"].str.lower().str.contains("|".join(keywords))
]

# Filter for only Status = 'Open'
@@ -56,14 +64,15 @@

# Load data for violations from L&I
l_and_i_violations: FeatureLayer = FeatureLayer(
name="LI Violations",
carto_sql_queries=VIOLATIONS_SQL_QUERY,
from_xy=True
name="LI Violations", carto_sql_queries=VIOLATIONS_SQL_QUERY, from_xy=True
)

# Filter for rows where 'casetype' contains any of the keywords, handling NaN values
l_and_i_violations.gdf = l_and_i_violations.gdf[
l_and_i_violations.gdf["violationcodetitle"].fillna('').str.lower().str.contains('|'.join(keywords))
l_and_i_violations.gdf["violationcodetitle"]
.fillna("")
.str.lower()
.str.contains("|".join(keywords))
]

all_violations_count_df: pd.DataFrame = (
@@ -175,4 +184,4 @@ def remove_nan_strings(x: str) -> str | None:
.astype(int)
)

return primary_featurelayer
return primary_featurelayer
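Aside (editor's note, not part of this PR): the reformatted filters above rely on pandas string methods for a case-insensitive, multi-keyword match. A small self-contained illustration of that pattern, with made-up data:

```python
# Toy illustration of the keyword filter used above: lower-case the text column,
# then regex-match any keyword via '|'.join(...).
import pandas as pd

keywords = ["dumping", "blight", "rubbish"]  # a subset of the real keyword list
complaints = pd.DataFrame(
    {"subject": ["Illegal Dumping on vacant lot", "Noise complaint", None]}
)

mask = (
    complaints["subject"]
    .fillna("")              # same NaN handling as the violations filter
    .str.lower()
    .str.contains("|".join(keywords))
)
print(complaints[mask])      # keeps only the "Illegal Dumping on vacant lot" row
```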
14 changes: 7 additions & 7 deletions data/src/data_utils/nbhoods.py
@@ -7,19 +7,19 @@

def nbhoods(primary_featurelayer):
phl_nbhoods = gpd.read_file(NBHOODS_URL)

# Correct the column name to uppercase if needed
if 'MAPNAME' in phl_nbhoods.columns:
if "MAPNAME" in phl_nbhoods.columns:
phl_nbhoods.rename(columns={"MAPNAME": "neighborhood"}, inplace=True)

phl_nbhoods = phl_nbhoods.to_crs(USE_CRS)

nbhoods = FeatureLayer("Neighborhoods")
nbhoods.gdf = phl_nbhoods

red_cols_to_keep = ["neighborhood", "geometry"]
nbhoods.gdf = nbhoods.gdf[red_cols_to_keep]

primary_featurelayer.spatial_join(nbhoods)

return primary_featurelayer
41 changes: 29 additions & 12 deletions data/src/data_utils/negligent_devs.py
@@ -62,23 +62,29 @@ def negligent_devs(primary_featurelayer):
print("Columns in 'devs' DataFrame:", devs.columns)

print("Initial properties data:")
print(devs[['opa_id', 'city_owner_agency', 'mailing_street']].head(10))
print(devs[["opa_id", "city_owner_agency", "mailing_street"]].head(10))

city_owners = devs.loc[~devs["city_owner_agency"].isna() & (devs["city_owner_agency"] != "")].copy()
non_city_owners = devs.loc[devs["city_owner_agency"].isna() | (devs["city_owner_agency"] == "")].copy()
city_owners = devs.loc[
~devs["city_owner_agency"].isna() & (devs["city_owner_agency"] != "")
].copy()
non_city_owners = devs.loc[
devs["city_owner_agency"].isna() | (devs["city_owner_agency"] == "")
].copy()

print(f"City owners shape: {city_owners.shape}, Non-city owners shape: {non_city_owners.shape}")
print(
f"City owners shape: {city_owners.shape}, Non-city owners shape: {non_city_owners.shape}"
)

# Log before standardizing addresses
print("Non-city owners mailing streets before standardization:")
print(non_city_owners[['opa_id', 'mailing_street']].head(10))
print(non_city_owners[["opa_id", "mailing_street"]].head(10))

non_city_owners.loc[:, "mailing_street"] = (
non_city_owners["mailing_street"].astype(str).apply(standardize_street)
)

print("Non-city owners mailing streets after standardization:")
print(non_city_owners[['opa_id', 'mailing_street']].head(10))
print(non_city_owners[["opa_id", "mailing_street"]].head(10))

for term in ["ST", "AVE", "RD", "BLVD"]:
non_city_owners.loc[:, "mailing_street"] = non_city_owners[
@@ -87,7 +93,7 @@

# Log after applying term replacement
print("Non-city owners mailing streets after term replacement:")
print(non_city_owners[['opa_id', 'mailing_street']].head(10))
print(non_city_owners[["opa_id", "mailing_street"]].head(10))

# Fill missing address components
non_city_owners.loc[:, "mailing_address_1"] = non_city_owners[
@@ -106,7 +112,11 @@

# Log addresses before creating standardized address
print("Non-city owners mailing details before creating standardized address:")
print(non_city_owners[['opa_id', 'mailing_street', 'mailing_city_state', 'mailing_zip']].head(10))
print(
non_city_owners[
["opa_id", "mailing_street", "mailing_city_state", "mailing_zip"]
].head(10)
)

non_city_owners.loc[:, "standardized_address"] = non_city_owners.apply(
create_standardized_address, axis=1
@@ -145,10 +155,10 @@
)

devs_combined = pd.concat([city_owners, non_city_owners], axis=0)

# Final check on the merged data before updating primary_featurelayer
print("Combined data with property counts:")
print(devs_combined[['opa_id', 'property_count']].head(10))
print(devs_combined[["opa_id", "property_count"]].head(10))

primary_featurelayer.gdf = primary_featurelayer.gdf.merge(
devs_combined[["opa_id", "property_count"]], on="opa_id", how="left"
@@ -158,9 +168,16 @@
)
primary_featurelayer.gdf.loc[:, "negligent_dev"] = (
primary_featurelayer.gdf["n_properties_owned"] > 5
) & (primary_featurelayer.gdf["city_owner_agency"].isna() | (primary_featurelayer.gdf["city_owner_agency"] == ""))
) & (
primary_featurelayer.gdf["city_owner_agency"].isna()
| (primary_featurelayer.gdf["city_owner_agency"] == "")
)

print("Final feature layer data with negligent_dev flag:")
print(primary_featurelayer.gdf[['opa_id', 'n_properties_owned', 'negligent_dev']].head(10))
print(
primary_featurelayer.gdf[
["opa_id", "n_properties_owned", "negligent_dev"]
].head(10)
)

return primary_featurelayer
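Aside (editor's note, not part of this PR): the `negligent_dev` rule above flags owners that hold more than 5 properties and are not city agencies. A toy example of the same boolean logic, with made-up data:

```python
# Toy example of the negligent_dev flag computed above.
import pandas as pd

gdf = pd.DataFrame(
    {
        "opa_id": ["101", "102", "103"],
        "n_properties_owned": [12, 3, 8],
        "city_owner_agency": ["", "PHA", None],
    }
)

gdf["negligent_dev"] = (gdf["n_properties_owned"] > 5) & (
    gdf["city_owner_agency"].isna() | (gdf["city_owner_agency"] == "")
)
print(gdf)  # opa_id 101 and 103 are flagged; 102 is not (city-owned, low count)
```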