5 changes: 3 additions & 2 deletions .gitignore
@@ -60,7 +60,8 @@ data/src/app/service-account-key.json
# compiled python files
*.pyc

tmp/

# Local python development files
.python-version

# Cached and temporary data files from pipeline
storage/
7 changes: 5 additions & 2 deletions data/Dockerfile
@@ -45,8 +45,11 @@ RUN git clone https://github.com/felt/tippecanoe.git \
&& make install

# Copy the src directory
COPY src/ .
COPY src ./src

# Use Pipenv to run the script
# Adjust the path to your main Python script if needed
CMD ["pipenv", "run", "python", "./script.py"]
RUN ls -a /usr/src/app

CMD ["pipenv", "run", "python", "-m", "src.main.py"]

26 changes: 0 additions & 26 deletions data/Dockerfile-pg

This file was deleted.

26 changes: 0 additions & 26 deletions data/Dockerfile-timescale

This file was deleted.

43 changes: 1 addition & 42 deletions data/docker-compose.yml
@@ -15,7 +15,7 @@ services:
- GOOGLE_CLOUD_PROJECT
- CAGP_SLACK_API_TOKEN
volumes:
- ./src:/usr/src/app
- ./src:/usr/src/app/src
- ~/.config/gcloud/application_default_credentials.json:/app/service-account-key.json
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
@@ -56,44 +56,3 @@ services:
extra_hosts:
- host.docker.internal:host-gateway
network_mode: 'host'

postgres:
container_name: cagp-postgres
build:
context: .
dockerfile: Dockerfile-pg
environment:
PGPORT: 5433
POSTGRES_PASSWORD:
restart: always
ports:
- '5433:5433'
volumes:
- database_volume:/var/lib/postgresql/data
- ./init_pg.sql:/docker-entrypoint-initdb.d/init_pg.sql
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
extra_hosts:
- host.docker.internal:host-gateway

postgres-timescale:
container_name: cagp-postgres-timescale
build:
context: .
dockerfile: Dockerfile-timescale
environment:
PGPORT: 5434
POSTGRES_PASSWORD:
restart: always
ports:
- '5434:5434'
volumes:
- timescale_database_volume:/var/lib/postgresql/data
- ./init_pg_timescale.sql:/docker-entrypoint-initdb.d/init_pg.sql
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
extra_hosts:
- host.docker.internal:host-gateway
volumes:
database_volume:
timescale_database_volume:
3 changes: 0 additions & 3 deletions data/init_pg.sql

This file was deleted.

5 changes: 0 additions & 5 deletions data/init_pg_timescale.sql

This file was deleted.

87 changes: 59 additions & 28 deletions data/src/classes/featurelayer.py
@@ -20,9 +20,12 @@
write_production_tiles_file,
)
from src.config.psql import conn, local_engine
from src.new_etl.classes.file_manager import FileManager, FileType, LoadType

log.basicConfig(level=log_level)

file_manager = FileManager()


def google_cloud_bucket(require_write_access: bool = False) -> storage.Bucket | None:
"""
@@ -290,27 +293,29 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
"""
zoom_threshold: int = 13

# Export the GeoDataFrame to a temporary GeoJSON file
temp_geojson_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.geojson"
temp_geojson_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.geojson"
temp_pmtiles_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.pmtiles"
temp_pmtiles_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.pmtiles"
temp_merged_pmtiles: str = f"tmp/temp_{tiles_file_id_prefix}_merged.pmtiles"
temp_parquet: str = f"tmp/{tiles_file_id_prefix}.parquet"
# Export the GeoDataFrames to temporary GeoJSON files
temp_geojson_points_file_name: str = f"temp_{tiles_file_id_prefix}_points"
temp_geojson_polygons_file_name: str = f"temp_{tiles_file_id_prefix}_polygons"
temp_parquet_file_name: str = f"{tiles_file_id_prefix}"

# Reproject
gdf_wm = self.gdf.to_crs(epsg=4326)
gdf_wm.to_file(temp_geojson_polygons, driver="GeoJSON")
file_manager.save_gdf(
gdf_wm, temp_geojson_polygons_file_name, LoadType.TEMP, FileType.GEOJSON
)
gdf_wm.to_file(temp_geojson_polygons_file_name, driver="GeoJSON")

