Merged
55 changes: 46 additions & 9 deletions data/src/main.py
@@ -1,14 +1,42 @@
import sys
import pandas as pd
import traceback

import pandas as pd
from config.psql import conn
from config.config import tiles_file_id_prefix

from new_etl.classes.slack_reporters import send_dataframe_profile_to_slack, send_pg_stats_to_slack, send_error_to_slack
from new_etl.classes.data_diff import DiffReport
from new_etl.data_utils import *
from new_etl.database import to_postgis_with_schema
from new_etl.classes.slack_reporters import (
send_dataframe_profile_to_slack,
send_error_to_slack,
send_pg_stats_to_slack,
)
from new_etl.data_utils import (
city_owned_properties,
community_gardens,
conservatorship,
contig_neighbors,
council_dists,
delinquencies,
dev_probability,
drug_crimes,
gun_crimes,
imm_dang_buildings,
li_complaints,
li_violations,
nbhoods,
negligent_devs,
owner_type,
park_priority,
phs_properties,
ppr_properties,
pwd_parcels,
rco_geoms,
tactical_urbanism,
tree_canopy,
unsafe_buildings,
vacant_properties,
)

from config.config import tiles_file_id_prefix

# Ensure the directory containing awkde is in the Python path
awkde_path = "/usr/src/app"
@@ -17,7 +45,6 @@


try:

print("Starting ETL process.")

services = [
@@ -58,6 +85,12 @@
dataset = priority_level(dataset)
dataset = access_process(dataset)

# Save metadata
try:
metadata_df = pd.DataFrame(dataset.collected_metadata)
metadata_df.to_csv("tmp/metadata.csv", index=False)
except Exception as e:
print(f"Error saving metadata: {str(e)}")
# Drop duplicates
before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
@@ -72,8 +105,12 @@
"num_years_owed",
"permit_count",
]
dataset.gdf[numeric_columns] = dataset.gdf[numeric_columns].apply(pd.to_numeric, errors="coerce")
dataset.gdf["most_recent_year_owed"] = dataset.gdf["most_recent_year_owed"].astype(str)
dataset.gdf[numeric_columns] = dataset.gdf[numeric_columns].apply(
pd.to_numeric, errors="coerce"
)
dataset.gdf["most_recent_year_owed"] = dataset.gdf["most_recent_year_owed"].astype(
str
)

# Dataset profiling
send_dataframe_profile_to_slack(dataset.gdf, "all_properties_end")
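The main.py changes dump `dataset.collected_metadata` to `tmp/metadata.csv`, but the `provide_metadata` decorator that fills that list lives in `new_etl/metadata/metadata_utils.py` and is not part of this diff. A minimal sketch of how such a step-metadata decorator could work, assuming each ETL step takes and returns an object with `gdf` and `collected_metadata` attributes; the field names recorded here are illustrative, not the project's actual schema.

```python
import functools
import time


def provide_metadata():
    """Hypothetical sketch: wrap an ETL step and append one metadata record per call."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(dataset, *args, **kwargs):
            rows_before = len(dataset.gdf)
            start = time.time()
            result = func(dataset, *args, **kwargs)
            # main.py later writes this list out with pd.DataFrame(...).to_csv(...)
            result.collected_metadata.append(
                {
                    "step": func.__name__,
                    "rows_before": rows_before,
                    "rows_after": len(result.gdf),
                    "seconds": round(time.time() - start, 2),
                }
            )
            return result

        return wrapper

    return decorator
```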
26 changes: 15 additions & 11 deletions data/src/new_etl/classes/featurelayer.py
@@ -2,27 +2,27 @@
import os
import subprocess
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed

import geopandas as gpd
import pandas as pd
import requests
import sqlalchemy as sa
from config.psql import conn, local_engine
from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from new_etl.database import to_postgis_with_schema
from new_etl.loaders import load_carto_data, load_esri_data
from shapely import wkb
from tqdm import tqdm

from config.config import (
FORCE_RELOAD,
USE_CRS,
log_level,
min_tiles_file_size_in_bytes,
write_production_tiles_file,
)
from config.psql import conn, local_engine
from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from shapely import wkb
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

from new_etl.loaders import load_esri_data, load_carto_data
from new_etl.database import to_postgis_with_schema

log.basicConfig(level=log_level)

@@ -33,8 +33,8 @@ def google_cloud_bucket() -> Bucket:
Returns:
Bucket: the gcp bucket
"""
credentials_path = os.path.expanduser("/app/service-account-key.json")

credentials_path = os.path.expanduser("/app/service-account-key.json")
if not os.path.exists(credentials_path):
raise FileNotFoundError(f"Credentials file not found at {credentials_path}")

@@ -63,7 +63,12 @@ def __init__(
cols: list[str] = None,
max_workers=os.cpu_count(),
chunk_size=100000,
collected_metadata=None,
):
if collected_metadata is None:
self.collected_metadata = []
else:
self.collected_metadata = collected_metadata
self.name = name
self.esri_rest_urls = (
[esri_rest_urls] if isinstance(esri_rest_urls, str) else esri_rest_urls
@@ -84,7 +89,6 @@ def __init__(

inputs = [self.esri_rest_urls, self.carto_sql_queries, self.gdf]
non_none_inputs = [i for i in inputs if i is not None]

if len(non_none_inputs) > 0:
self.type = (
"esri"
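The constructor now takes `collected_metadata=None` and substitutes a fresh list when the argument is omitted. A short aside on why this pattern is preferred over a list default argument; the classes below are generic illustrations, not the repo's code.

```python
class Bad:
    def __init__(self, items=[]):  # the default list is created once and shared
        self.items = items


class Good:
    def __init__(self, items=None):  # each instance gets its own list
        self.items = [] if items is None else items


a, b = Bad(), Bad()
a.items.append("step 1")
print(b.items)  # ['step 1'] -- metadata from one instance leaks into another

c, d = Good(), Good()
c.items.append("step 1")
print(d.items)  # [] -- no shared state
```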
12 changes: 12 additions & 0 deletions data/src/new_etl/data_utils/access_process.py
@@ -1,6 +1,9 @@
from typing import Any

from new_etl.metadata.metadata_utils import provide_metadata


@provide_metadata()
def access_process(dataset: Any) -> Any:
"""
Process a dataset to determine the access process for each property based on
@@ -13,6 +16,15 @@ def access_process(dataset: Any) -> Any:
Returns:
Any: The updated dataset with an additional "access_process" column.

Tagline:
Assigns access processes

Columns added:
access_process (str): The access process for each property based on city ownership and market value.

Primary Feature Layer Columns Referenced:
city_owner_agency, market_value

Side Effects:
Prints the distribution of the "access_process" column.
"""
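The docstring states that `access_process` is derived from `city_owner_agency` and `market_value`, but the decision rules themselves are outside this diff. A hedged sketch of that kind of derivation; the category labels and thresholds below are placeholders, not the project's actual logic.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "city_owner_agency": ["Land Bank (PHDC)", None, None],
        "market_value": [30000, 800, 250000],
    }
)


def assign_access_process(row) -> str:
    # Placeholder rules for illustration only.
    if pd.notna(row["city_owner_agency"]):
        return "Go through city agency"
    if row["market_value"] <= 1000:
        return "Buy Property"
    return "Private Land Use Agreement"


df["access_process"] = df.apply(assign_access_process, axis=1)
print(df["access_process"].value_counts())  # mirrors the documented side effect
```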
16 changes: 16 additions & 0 deletions data/src/new_etl/data_utils/city_owned_properties.py
@@ -1,7 +1,9 @@
from ..classes.featurelayer import FeatureLayer
from ..constants.services import CITY_OWNED_PROPERTIES_TO_LOAD
from ..metadata.metadata_utils import provide_metadata


@provide_metadata()
def city_owned_properties(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Processes city-owned property data by joining it with the primary feature layer,
@@ -15,6 +17,20 @@ def city_owned_properties(primary_featurelayer: FeatureLayer) -> FeatureLayer:
Returns:
FeatureLayer: The updated primary feature layer with processed city ownership
information.

Columns added:
city_owner_agency (str): The city agency that owns the property.
side_yard_eligible (str): Indicates if the property is eligible for the side yard program.

Primary Feature Layer Columns Referenced:
opa_id, owner_1, owner2

Tagline:
Categorizes City Owned Properties

Source:
https://services.arcgis.com/fLeGjb7u4uXqeF9q/ArcGIS/rest/services/LAMAAssets/FeatureServer/0/

"""
city_owned_properties = FeatureLayer(
name="City Owned Properties",
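A hedged sketch of the join the docstring describes: attach `city_owner_agency` to the primary layer by `opa_id` and derive a `side_yard_eligible` flag. The agency values and the eligibility rule are assumptions for illustration, not the repo's logic.

```python
import pandas as pd

primary = pd.DataFrame({"opa_id": ["1001", "1002", "1003"]})
city_owned = pd.DataFrame(
    {"opa_id": ["1001", "1003"], "city_owner_agency": ["PHDC", "DPP"]}
)

merged = primary.merge(city_owned, on="opa_id", how="left")
# Placeholder rule: flag only PHDC-held parcels as side-yard eligible.
merged["side_yard_eligible"] = (merged["city_owner_agency"] == "PHDC").map(
    {True: "Yes", False: "No"}
)
print(merged)
```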
24 changes: 23 additions & 1 deletion data/src/new_etl/data_utils/community_gardens.py
@@ -1,13 +1,35 @@
from config.config import USE_CRS

from ..classes.featurelayer import FeatureLayer
from ..constants.services import COMMUNITY_GARDENS_TO_LOAD
from config.config import USE_CRS
from ..metadata.metadata_utils import provide_metadata


@provide_metadata()
def community_gardens(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Updates the 'vacant' column in the primary feature layer to ensure community gardens
are marked as not vacant. This protects known community gardens from being categorized
as vacant, preventing potential predatory development.

Args:
primary_featurelayer (FeatureLayer): The feature layer containing property data.

Returns:
FeatureLayer: The input feature layer with the 'vacant' column updated to False
for parcels containing community gardens.

Tagline:
Mark Community Gardens as Not Vacant

Columns updated:
vacant: Updated to False for parcels containing community gardens.

Primary Feature Layer Columns Referenced:
opa_id, vacant

Source:
https://services2.arcgis.com/qjOOiLCYeUtwT7x7/arcgis/rest/services/PHS_NGT_Supported_Current_view/FeatureServer/0/
"""
if "vacant" not in primary_featurelayer.gdf.columns:
raise ValueError("The 'vacant' column is missing in the primary feature layer.")
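A minimal sketch of the behavior the docstring describes: parcels that contain a community garden get `vacant` set to False. The geometries, CRS, and join predicate are assumptions; the real loader pulls garden features from the PHS ArcGIS service listed above.

```python
import geopandas as gpd
from shapely.geometry import Point, box

parcels = gpd.GeoDataFrame(
    {"opa_id": ["1001", "1002"], "vacant": [True, True]},
    geometry=[box(0, 0, 1, 1), box(2, 0, 3, 1)],
    crs="EPSG:2272",
)
gardens = gpd.GeoDataFrame(geometry=[Point(0.5, 0.5)], crs="EPSG:2272")

# Parcels whose polygon contains a garden point are no longer treated as vacant.
hits = gpd.sjoin(parcels, gardens, predicate="contains", how="inner")
parcels.loc[parcels["opa_id"].isin(hits["opa_id"]), "vacant"] = False
print(parcels[["opa_id", "vacant"]])
```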
17 changes: 15 additions & 2 deletions data/src/new_etl/data_utils/conservatorship.py
@@ -1,21 +1,34 @@
from ..classes.featurelayer import FeatureLayer
import datetime
from dateutil.parser import parse

import pytz
from dateutil.parser import parse

from ..classes.featurelayer import FeatureLayer
from ..metadata.metadata_utils import provide_metadata

est = pytz.timezone("US/Eastern")
six_months_ago = (datetime.datetime.now() - datetime.timedelta(days=180)).astimezone(
est
)


@provide_metadata()
def conservatorship(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Determines conservatorship eligibility for properties in a feature layer.

Args:
primary_featurelayer (FeatureLayer): A feature layer containing property data in a GeoDataFrame (`gdf`).

Columns Added:
conservatorship (str): Indicates whether each property qualifies for conservatorship ("Yes" or "No").

Primary Feature Layer Columns Referenced:
city_owner_agency, sheriff_sale, market_value, all_violations_past_year, sale_date

Tagline:
Identify conservatorship-eligible properties

Returns:
FeatureLayer: The input feature layer with an added "conservatorship" column indicating
whether each property qualifies for conservatorship ("Yes" or "No").
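A hedged sketch of a conservatorship screen using the columns the docstring lists. The comparison against `six_months_ago` mirrors the module-level cutoff above; the specific disqualifiers and thresholds are placeholders, not the repo's rules.

```python
import datetime

import pytz
from dateutil.parser import parse

est = pytz.timezone("US/Eastern")
six_months_ago = (datetime.datetime.now() - datetime.timedelta(days=180)).astimezone(est)


def is_conservatorship_eligible(row: dict) -> str:
    # Placeholder logic for illustration only.
    sale_date = parse(row["sale_date"]).astimezone(est)
    recently_sold = sale_date >= six_months_ago
    if row["city_owner_agency"] or row["sheriff_sale"] == "Y" or recently_sold:
        return "No"
    if row["market_value"] < 50000 and row["all_violations_past_year"] > 0:
        return "Yes"
    return "No"


print(
    is_conservatorship_eligible(
        {
            "sale_date": "2023-01-15",
            "city_owner_agency": None,
            "sheriff_sale": "N",
            "market_value": 30000,
            "all_violations_past_year": 2,
        }
    )
)
```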
15 changes: 14 additions & 1 deletion data/src/new_etl/data_utils/contig_neighbors.py
@@ -1,10 +1,14 @@
import warnings

import networkx as nx
from libpysal.weights import Queen
import numpy as np
from libpysal.weights import Queen

from ..classes.featurelayer import FeatureLayer
from ..metadata.metadata_utils import provide_metadata


@provide_metadata()
def contig_neighbors(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Calculates the number of contiguous vacant neighbors for each property in a feature layer.
@@ -15,6 +19,15 @@ def contig_neighbors(primary_featurelayer: FeatureLayer) -> FeatureLayer:
Returns:
FeatureLayer: The input feature layer with an added "n_contiguous" column indicating
the number of contiguous vacant neighbors for each property.

Tagline:
Count vacant neighbors

Columns Added:
n_contiguous (int): The number of contiguous vacant neighbors for each property.

Primary Feature Layer Columns Referenced:
opa_id, vacant
"""
# Create a filtered dataframe with only vacant properties and polygon geometries
vacant_parcels = primary_featurelayer.gdf.loc[
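A sketch of the contiguity counting the docstring describes: build Queen contiguity weights over vacant parcels, convert them to a graph, and record each parcel's number of neighbors as `n_contiguous`. Toy geometries stand in for the primary feature layer's vacant polygons.

```python
import geopandas as gpd
from libpysal.weights import Queen
from shapely.geometry import box

# Three touching squares in a row: the middle one has two contiguous neighbors.
vacant = gpd.GeoDataFrame(
    {"opa_id": ["A", "B", "C"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1), box(2, 0, 3, 1)],
)

w = Queen.from_dataframe(vacant)          # contiguity weights (queen adjacency)
graph = w.to_networkx()                   # nodes are positional indices 0..n-1
vacant["n_contiguous"] = [graph.degree(i) for i in range(len(vacant))]
print(vacant[["opa_id", "n_contiguous"]])
```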
14 changes: 13 additions & 1 deletion data/src/new_etl/data_utils/council_dists.py
@@ -1,10 +1,13 @@
import pandas as pd

from ..classes.featurelayer import FeatureLayer
from ..constants.services import COUNCIL_DISTRICTS_TO_LOAD
import pandas as pd
from ..metadata.metadata_utils import provide_metadata

pd.set_option("future.no_silent_downcasting", True)


@provide_metadata()
def council_dists(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Associates properties in the primary feature layer with council districts
@@ -16,6 +19,15 @@ def council_dists(primary_featurelayer: FeatureLayer) -> FeatureLayer:
Returns:
FeatureLayer: The input feature layer with properties spatially joined
to council districts, ensuring no duplicate entries.

Tagline:
Assigns council districts

Columns added:
district (str): The council district associated with the property.

Primary Feature Layer Columns Referenced:
opa_id, geometry
"""
# Load council districts
council_dists = FeatureLayer(
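A sketch of the spatial join the docstring describes: assign each property a council district and drop duplicate matches so every `opa_id` appears once. Toy data; the real district polygons come from `COUNCIL_DISTRICTS_TO_LOAD`, and the CRS here is just an example.

```python
import geopandas as gpd
from shapely.geometry import Point, box

properties = gpd.GeoDataFrame(
    {"opa_id": ["1001", "1002"]},
    geometry=[Point(0.5, 0.5), Point(2.5, 0.5)],
    crs="EPSG:2272",
)
districts = gpd.GeoDataFrame(
    {"district": ["1", "2"]},
    geometry=[box(0, 0, 1, 1), box(2, 0, 3, 1)],
    crs="EPSG:2272",
)

joined = gpd.sjoin(properties, districts, how="left", predicate="within")
joined = joined.drop_duplicates(subset="opa_id")[["opa_id", "district"]]
print(joined)
```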
20 changes: 20 additions & 0 deletions data/src/new_etl/data_utils/delinquencies.py
@@ -1,7 +1,9 @@
from ..classes.featurelayer import FeatureLayer
from ..constants.services import DELINQUENCIES_QUERY
from ..metadata.metadata_utils import provide_metadata


@provide_metadata()
def delinquencies(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Adds property tax delinquency information to the primary feature layer by
@@ -13,6 +15,24 @@ def delinquencies(primary_featurelayer: FeatureLayer) -> FeatureLayer:
Returns:
FeatureLayer: The input feature layer with added columns for tax delinquency
information, including total due, actionable status, payment agreements, and more.

Tagline:
Summarize tax delinquencies

Source:
https://phl.carto.com/api/v2/sql

Columns Added:
total_due (float): Total amount owed.
most_recent_year_owed (str): Most recent year owed.
num_years_owed (int): Number of years owed.
payment_agreement (str): Indicates if there is a payment agreement.
is_actionable (str): Flag for actionable tax delinquency.
sheriff_sale (str): Indicates if the property is at risk of sheriff sale.
total_assessment (float): Total property assessment.

Primary Feature Layer Columns Referenced:
opa_id
"""
tax_delinquencies = FeatureLayer(
name="Property Tax Delinquencies",
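A sketch of the attribute join the docstring describes: delinquency fields pulled via the Carto SQL endpoint are merged onto the primary layer by `opa_id`. The records below are illustrative values, not real data.

```python
import pandas as pd

primary = pd.DataFrame({"opa_id": ["1001", "1002", "1003"]})
tax_delinquencies = pd.DataFrame(
    {
        "opa_id": ["1001", "1003"],
        "total_due": [4200.50, 310.00],
        "num_years_owed": [3, 1],
        "most_recent_year_owed": ["2023", "2024"],
        "sheriff_sale": ["Y", "N"],
    }
)

# Left join keeps every property; parcels with no delinquency get NaN fields.
merged = primary.merge(tax_delinquencies, on="opa_id", how="left")
print(merged)
```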