
Commit c2fa5f7

Merging in feature pipeline-integration branch (#1212)
Merges in several large changes, including removing the Postgres services in favor of filesystem caching with parquet files, refactoring the Slack reporting and data diffing, and other supporting changes.
1 parent b323c68 commit c2fa5f7

30 files changed: +786 -877 lines
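At its core, the change swaps Postgres round-trips for parquet files cached on the local filesystem (under the new storage/ directory ignored below). A minimal sketch of that pattern, with purely illustrative names; the commit's actual API is the FileManager used in the featurelayer.py diff further down:

# Sketch only: cache a pipeline step's output to a local parquet file rather
# than a Postgres table. CACHE_DIR and cache_step_output are illustrative
# names, not identifiers from this commit.
from pathlib import Path

import geopandas as gpd

CACHE_DIR = Path("storage/temp")  # "storage/" is the new .gitignore entry below


def cache_step_output(gdf: gpd.GeoDataFrame, step_name: str) -> Path:
    """Write a step's GeoDataFrame to parquet and return the file path."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    path = CACHE_DIR / f"{step_name}.parquet"
    gdf.to_parquet(path)
    return path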

.gitignore

Lines changed: 3 additions & 2 deletions
@@ -60,7 +60,8 @@ data/src/app/service-account-key.json
 # compiled python files
 *.pyc
 
-tmp/
-
 # Local python development files
 .python-version
+
+# Cached and temporary data files from pipeline
+storage/

data/Dockerfile

Lines changed: 5 additions & 2 deletions
@@ -45,8 +45,11 @@ RUN git clone https://github.com/felt/tippecanoe.git \
     && make install
 
 # Copy the src directory
-COPY src/ .
+COPY src ./src
 
 # Use Pipenv to run the script
 # Adjust the path to your main Python script if needed
-CMD ["pipenv", "run", "python", "./script.py"]
+RUN ls -a /usr/src/app
+
+CMD ["pipenv", "run", "python", "-m", "src.main.py"]
+

data/Dockerfile-pg

Lines changed: 0 additions & 26 deletions
This file was deleted.

data/Dockerfile-timescale

Lines changed: 0 additions & 26 deletions
This file was deleted.

data/docker-compose.yml

Lines changed: 1 addition & 42 deletions
@@ -15,7 +15,7 @@ services:
       - GOOGLE_CLOUD_PROJECT
       - CAGP_SLACK_API_TOKEN
     volumes:
-      - ./src:/usr/src/app
+      - ./src:/usr/src/app/src
       - ~/.config/gcloud/application_default_credentials.json:/app/service-account-key.json
      - /etc/timezone:/etc/timezone:ro
       - /etc/localtime:/etc/localtime:ro
@@ -56,44 +56,3 @@ services:
     extra_hosts:
       - host.docker.internal:host-gateway
     network_mode: 'host'
-
-  postgres:
-    container_name: cagp-postgres
-    build:
-      context: .
-      dockerfile: Dockerfile-pg
-    environment:
-      PGPORT: 5433
-      POSTGRES_PASSWORD:
-    restart: always
-    ports:
-      - '5433:5433'
-    volumes:
-      - database_volume:/var/lib/postgresql/data
-      - ./init_pg.sql:/docker-entrypoint-initdb.d/init_pg.sql
-      - /etc/timezone:/etc/timezone:ro
-      - /etc/localtime:/etc/localtime:ro
-    extra_hosts:
-      - host.docker.internal:host-gateway
-
-  postgres-timescale:
-    container_name: cagp-postgres-timescale
-    build:
-      context: .
-      dockerfile: Dockerfile-timescale
-    environment:
-      PGPORT: 5434
-      POSTGRES_PASSWORD:
-    restart: always
-    ports:
-      - '5434:5434'
-    volumes:
-      - timescale_database_volume:/var/lib/postgresql/data
-      - ./init_pg_timescale.sql:/docker-entrypoint-initdb.d/init_pg.sql
-      - /etc/timezone:/etc/timezone:ro
-      - /etc/localtime:/etc/localtime:ro
-    extra_hosts:
-      - host.docker.internal:host-gateway
-
-volumes:
-  database_volume:
-  timescale_database_volume:

data/init_pg.sql

Lines changed: 0 additions & 3 deletions
This file was deleted.

data/init_pg_timescale.sql

Lines changed: 0 additions & 5 deletions
This file was deleted.

data/src/classes/featurelayer.py

Lines changed: 59 additions & 28 deletions
@@ -20,9 +20,12 @@
     write_production_tiles_file,
 )
 from src.config.psql import conn, local_engine
+from src.new_etl.classes.file_manager import FileManager, FileType, LoadType
 
 log.basicConfig(level=log_level)
 
+file_manager = FileManager()
+
 
 def google_cloud_bucket(require_write_access: bool = False) -> storage.Bucket | None:
     """
@@ -290,27 +293,29 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
         """
         zoom_threshold: int = 13
 
-        # Export the GeoDataFrame to a temporary GeoJSON file
-        temp_geojson_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.geojson"
-        temp_geojson_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.geojson"
-        temp_pmtiles_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.pmtiles"
-        temp_pmtiles_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.pmtiles"
-        temp_merged_pmtiles: str = f"tmp/temp_{tiles_file_id_prefix}_merged.pmtiles"
-        temp_parquet: str = f"tmp/{tiles_file_id_prefix}.parquet"
+        # Export the GeoDataFrames to a temporary GeoJSON files
+        temp_geojson_points_file_name: str = f"temp_{tiles_file_id_prefix}_points"
+        temp_geojson_polygons_file_name: str = f"temp_{tiles_file_id_prefix}_polygons"
+        temp_parquet_file_name: str = f"{tiles_file_id_prefix}"
 
         # Reproject
         gdf_wm = self.gdf.to_crs(epsg=4326)
-        gdf_wm.to_file(temp_geojson_polygons, driver="GeoJSON")
+        file_manager.save_gdf(
+            gdf_wm, temp_geojson_polygons_file_name, LoadType.TEMP, FileType.GEOJSON
+        )
+        gdf_wm.to_file(temp_geojson_polygons_file_name, driver="GeoJSON")
 
         # Create points dataset
-        self.centroid_gdf = self.gdf.copy()
-        self.centroid_gdf["geometry"] = self.centroid_gdf["geometry"].centroid
-        self.centroid_gdf = self.centroid_gdf.to_crs(epsg=4326)
-        self.centroid_gdf.to_file(temp_geojson_points, driver="GeoJSON")
+        centroid_gdf = self.gdf.copy()
+        centroid_gdf["geometry"] = centroid_gdf["geometry"].centroid
+        centroid_gdf = centroid_gdf.to_crs(epsg=4326)
+        centroid_gdf.to_file(temp_geojson_points_file_name, driver="GeoJSON")
+        file_manager.save_gdf(
+            centroid_gdf, temp_geojson_points_file_name, LoadType.TEMP, FileType.GEOJSON
+        )
 
-        # Load the GeoJSON from the polygons, drop geometry, and save as Parquet
-        gdf_polygons = gpd.read_file(temp_geojson_polygons)
-        df_no_geom = gdf_polygons.drop(columns=["geometry"])
+        # Drop geometry, and save as Parquet
+        df_no_geom = gdf_wm.drop(columns=["geometry"])
 
         # Check if the DataFrame has fewer than 25,000 rows
         num_rows, num_cols = df_no_geom.shape
@@ -321,9 +326,14 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
             return
 
         # Save the DataFrame as Parquet
-        df_no_geom.to_parquet(temp_parquet)
+        file_manager.save_gdf(
+            df_no_geom, temp_parquet_file_name, LoadType.TEMP, FileType.PARQUET
+        )
 
         # Upload Parquet to Google Cloud Storage
+        temp_parquet_file_path = file_manager.get_file_path(
+            temp_parquet_file_name, LoadType.TEMP, FileType.PARQUET
+        )
         bucket = google_cloud_bucket(require_write_access=True)
         if bucket is None:
             print(
@@ -332,8 +342,8 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
             return
         blob_parquet = bucket.blob(f"{tiles_file_id_prefix}.parquet")
         try:
-            blob_parquet.upload_from_filename(temp_parquet)
-            parquet_size = os.stat(temp_parquet).st_size
+            blob_parquet.upload_from_filename(temp_parquet_file_path)
+            parquet_size = os.stat(temp_parquet_file_path).st_size
             parquet_size_mb = parquet_size / (1024 * 1024)
             print(
                 f"Parquet upload successful! Size: {parquet_size} bytes ({parquet_size_mb:.2f} MB), Dimensions: {num_rows} rows, {num_cols} columns."
@@ -342,16 +352,37 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
            print(f"Parquet upload failed: {e}")
            return
 
+        temp_pmtiles_points_file_name: str = f"temp_{tiles_file_id_prefix}_points"
+        temp_pmtiles_polygons_file_name: str = f"temp_{tiles_file_id_prefix}_polygons"
+        temp_merged_pmtiles_file_name: str = f"temp_{tiles_file_id_prefix}_merged"
+
+        temp_pmtiles_points_file_path = file_manager.get_file_path(
+            temp_pmtiles_points_file_name, LoadType.TEMP, FileType.PMTILES
+        )
+        temp_pmtiles_polygons_file_path = file_manager.get_file_path(
+            temp_pmtiles_polygons_file_name, LoadType.TEMP, FileType.PMTILES
+        )
+        temp_merged_pmtiles_file_path = file_manager.get_file_path(
+            temp_merged_pmtiles_file_name, LoadType.TEMP, FileType.PMTILES
+        )
+
+        temp_geojson_points_file_path = file_manager.get_file_path(
+            temp_geojson_points_file_name, LoadType.TEMP, FileType.GEOJSON
+        )
+        temp_geojson_polygons_file_path = file_manager.get_file_path(
+            temp_geojson_points_file_name, LoadType.TEMP, FileType.GEOJSON
+        )
+
         # Command for generating PMTiles for points up to zoom level zoom_threshold
         points_command: list[str] = [
             "tippecanoe",
-            f"--output={temp_pmtiles_points}",
+            f"--output={temp_pmtiles_points_file_path}",
             f"--maximum-zoom={zoom_threshold}",
             "--minimum-zoom=10",
             "-zg",
             "-aC",
             "-r0",
-            temp_geojson_points,
+            temp_geojson_points_file_path,
             "-l",
             "vacant_properties_tiles_points",
             "--force",
@@ -360,12 +391,12 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
         # Command for generating PMTiles for polygons from zoom level zoom_threshold
         polygons_command: list[str] = [
             "tippecanoe",
-            f"--output={temp_pmtiles_polygons}",
+            f"--output={temp_pmtiles_polygons_file_path}",
             f"--minimum-zoom={zoom_threshold}",
             "--maximum-zoom=16",
             "-zg",
             "--no-tile-size-limit",
-            temp_geojson_polygons,
+            temp_geojson_polygons_file_path,
             "-l",
             "vacant_properties_tiles_polygons",
             "--force",
@@ -374,10 +405,10 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
         # Command for merging the two PMTiles files into a single output file
         merge_command: list[str] = [
             "tile-join",
-            f"--output={temp_merged_pmtiles}",
+            f"--output={temp_merged_pmtiles_file_path}",
             "--no-tile-size-limit",
-            temp_pmtiles_polygons,
-            temp_pmtiles_points,
+            temp_pmtiles_polygons_file_path,
+            temp_pmtiles_points_file_path,
             "--force",
         ]
 
@@ -391,17 +422,17 @@ def build_and_publish(self, tiles_file_id_prefix: str) -> None:
         write_files.append(f"{tiles_file_id_prefix}.pmtiles")
 
         # Check whether the temp saved tiles files is big enough.
-        file_size: int = os.stat(temp_merged_pmtiles).st_size
+        file_size: int = os.stat(temp_merged_pmtiles_file_path).st_size
         if file_size < min_tiles_file_size_in_bytes:
             raise ValueError(
-                f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
+                f"{temp_merged_pmtiles_file_path} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
             )
 
         # Upload PMTiles to Google Cloud Storage
         for file in write_files:
             blob = bucket.blob(file)
             try:
-                blob.upload_from_filename(temp_merged_pmtiles)
+                blob.upload_from_filename(temp_merged_pmtiles_file_path)
                 print(f"PMTiles upload successful for {file}!")
             except Exception as e:
                 print(f"PMTiles upload failed for {file}: {e}")

data/src/config/config.py

Lines changed: 6 additions & 0 deletions
@@ -10,6 +10,12 @@
 USE_CRS = "EPSG:2272"
 """ the standard geospatial code for Pennsylvania South (ftUS) """
 
+ROOT_DIRECTORY = Path(__file__).resolve().parent.parent
+""" the root directory of the project """
+
+CACHE_FRACTION = 0.05
+"""The fraction used to cache portions of the pipeline's transformed data in each step of the pipeline."""
+
 log_level: int = logging.WARN
 """ overall log level for the project """
 
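CACHE_FRACTION is documented as the fraction of each step's transformed data to cache. One way such a constant might be applied is a deterministic sample taken before writing a step's snapshot; the sample_for_cache helper below is illustrative and not part of this commit:

# Illustrative only: take a reproducible CACHE_FRACTION-sized sample of a
# step's output before caching it. sample_for_cache is not part of the commit.
import pandas as pd

from src.config.config import CACHE_FRACTION


def sample_for_cache(df: pd.DataFrame, seed: int = 42) -> pd.DataFrame:
    """Return a deterministic sample containing CACHE_FRACTION of the rows."""
    return df.sample(frac=CACHE_FRACTION, random_state=seed)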
data/src/config/psql.py

Lines changed: 5 additions & 28 deletions
@@ -4,33 +4,10 @@
 
 from src.config.config import is_docker
 
-
-def get_db_url():
-    # Detect if running in Cloud Run:
-    is_cloud_run = "K_SERVICE" in os.environ or "CLOUD_RUN_JOB" in os.environ
-
-    # Use host.docker.internal when running locally in Docker
-    # except when running in Cloud Run
-    host = "localhost"
-    if is_docker() and not is_cloud_run:
-        host = "host.docker.internal"
-
-    if os.getenv("VACANT_LOTS_DB"):
-        # Use the provided database URL
-        url = os.getenv("VACANT_LOTS_DB")
-        url = url.replace("localhost", host)
-    else:
-        # Use the specified port, pw, db and user to construct the URL
-        pw = os.environ["POSTGRES_PASSWORD"]
-        port = os.getenv("POSTGRES_PORT", "5432")
-        db = os.getenv("POSTGRES_DB", "vacantlotdb")
-        user = os.getenv("POSTGRES_USER", "postgres")
-        url: str = f"postgresql://{user}:{pw}@{host}:{port}/{db}"
-        print(f"Set database url to: postgresql://{user}:****@{host}:{port}/{db}")
-    return url
-
-
-url = get_db_url()
-
+url: str = (
+    os.environ["VACANT_LOTS_DB"].replace("localhost", "host.docker.internal")
+    if is_docker()
+    else os.environ["VACANT_LOTS_DB"]
+)
 local_engine = create_engine(url)
 conn = local_engine.connect()