2 changes: 2 additions & 0 deletions data/docker-compose.yml
@@ -7,6 +7,8 @@ services:
     environment:
       - GOOGLE_APPLICATION_CREDENTIALS=/app/service-account-key.json
       - VACANT_LOTS_DB
+      - POSTGRES_PASSWORD
+      - POSTGRES_PORT=5434
       - CLEAN_GREEN_GOOGLE_KEY
       - PYTHONUNBUFFERED=1
       - GOOGLE_CLOUD_BUCKET_NAME
34 changes: 28 additions & 6 deletions data/src/config/psql.py
@@ -1,12 +1,34 @@
 import os
 
-from config.config import is_docker
 from sqlalchemy import create_engine
 
-url: str = (
-    os.environ["VACANT_LOTS_DB"].replace("localhost", "host.docker.internal")
-    if is_docker()
-    else os.environ["VACANT_LOTS_DB"]
-)
+from config.config import is_docker
+
+# Detect if running in Cloud Run:
+is_cloud_run = "K_SERVICE" in os.environ or "CLOUD_RUN_JOB" in os.environ
+
+# Use host.docker.internal when running locally in Docker
+# except when running in Cloud Run
+if is_docker() and not is_cloud_run:
+    host = "host.docker.internal"
+else:
+    host = "localhost"
+
+
+if os.getenv("VACANT_LOTS_DB"):
+    # Use the provided database URL
+    url = os.getenv("VACANT_LOTS_DB")
Contributor Author:
I probably should switch this back to

    os.environ["VACANT_LOTS_DB"].replace("localhost", "host.docker.internal")
    if is_docker()
    else os.environ["VACANT_LOTS_DB"]

in case we're in Docker and VACANT_LOTS_DB is defined. (A sketch of this appears after this file's diff.)

+else:
+    # Use the specified port, pw, db and user to construct the URL
+    pw = os.environ["POSTGRES_PASSWORD"]
+    port = os.getenv("POSTGRES_PORT", "5432")
+    db = os.getenv("POSTGRES_DB", "vacantlotdb")
+    user = os.getenv("POSTGRES_USER", "postgres")
+    url: str = f"postgresql://{user}:{pw}@{host}:{port}/{db}"
+    print(
+        f"Connecting to database with URL: postgresql://{user}:****@{host}:{port}/{db}"
+    )
 
 
 local_engine = create_engine(url)
 conn = local_engine.connect()
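For illustration, a minimal sketch of the fallback the comment above suggests (hypothetical, not part of this diff), reusing the is_docker() and is_cloud_run flags defined earlier in the file:

    if os.getenv("VACANT_LOTS_DB"):
        # Honor the provided URL, but point at the host gateway when
        # running in a local Docker container (not in Cloud Run).
        url = os.environ["VACANT_LOTS_DB"]
        if is_docker() and not is_cloud_run:
            url = url.replace("localhost", "host.docker.internal")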
68 changes: 68 additions & 0 deletions data/src/new_etl/classes/bucket_manager.py
@@ -0,0 +1,68 @@
+import logging as log
+import os
+
+from google.cloud import storage
+
+from config.config import log_level
+
+log.basicConfig(level=log_level)
+
+
+class GCSBucketManager:
+    """
+    A manager for interacting with a Google Cloud Storage bucket.
+
+    This class initializes a bucket client using a service account key or Application
+    Default Credentials, falling back to an anonymous client for read-only access.
+    """
+
+    def __init__(
+        self, bucket_name: str = None, credential_path: str = None, client=None
+    ):
+        """
+        Initialize the GCSBucketManager.
+
+        Args:
+            bucket_name (str): Name of the bucket. Defaults to the environment variable
+                'GOOGLE_CLOUD_BUCKET_NAME' or "cleanandgreenphl" if not set.
+            credential_path (str): Optional path to a service account credentials file.
+            client: Optional storage.Client instance for dependency injection in testing.
+        """
+        self.bucket_name = bucket_name or os.getenv(
+            "GOOGLE_CLOUD_BUCKET_NAME", "cleanandgreenphl"
+        )
+
+        self.read_only = False
+
+        if client is not None:
+            self._client = client
+        else:
+            self._client = self._init_client(credential_path)
+
+        self.bucket = self._client.bucket(self.bucket_name)
+
+    def _init_client(self, credential_path: str = None):
+        """
+        Attempt to initialize the storage client using a credential file or
+        application default credentials, falling back to anonymous/read-only.
+        """
+        project_name = os.getenv("GOOGLE_CLOUD_PROJECT", "clean-and-green-philly")
+        credentials_path = credential_path or "/app/service-account-key.json"
+        is_credentials_file = os.path.exists(credentials_path)
+
+        if is_credentials_file:
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
+
+        try:
+            # This will use application default credentials if GOOGLE_APPLICATION_CREDENTIALS is not set
+            if is_credentials_file:
+                print(f"Using service account key at {credentials_path}")
+            else:
+                print("Using application default credentials")
+            return storage.Client(project=project_name)
+
+        except Exception as e:
+            log.warning(f"Failed to initialize client with service account key: {e}")
+            log.warning("Falling back to anonymous client (read-only mode)")
+            self.read_only = True
+            return storage.Client.create_anonymous_client()
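A brief usage sketch of the new class (the blob path and file name here are hypothetical): because the manager silently falls back to an anonymous client, callers should check read_only before attempting writes.

    from new_etl.classes.bucket_manager import GCSBucketManager

    manager = GCSBucketManager()  # name from GOOGLE_CLOUD_BUCKET_NAME or the default
    blob = manager.bucket.blob("tiles/example.pmtiles")  # hypothetical blob path
    if manager.read_only:
        print("Anonymous client: read-only access, skipping upload.")
    else:
        blob.upload_from_filename("example.pmtiles")  # hypothetical local file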
39 changes: 22 additions & 17 deletions data/src/new_etl/classes/data_diff.py
@@ -1,7 +1,8 @@
-from slack_sdk import WebClient
+import os
+
 import pandas as pd
+from slack_sdk import WebClient
 from sqlalchemy import text
-import os
 
 from config.psql import conn
 
@@ -115,27 +116,31 @@ def generate_diff(self):
 
         self.summary_text = "\n".join(summary_lines)
 
-    def send_to_slack(self, channel="clean-and-green-philly-pipeline"):
+    def send_to_slack(
+        self, channel="clean-and-green-philly-pipeline", slack_token=None
+    ):
         """
         Sends the diff summary to a Slack channel.
 
         Args:
             channel (str): The Slack channel to post the message to.
         """
-        token = os.getenv("CAGP_SLACK_API_TOKEN")
-        if token:
-            client = WebClient(token=token)
-            try:
-                client.chat_postMessage(
-                    channel=channel,
-                    text=f"*Data Difference Report*\n\n{self.summary_text}",
-                    username="Diff Reporter",
-                )
-                print("Diff report sent to Slack successfully.")
-            except Exception as e:
-                print(f"Failed to send diff report to Slack: {e}")
-        else:
-            raise ValueError("Slack API token not found in environment variables.")
+        token = slack_token or os.getenv("CAGP_SLACK_API_TOKEN")
+        if not token:
+            print("Slack API token not found in environment variables.")
+            print("Skipping sending diff report to Slack.")
+            return
+
+        client = WebClient(token=token)
+        try:
+            client.chat_postMessage(
+                channel=channel,
+                text=f"*Data Difference Report*\n\n{self.summary_text}",
+                username="Diff Reporter",
+            )
+            print("Diff report sent to Slack successfully.")
+        except Exception as e:
+            print(f"Failed to send diff report to Slack: {e}")
 
     def run(self, send_to_slack=True, slack_channel="clean-and-green-philly-pipeline"):
         """
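With the new signature, a token can be injected directly, e.g. in tests, and a missing token no longer raises (a sketch; the class name DataDiffReport and the token value are assumptions, since the class definition is outside this hunk):

    report = DataDiffReport()  # hypothetical class name
    report.generate_diff()
    # Posts to Slack if a token is available; otherwise prints and returns.
    report.send_to_slack(channel="my-test-channel", slack_token="xoxb-test-token")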
32 changes: 17 additions & 15 deletions data/src/new_etl/classes/featurelayer.py
@@ -9,7 +9,6 @@
 import requests
 import sqlalchemy as sa
 from google.cloud import storage
-from google.cloud.storage.bucket import Bucket
 from shapely import wkb
 from tqdm import tqdm
 
@@ -21,29 +20,26 @@
     write_production_tiles_file,
 )
 from config.psql import conn, local_engine
+from new_etl.classes.bucket_manager import GCSBucketManager
 from new_etl.database import to_postgis_with_schema
 from new_etl.loaders import load_carto_data, load_esri_data
 
 log.basicConfig(level=log_level)
 
 
-def google_cloud_bucket() -> Bucket:
-    """Build the google cloud bucket with name configured in your environ or default of cleanandgreenphl
-
-    Returns:
-        Bucket: the gcp bucket
+def google_cloud_bucket(require_write_access: bool = False) -> storage.Bucket | None:
     """
+    Initialize a Google Cloud Storage bucket client using Application Default Credentials.
+    If a writable bucket is requested and the user does not have write access, None is returned.
 
-    credentials_path = os.path.expanduser("/app/service-account-key.json")
-    if not os.path.exists(credentials_path):
-        raise FileNotFoundError(f"Credentials file not found at {credentials_path}")
-
-    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
-    bucket_name = os.getenv("GOOGLE_CLOUD_BUCKET_NAME", "cleanandgreenphl")
-    project_name = os.getenv("GOOGLE_CLOUD_PROJECT", "clean-and-green-philly")
+    Args:
+        require_write_access (bool): Whether the bucket must be writable. Defaults to False.
+    """
 
-    storage_client = storage.Client(project=project_name)
-    return storage_client.bucket(bucket_name)
+    bucket_manager = GCSBucketManager()
+    if require_write_access and bucket_manager.read_only:
+        return None
+    return bucket_manager.bucket
 
 
 class FeatureLayer:
@@ -149,6 +145,7 @@ def load_data(self):
             ]
 
             # Save GeoDataFrame to PostgreSQL and configure it as a hypertable
+            print("Saving GeoDataFrame to PostgreSQL...")
             to_postgis_with_schema(self.gdf, self.psql_table, conn)
 
         except Exception as e:
@@ -358,6 +355,11 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
                 f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
             )
 
+        bucket = google_cloud_bucket(require_write_access=True)
+        if bucket is None:
+            print("Skipping PMTiles upload due to read-only bucket access.")
+            return
+
         # Upload PMTiles to Google Cloud Storage
-        bucket = google_cloud_bucket()
         for file in write_files:
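A sketch of the resulting call pattern for any writer of the bucket (file names hypothetical): request write access up front and bail out cleanly when only anonymous credentials are available.

    bucket = google_cloud_bucket(require_write_access=True)
    if bucket is None:
        print("Read-only bucket access; skipping upload.")
    else:
        # Mirrors the upload loop in build_and_publish above.
        bucket.blob("vacant_properties_tiles.pmtiles").upload_from_filename(
            "tmp/vacant_properties_tiles.pmtiles"  # hypothetical local path
        )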