2 changes: 2 additions & 0 deletions data/docker-compose.yml
@@ -7,6 +7,8 @@ services:
     environment:
       - GOOGLE_APPLICATION_CREDENTIALS=/app/service-account-key.json
       - VACANT_LOTS_DB
+      - POSTGRES_PASSWORD
+      - POSTGRES_PORT=5434
       - CLEAN_GREEN_GOOGLE_KEY
       - PYTHONUNBUFFERED=1
       - GOOGLE_CLOUD_BUCKET_NAME
12 changes: 8 additions & 4 deletions data/src/classes/backup_archive_database.py
@@ -4,16 +4,17 @@
 from datetime import datetime, timedelta

 import sqlalchemy as sa
+from sqlalchemy import inspect

+from classes.featurelayer import google_cloud_bucket
 from config.config import (
     log_level,
     max_backup_schema_days,
-    tiles_file_id_prefix,
     tile_file_backup_directory,
+    tiles_file_id_prefix,
 )
 from config.psql import conn, local_engine, url
 from data_utils.utils import mask_password
-from sqlalchemy import inspect
-from classes.featurelayer import google_cloud_bucket

 log.basicConfig(level=log_level)
@@ -125,7 +126,10 @@ def is_backup_schema_exists(self) -> bool:

     def backup_tiles_file(self):
         """Back up the main tiles file to a timestamped copy in the backup/ folder in GCP."""
-        bucket = google_cloud_bucket()
+        bucket = google_cloud_bucket(require_write_access=True)
+        if not bucket:
+            log.warning("No GCP bucket found. Skipping tiles file backup.")
+            return
         count: int = 0
         for blob in bucket.list_blobs(prefix=tiles_file_id_prefix):
             suffix: str = "_" + self.timestamp_string
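With the new guard, backup_tiles_file degrades gracefully when only read access is available. A minimal pytest-style sketch of that path; the class name BackupArchiveDatabase is an assumption (the diff only shows the method), and its constructor is bypassed to avoid database setup:

# Hypothetical test sketch, not part of this PR. The class name
# BackupArchiveDatabase is assumed; only the method name comes from the diff.
from unittest import mock

import classes.backup_archive_database as archive_mod


def test_backup_tiles_file_skips_when_read_only(caplog):
    # Simulate read-only credentials: google_cloud_bucket(...) returns None.
    with mock.patch.object(archive_mod, "google_cloud_bucket", return_value=None):
        archiver = archive_mod.BackupArchiveDatabase.__new__(
            archive_mod.BackupArchiveDatabase  # skip __init__ and its DB setup
        )
        archiver.backup_tiles_file()  # should warn and return, not raise
    assert "Skipping tiles file backup" in caplog.text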
68 changes: 68 additions & 0 deletions data/src/classes/bucket_manager.py
@@ -0,0 +1,68 @@
import logging as log
import os

from google.cloud import storage

from config.config import log_level

log.basicConfig(level=log_level)


class GCSBucketManager:
    """
    A manager for interacting with a Google Cloud Storage bucket.

    This class initializes a bucket client using an optional service account key
    or Application Default Credentials, falling back to an anonymous client for
    read-only access.
    """

    def __init__(
        self, bucket_name: str = None, credential_path: str = None, client=None
    ):
        """
        Initialize the GCSBucketManager.

        Args:
            bucket_name (str): Name of the bucket. Defaults to the environment variable
                'GOOGLE_CLOUD_BUCKET_NAME' or "cleanandgreenphl" if not set.
            credential_path (str): Optional path to a service account credentials file.
            client: Optional storage.Client instance for dependency injection in testing.
        """
        self.bucket_name = bucket_name or os.getenv(
            "GOOGLE_CLOUD_BUCKET_NAME", "cleanandgreenphl"
        )

        self.read_only = False

        if client is not None:
            self._client = client
        else:
            self._client = self._init_client(credential_path)

        self.bucket = self._client.bucket(self.bucket_name)

    def _init_client(self, credential_path: str = None):
        """
        Attempt to initialize the storage client using a credential file or
        Application Default Credentials, falling back to an anonymous,
        read-only client.
        """
        project_name = os.getenv("GOOGLE_CLOUD_PROJECT", "clean-and-green-philly")
        credentials_path = credential_path or "/app/service-account-key.json"
        is_credentials_file = os.path.exists(credentials_path)

        if is_credentials_file:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

        try:
            # Uses application default credentials if GOOGLE_APPLICATION_CREDENTIALS is not set
            if is_credentials_file:
                print(f"Using service account key at {credentials_path}")
            else:
                print("Using application default credentials")
            return storage.Client(project=project_name)

        except Exception as e:
            log.warning(f"Failed to initialize client with service account key: {e}")
            log.warning("Falling back to anonymous client (read-only mode)")
            self.read_only = True
            return storage.Client.create_anonymous_client()
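The client parameter exists for dependency injection, so the manager can be exercised with no credentials or network access; a minimal sketch, assuming only the class above:

# Hypothetical usage sketch, not part of this PR.
from unittest import mock

from classes.bucket_manager import GCSBucketManager

fake_client = mock.Mock()  # stands in for storage.Client
manager = GCSBucketManager(bucket_name="cleanandgreenphl", client=fake_client)

# The manager wraps whatever bucket the injected client returns...
assert manager.bucket is fake_client.bucket.return_value
# ...and read_only stays False, since _init_client was never called.
assert manager.read_only is False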
33 changes: 22 additions & 11 deletions data/src/classes/diff_report.py
@@ -5,6 +5,8 @@
 import subprocess
 from email.mime.text import MIMEText

+from slack_sdk import WebClient
+
 from classes.backup_archive_database import backup_schema_name
 from classes.featurelayer import google_cloud_bucket
 from config.config import (
@@ -16,7 +18,6 @@
 )
 from config.psql import conn, url
 from data_utils.utils import mask_password
-from slack_sdk import WebClient

 log.basicConfig(level=log_level)

Expand Down Expand Up @@ -213,20 +214,30 @@ def compare_table(self, diff_table: DiffTable) -> str:
output = complete_process.stdout.decode()
return re.sub(r"\nExtra-Info:.*", "", output, flags=re.DOTALL)

def send_report_to_slack(self):
def send_report_to_slack(self, slack_token=None):
"""
post the summary report to the slack channel if configured.
"""
if report_to_slack_channel:
token = os.environ["CAGP_SLACK_API_TOKEN"]
client = WebClient(token=token)

# Send a message
client.chat_postMessage(
channel=report_to_slack_channel,
text=self.report,
username="CAGP Diff Bot",
token = slack_token or os.getenv("CAGP_SLACK_API_TOKEN")
if not report_to_slack_channel:
log.warning(
"Skipping Slack reporting. Configure report_to_slack_channel in config.py to enable."
)
return
if not token:
log.warning(
"Skipping Slack reporting. Configure CAGP_SLACK_API_TOKEN in environment to enable."
)
return

client = WebClient(token=token)

# Send a message
client.chat_postMessage(
channel=report_to_slack_channel,
text=self.report,
username="CAGP Diff Bot",
)

     def email_report(self):
         """
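Because the token is now injectable and both guards return early, the method can be exercised without any environment setup; a sketch, assuming the class in diff_report.py is named DiffReport (only the method appears in this diff) and bypassing its constructor:

# Hypothetical test sketch, not part of this PR. DiffReport is an assumed
# class name; only send_report_to_slack comes from the diff.
from unittest import mock

from classes.diff_report import DiffReport

report = DiffReport.__new__(DiffReport)  # skip __init__ and its DB work
report.report = "example summary"

with mock.patch("classes.diff_report.WebClient") as web_client:
    # If report_to_slack_channel is unset in config.py, this logs and returns;
    # otherwise the injected token is used and the message is posted.
    report.send_report_to_slack(slack_token="xoxb-fake")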
46 changes: 23 additions & 23 deletions data/src/classes/featurelayer.py
@@ -7,6 +7,11 @@
 import pandas as pd
 import requests
 import sqlalchemy as sa
+from esridump.dumper import EsriDumper
+from google.cloud import storage
+from shapely import Point, wkb
+
+from classes.bucket_manager import GCSBucketManager
 from config.config import (
     FORCE_RELOAD,
     USE_CRS,
@@ -15,31 +20,22 @@
     write_production_tiles_file,
 )
 from config.psql import conn, local_engine
-from esridump.dumper import EsriDumper
-from google.cloud import storage
-from google.cloud.storage.bucket import Bucket
-from shapely import Point, wkb

 log.basicConfig(level=log_level)


-def google_cloud_bucket() -> Bucket:
-    """Build the google cloud bucket with name configured in your environ or default of cleanandgreenphl
-
-    Returns:
-        Bucket: the gcp bucket
-    """
-    credentials_path = os.path.expanduser("/app/service-account-key.json")
-
-    if not os.path.exists(credentials_path):
-        raise FileNotFoundError(f"Credentials file not found at {credentials_path}")
-
-    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
-    bucket_name = os.getenv("GOOGLE_CLOUD_BUCKET_NAME", "cleanandgreenphl")
-    project_name = os.getenv("GOOGLE_CLOUD_PROJECT", "clean-and-green-philly")
-
-    storage_client = storage.Client(project=project_name)
-    return storage_client.bucket(bucket_name)
+def google_cloud_bucket(require_write_access: bool = False) -> storage.Bucket | None:
+    """
+    Initialize a Google Cloud Storage bucket client using a service account key
+    or Application Default Credentials. If a writable bucket is requested and the
+    user does not have write access, None is returned.
+
+    Args:
+        require_write_access (bool): Whether the bucket must be writable. Defaults to False.
+    """
+    bucket_manager = GCSBucketManager()
+    if require_write_access and bucket_manager.read_only:
+        return None
+    return bucket_manager.bucket


 class FeatureLayer:
@@ -58,7 +54,6 @@ def __init__(
         from_xy=False,
         use_wkb_geom_field=None,
         cols: list[str] = None,
-        bucket: Bucket = None,
     ):
         self.name = name
         self.esri_rest_urls = (
@@ -75,7 +70,6 @@ def __init__(
         self.psql_table = name.lower().replace(" ", "_")
         self.input_crs = "EPSG:4326" if not from_xy else USE_CRS
         self.use_wkb_geom_field = use_wkb_geom_field
-        self.bucket = bucket or google_cloud_bucket()

         inputs = [self.esri_rest_urls, self.carto_sql_queries, self.gdf]
         non_none_inputs = [i for i in inputs if i is not None]
@@ -330,7 +324,13 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
         df_no_geom.to_parquet(temp_parquet)

         # Upload Parquet to Google Cloud Storage
-        blob_parquet = self.bucket.blob(f"{tiles_file_id_prefix}.parquet")
+        bucket = google_cloud_bucket(require_write_access=True)
+        if bucket is None:
+            print(
+                "Skipping Parquet and PMTiles upload due to read-only bucket access."
+            )
+            return
+        blob_parquet = bucket.blob(f"{tiles_file_id_prefix}.parquet")
         try:
             blob_parquet.upload_from_filename(temp_parquet)
             parquet_size = os.stat(temp_parquet).st_size
@@ -399,7 +399,7 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:

         # Upload PMTiles to Google Cloud Storage
         for file in write_files:
-            blob = self.bucket.blob(file)
+            blob = bucket.blob(file)
             try:
                 blob.upload_from_filename(temp_merged_pmtiles)
                 print(f"PMTiles upload successful for {file}!")
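Callers now branch on the return value instead of catching a FileNotFoundError; a minimal sketch of the new contract, assuming only google_cloud_bucket above (blob names are placeholders):

# Hypothetical usage sketch, not part of this PR; blob names are placeholders.
from classes.featurelayer import google_cloud_bucket

# Reads work even with the anonymous fallback client.
bucket = google_cloud_bucket()
for blob in bucket.list_blobs(prefix="tiles"):
    print(blob.name)

# Writes must handle the read-only case, which now returns None.
bucket = google_cloud_bucket(require_write_access=True)
if bucket is None:
    print("Read-only bucket access; skipping upload.")
else:
    bucket.blob("backup/example.parquet").upload_from_filename("example.parquet")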
25 changes: 14 additions & 11 deletions data/src/classes/slack_error_reporter.py
@@ -1,16 +1,19 @@
 import os

 from slack_sdk import WebClient


-def send_error_to_slack(error_message: str) -> None:
+def send_error_to_slack(error_message: str, slack_token: str | None = None) -> None:
     """Send error message to Slack."""
-    token: str | None = os.getenv("CAGP_SLACK_API_TOKEN")  # token can be None
-    if token:
-        client = WebClient(token=token)
-        client.chat_postMessage(
-            channel="clean-and-green-philly-back-end",  # Replace with actual Slack channel ID
-            text=error_message,
-            username="Backend Error Reporter",
-        )
-    else:
-        raise ValueError("Slack API token not found in environment variables.")
+    token = slack_token or os.getenv("CAGP_SLACK_API_TOKEN")
+    if not token:
+        print("Slack API token not found in environment variables.")
+        print("Skipping QC profile report to Slack.")
+        return
+
+    client = WebClient(token=token)
+    client.chat_postMessage(
+        channel="clean-and-green-philly-back-end",  # Replace with actual Slack channel ID
+        text=error_message,
+        username="Backend Error Reporter",
+    )
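The same injection pattern applies here; a minimal sketch, assuming only the function above and that CAGP_SLACK_API_TOKEN is unset for the first call:

# Hypothetical usage sketch, not part of this PR.
from unittest import mock

from classes.slack_error_reporter import send_error_to_slack

# No token anywhere (assuming CAGP_SLACK_API_TOKEN is unset): the function
# now prints a notice and returns instead of raising ValueError.
send_error_to_slack("data load failed")

# With an injected token, the Slack client is built; mock it in tests.
with mock.patch("classes.slack_error_reporter.WebClient") as web_client:
    send_error_to_slack("data load failed", slack_token="xoxb-fake")
    web_client.assert_called_once_with(token="xoxb-fake")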
5 changes: 4 additions & 1 deletion data/src/config/config.py
@@ -1,9 +1,12 @@
 import logging
 from pathlib import Path

-FORCE_RELOAD = False
+FORCE_RELOAD = True
 """ During the data load, whether to query the various GIS API services for the data to load into the postgres tables. If True, will query the API services, back up the database, reload the database and report on data differences. If False, will read the data from postgres."""

+BACKUP_SCHEMA = False
+""" Whether to back up the database schema before loading the data in script.py. """
+
 USE_CRS = "EPSG:2272"
 """ the standard geospatial code for Pennsylvania South (ftUS) """
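BACKUP_SCHEMA presumably gates the backup step in script.py, which this diff does not show; a hedged sketch of how the two flags might be consumed together:

# Hypothetical sketch: script.py is not in this diff, so this dispatch is an
# assumption; only the flag names come from config.py.
from config.config import BACKUP_SCHEMA, FORCE_RELOAD


def run_pipeline(backup, reload_data, read_cached):
    """Dispatch on the two config flags; the callables are supplied by the caller."""
    if not FORCE_RELOAD:
        return read_cached()  # read existing tables from postgres
    if BACKUP_SCHEMA:
        backup()  # snapshot the schema before reloading
    return reload_data()  # query the GIS APIs and reload the tables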
36 changes: 30 additions & 6 deletions data/src/config/psql.py
@@ -1,12 +1,36 @@
 import os

-from config.config import is_docker
 from sqlalchemy import create_engine

-url: str = (
-    os.environ["VACANT_LOTS_DB"].replace("localhost", "host.docker.internal")
-    if is_docker()
-    else os.environ["VACANT_LOTS_DB"]
-)
+from config.config import is_docker
+
+
+def get_db_url():
+    # Detect if running in Cloud Run:
+    is_cloud_run = "K_SERVICE" in os.environ or "CLOUD_RUN_JOB" in os.environ
+
+    # Use host.docker.internal when running locally in Docker,
+    # except when running in Cloud Run
+    host = "localhost"
+    if is_docker() and not is_cloud_run:
+        host = "host.docker.internal"
+
+    if os.getenv("VACANT_LOTS_DB"):
+        # Use the provided database URL
+        url = os.getenv("VACANT_LOTS_DB")
+        url = url.replace("localhost", host)
+    else:
+        # Use the specified port, pw, db and user to construct the URL
+        pw = os.environ["POSTGRES_PASSWORD"]
+        port = os.getenv("POSTGRES_PORT", "5432")
+        db = os.getenv("POSTGRES_DB", "vacantlotdb")
+        user = os.getenv("POSTGRES_USER", "postgres")
+        url: str = f"postgresql://{user}:{pw}@{host}:{port}/{db}"
+        # Log inside this branch only: user, port and db are unbound when
+        # VACANT_LOTS_DB is provided, so logging outside it would raise a NameError.
+        print(f"Set database url to: postgresql://{user}:****@{host}:{port}/{db}")
+    return url
+
+
+url = get_db_url()

 local_engine = create_engine(url)
 conn = local_engine.connect()
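Resolution order after this change: VACANT_LOTS_DB wins when set (with localhost rewritten for Docker), otherwise the URL is assembled from the POSTGRES_* variables that docker-compose.yml now passes through. A sketch of both paths; note that importing config.psql opens a connection at module import time, so this assumes a reachable Postgres, and all values are placeholders:

# Hypothetical sketch, not part of this PR; assumes a reachable Postgres,
# since importing config.psql connects at module import time.
import os

os.environ["POSTGRES_PASSWORD"] = "example-password"  # placeholder
os.environ["POSTGRES_PORT"] = "5434"  # matches the docker-compose.yml change
os.environ.pop("VACANT_LOTS_DB", None)  # force the fallback branch

from config.psql import get_db_url

print(get_db_url())
# -> postgresql://postgres:example-password@localhost:5434/vacantlotdb
#    (host becomes host.docker.internal inside Docker, but not on Cloud Run)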