# Create points dataset
self.centroid_gdf = self.gdf.copy()
self.centroid_gdf["geometry"] = self.centroid_gdf["geometry"].centroid
self.centroid_gdf = self.centroid_gdf.to_crs(epsg=4326)
self.centroid_gdf.to_file(temp_geojson_points, driver="GeoJSON")
centroid_gdf = self.gdf.copy()
centroid_gdf["geometry"] = centroid_gdf["geometry"].centroid
centroid_gdf = centroid_gdf.to_crs(epsg=4326)
centroid_gdf.to_file(temp_geojson_points_file_name, driver="GeoJSON")
file_manager.save_gdf(
centroid_gdf, temp_geojson_points_file_name, LoadType.TEMP, FileType.GEOJSON
)

# Load the GeoJSON from the polygons, drop geometry, and save as Parquet
gdf_polygons = gpd.read_file(temp_geojson_polygons)
df_no_geom = gdf_polygons.drop(columns=["geometry"])
# Drop geometry, and save as Parquet
df_no_geom = gdf_wm.drop(columns=["geometry"])

# Check if the DataFrame has fewer than 25,000 rows
num_rows, num_cols = df_no_geom.shape
@@ -321,9 +326,14 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
return

# Save the DataFrame as Parquet
df_no_geom.to_parquet(temp_parquet)
file_manager.save_gdf(
df_no_geom, temp_parquet_file_name, LoadType.TEMP, FileType.PARQUET
)

# Upload Parquet to Google Cloud Storage
temp_parquet_file_path = file_manager.get_file_path(
temp_parquet_file_name, LoadType.TEMP, FileType.PARQUET
)
bucket = google_cloud_bucket(require_write_access=True)
if bucket is None:
print(
@@ -332,8 +342,8 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
return
blob_parquet = bucket.blob(f"{tiles_file_id_prefix}.parquet")
try:
blob_parquet.upload_from_filename(temp_parquet)
parquet_size = os.stat(temp_parquet).st_size
blob_parquet.upload_from_filename(temp_parquet_file_path)
parquet_size = os.stat(temp_parquet_file_path).st_size
parquet_size_mb = parquet_size / (1024 * 1024)
print(
f"Parquet upload successful! Size: {parquet_size} bytes ({parquet_size_mb:.2f} MB), Dimensions: {num_rows} rows, {num_cols} columns."
@@ -342,16 +352,37 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
print(f"Parquet upload failed: {e}")
return

temp_pmtiles_points_file_name: str = f"temp_{tiles_file_id_prefix}_points"
temp_pmtiles_polygons_file_name: str = f"temp_{tiles_file_id_prefix}_polygons"
temp_merged_pmtiles_file_name: str = f"temp_{tiles_file_id_prefix}_merged"

temp_pmtiles_points_file_path = file_manager.get_file_path(
temp_pmtiles_points_file_name, LoadType.TEMP, FileType.PMTILES
)
temp_pmtiles_polygons_file_path = file_manager.get_file_path(
temp_pmtiles_polygons_file_name, LoadType.TEMP, FileType.PMTILES
)
temp_merged_pmtiles_file_path = file_manager.get_file_path(
temp_merged_pmtiles_file_name, LoadType.TEMP, FileType.PMTILES
)

temp_geojson_points_file_path = file_manager.get_file_path(
temp_geojson_points_file_name, LoadType.TEMP, FileType.GEOJSON
)
temp_geojson_polygons_file_path = file_manager.get_file_path(
temp_geojson_points_file_name, LoadType.TEMP, FileType.GEOJSON
)

# Command for generating PMTiles for points up to zoom level zoom_threshold
points_command: list[str] = [
"tippecanoe",
f"--output={temp_pmtiles_points}",
f"--output={temp_pmtiles_points_file_path}",
f"--maximum-zoom={zoom_threshold}",
"--minimum-zoom=10",
"-zg",
"-aC",
"-r0",
temp_geojson_points,
temp_geojson_points_file_path,
"-l",
"vacant_properties_tiles_points",
"--force",
@@ -360,12 +391,12 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
# Command for generating PMTiles for polygons from zoom level zoom_threshold
polygons_command: list[str] = [
"tippecanoe",
f"--output={temp_pmtiles_polygons}",
f"--output={temp_pmtiles_polygons_file_path}",
f"--minimum-zoom={zoom_threshold}",
"--maximum-zoom=16",
"-zg",
"--no-tile-size-limit",
temp_geojson_polygons,
temp_geojson_polygons_file_path,
"-l",
"vacant_properties_tiles_polygons",
"--force",
@@ -374,10 +405,10 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
# Command for merging the two PMTiles files into a single output file
merge_command: list[str] = [
"tile-join",
f"--output={temp_merged_pmtiles}",
f"--output={temp_merged_pmtiles_file_path}",
"--no-tile-size-limit",
temp_pmtiles_polygons,
temp_pmtiles_points,
temp_pmtiles_polygons_file_path,
temp_pmtiles_points_file_path,
"--force",
]

@@ -391,17 +422,17 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
write_files.append(f"{tiles_file_id_prefix}.pmtiles")

# Check whether the temp saved tiles files is big enough.
file_size: int = os.stat(temp_merged_pmtiles).st_size
file_size: int = os.stat(temp_merged_pmtiles_file_path).st_size
if file_size < min_tiles_file_size_in_bytes:
raise ValueError(
f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
f"{temp_merged_pmtiles_file_path} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
)

# Upload PMTiles to Google Cloud Storage
for file in write_files:
blob = bucket.blob(file)
try:
blob.upload_from_filename(temp_merged_pmtiles)
blob.upload_from_filename(temp_merged_pmtiles_file_path)
print(f"PMTiles upload successful for {file}!")
except Exception as e:
print(f"PMTiles upload failed for {file}: {e}")
6 changes: 6 additions & 0 deletions data/src/config/config.py
@@ -10,6 +10,12 @@
USE_CRS = "EPSG:2272"
""" the standard geospatial code for Pennsylvania South (ftUS) """

ROOT_DIRECTORY = Path(__file__).resolve().parent.parent
""" the root directory of the project """

CACHE_FRACTION = 0.05
"""The fraction used to cache portions of the pipeline's transformed data in each step of the pipeline."""

log_level: int = logging.WARN
""" overall log level for the project """

33 changes: 5 additions & 28 deletions data/src/config/psql.py
@@ -4,33 +4,10 @@

from src.config.config import is_docker


def get_db_url():
# Detect if running in Cloud Run:
is_cloud_run = "K_SERVICE" in os.environ or "CLOUD_RUN_JOB" in os.environ

# Use host.docker.internal when running locally in Docker
# except when running in Cloud Run
host = "localhost"
if is_docker() and not is_cloud_run:
host = "host.docker.internal"

if os.getenv("VACANT_LOTS_DB"):
# Use the provided database URL
url = os.getenv("VACANT_LOTS_DB")
url = url.replace("localhost", host)
else:
# Use the specified port, pw, db and user to construct the URL
pw = os.environ["POSTGRES_PASSWORD"]
port = os.getenv("POSTGRES_PORT", "5432")
db = os.getenv("POSTGRES_DB", "vacantlotdb")
user = os.getenv("POSTGRES_USER", "postgres")
url: str = f"postgresql://{user}:{pw}@{host}:{port}/{db}"
print(f"Set database url to: postgresql://{user}:****@{host}:{port}/{db}")
return url


url = get_db_url()

url: str = (
os.environ["VACANT_LOTS_DB"].replace("localhost", "host.docker.internal")
if is_docker()
else os.environ["VACANT_LOTS_DB"]
)
local_engine = create_engine(url)
conn = local_engine.connect()
4 changes: 3 additions & 1 deletion data/src/data_utils/conservatorship.py
@@ -1,6 +1,7 @@
import datetime
from dateutil.parser import parse

import pytz
from dateutil.parser import parse

est = pytz.timezone("US/Eastern")
six_months_ago = (datetime.datetime.now() - datetime.timedelta(days=180)).astimezone(
@@ -56,4 +57,5 @@ def conservatorship(primary_featurelayer):
conservatorships.append(conservatorship)

primary_featurelayer.gdf["conservatorship"] = conservatorships

return primary_featurelayer