
Commit e893f86

add common verifier functions

1 parent f5f51a9 · commit e893f86
File tree

2 files changed: +159 -88 lines changed

Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
import logging
import os
import socket
import subprocess
from typing import Dict
import uuid
from io import BytesIO

import requests

from shared.database.database import with_db_session
from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gbfsfeed
from shared.helpers.runtime_metrics import track_metrics

from google.cloud import storage
from sqlalchemy.orm import Session


EMULATOR_STORAGE_BUCKET_NAME = "verifier"
EMULATOR_HOST = "localhost"
EMULATOR_STORAGE_PORT = 9023


@track_metrics(metrics=("time", "memory", "cpu"))
def download_to_local(
    feed_stable_id: str, url: str, filename: str, force_download: bool = False
):
    """
    Download a file from a URL and upload it to the Google Cloud Storage emulator.
    If the file already exists, it will not be downloaded again.
    Args:
        feed_stable_id (str): The stable ID of the feed, used as the blob path prefix.
        url (str): The URL to download the file from.
        filename (str): The name of the file to save in the emulator.
        force_download (bool): If True, re-download the file even if the blob already exists.
    """
    if not url:
        return
    blob_path = f"{feed_stable_id}/{filename}"
    client = storage.Client()
    bucket = client.bucket(EMULATOR_STORAGE_BUCKET_NAME)
    blob = bucket.blob(blob_path)

    # Check if the blob already exists in the emulator
    if not blob.exists() or force_download:
        logging.info(f"Downloading and uploading: {blob_path}")
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            blob.content_type = "application/json"
            # The file is downloaded into memory before uploading to ensure it's seekable.
            # Be careful with large files.
            data = BytesIO(response.content)
            blob.upload_from_file(data, rewind=True)
    else:
        logging.info(f"Blob already exists: gs://{EMULATOR_STORAGE_BUCKET_NAME}/{blob_path}")


@with_db_session
def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session = None):
    """
    Create test data in the database if it does not exist.
    This function is used to ensure that the reverse geolocation process has the necessary data to work with.
    """
    # Log the action, then create a minimal feed record if one does not already exist
    logging.info(f"Creating test data for {feed_stable_id} with data: {feed_dict}")
    model = Gtfsfeed if feed_dict["data_type"] == "gtfs" else Gbfsfeed
    local_feed = (
        db_session.query(model).filter(model.stable_id == feed_stable_id).one_or_none()
    )
    if not local_feed:
        local_feed = model(
            id=uuid.uuid4(),
            stable_id=feed_stable_id,
            data_type=feed_dict["data_type"],
            feed_name="Test Feed",
            note="This is a test feed created for reverse geolocation verification.",
            producer_url="https://files.mobilitydatabase.org/mdb-2014/mdb-2014-202508120303/mdb-2014-202508120303.zip",
            authentication_type="0",
            status="active",
        )
        db_session.add(local_feed)
        db_session.commit()


def setup_local_storage_emulator():
    """
    Set up the Google Cloud Storage emulator by creating the necessary bucket.
    """
    from gcp_storage_emulator.server import create_server

    os.environ["STORAGE_EMULATOR_HOST"] = f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}"
    os.environ["DATASETS_BUCKET_NAME_GBFS"] = EMULATOR_STORAGE_BUCKET_NAME
    os.environ["DATASETS_BUCKET_NAME_GTFS"] = EMULATOR_STORAGE_BUCKET_NAME
    os.environ["DATASTORE_EMULATOR_HOST"] = "localhost:8081"
    server = create_server(
        host=EMULATOR_HOST, port=EMULATOR_STORAGE_PORT, in_memory=False, default_bucket=EMULATOR_STORAGE_BUCKET_NAME
    )
    server.start()
    return server


def shutdown_local_storage_emulator(server):
    """Shut down the Google Cloud Storage emulator."""
    server.stop()


def is_datastore_emulator_running(host=EMULATOR_HOST, port=8081):
    """Check if the Google Cloud Datastore emulator is running."""
    try:
        with socket.create_connection((host, port), timeout=2):
            return True
    except OSError:
        return False


def start_datastore_emulator(project_id="test-project"):
    """Start the Google Cloud Datastore emulator if it's not already running."""
    if not is_datastore_emulator_running():
        process = subprocess.Popen([
            "gcloud", "beta", "emulators", "datastore", "start",
            "--project={}".format(project_id),
            "--host-port=localhost:8081"
        ])
        return process
    return None  # Already running


def shutdown_datastore_emulator(process):
    """Shut down the Google Cloud Datastore emulator."""
    if process:
        process.terminate()
        process.wait()
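For orientation, a minimal sketch of how these shared helpers are meant to be composed, mirroring the verifier script below. It is illustrative only and not part of the commit: the feed id "mdb-2014", the example.com source URL, and the minimal feed_dict are placeholders, and it assumes the local test database and the gcloud SDK are available.

import logging

from shared.helpers.verifier_common import (
    create_test_data,
    download_to_local,
    setup_local_storage_emulator,
    shutdown_local_storage_emulator,
    start_datastore_emulator,
    shutdown_datastore_emulator,
)

feed_stable_id = "mdb-2014"        # placeholder feed id
feed_dict = {"data_type": "gtfs"}  # minimal placeholder; real entries carry more keys

server = setup_local_storage_emulator()         # GCS emulator plus the emulator env vars
datastore_process = start_datastore_emulator()  # Datastore emulator, if not already running
try:
    create_test_data(feed_stable_id, feed_dict)  # ensure a matching feed row exists locally
    download_to_local(
        feed_stable_id,
        "https://example.com/stops.txt",  # placeholder source URL
        "stops.txt",
    )
    # ... invoke the process under verification against the emulator-hosted files ...
except Exception as e:
    logging.error(f"Verification failed: {e}")
finally:
    shutdown_local_storage_emulator(server)
    shutdown_datastore_emulator(datastore_process)

The try/finally matters here: both setup helpers start long-lived local services (an in-process storage server and a gcloud subprocess), so they need explicit shutdown even when verification fails.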

functions-python/reverse_geolocation/src/scripts/reverse_geolocation_process_verifier.py

Lines changed: 28 additions & 88 deletions
@@ -5,27 +5,26 @@
 
 import json
 import logging
-import os
-import uuid
-from io import BytesIO
 from typing import Dict
 
 import folium
-import requests
 from dotenv import load_dotenv
-from google.cloud import storage
-from sqlalchemy.orm import Session
 
 from reverse_geolocation_processor import reverse_geolocation_process
-from shared.database.database import with_db_session
-from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gbfsfeed
 from shared.helpers.locations import ReverseGeocodingStrategy
 from shared.helpers.logger import init_logger
-from shared.helpers.runtime_metrics import track_metrics
 
-HOST = "localhost"
-PORT = 9023
-BUCKET_NAME = "verifier"
+from shared.helpers.verifier_common import (
+    download_to_local,
+    EMULATOR_STORAGE_BUCKET_NAME,
+    create_test_data,
+    EMULATOR_HOST,
+    EMULATOR_STORAGE_PORT,
+    setup_local_storage_emulator,
+    shutdown_local_storage_emulator,
+    start_datastore_emulator,
+    shutdown_datastore_emulator,
+)
 
 feeds = [
     {
@@ -80,7 +79,7 @@
     },
 ]
 run_with_feed_index = (
-    1  # Set to an integer index to run with a specific feed from the list above
+    3  # Set to an integer index to run with a specific feed from the list above
 )
 
 
@@ -90,38 +89,6 @@
 init_logger()
 
 
-@track_metrics(metrics=("time", "memory", "cpu"))
-def download_to_local(
-    feed_stable_id: str, url: str, filename: str, force_download: bool = False
-):
-    """
-    Download a file from a URL and upload it to the Google Cloud Storage emulator.
-    If the file already exists, it will not be downloaded again.
-    Args:
-        url (str): The URL to download the file from.
-        filename (str): The name of the file to save in the emulator.
-    """
-    if not url:
-        return
-    blob_path = f"{feed_stable_id}/{filename}"
-    client = storage.Client()
-    bucket = client.bucket(BUCKET_NAME)
-    blob = bucket.blob(blob_path)
-
-    # Check if the blob already exists in the emulator
-    if not blob.exists() or force_download:
-        logging.info(f"Downloading and uploading: {blob_path}")
-        with requests.get(url, stream=True) as response:
-            response.raise_for_status()
-            blob.content_type = "application/json"
-            # The file is downloaded into memory before uploading to ensure it's seekable.
-            # Be careful with large files.
-            data = BytesIO(response.content)
-            blob.upload_from_file(data, rewind=True)
-    else:
-        logging.info(f"Blob already exists: gs://{BUCKET_NAME}/{blob_path}")
-
-
 def verify_reverse_geolocation_process(
     feed_stable_id: str,
     feed_dict: Dict,
@@ -183,7 +150,10 @@ def verify_reverse_geolocation_process(
     reverse_geolocation_process(request)
 
     # Visualize the resulting geojson file
-    url = f"http://{HOST}:{PORT}/{BUCKET_NAME}/{feed_stable_id}/geolocation.geojson"
+    url = (
+        f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}/{EMULATOR_STORAGE_BUCKET_NAME}"
+        f"/{feed_stable_id}/geolocation.geojson"
+    )
     gdf = gpd.read_file(url)
 
     # Calculate centroid for map center
@@ -203,37 +173,8 @@ def verify_reverse_geolocation_process(
     )
 
 
-@with_db_session
-def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session = None):
-    """
-    Create test data in the database if it does not exist.
-    This function is used to ensure that the reverse geolocation process has the necessary data to work with.
-    """
-    # Here you would typically interact with your database to create the necessary test data
-    # For this example, we will just log the action
-    logging.info(f"Creating test data for {feed_stable_id} with data: {feed_dict}")
-    model = Gtfsfeed if feed_dict["data_type"] == "gtfs" else Gbfsfeed
-    local_feed = (
-        db_session.query(model).filter(model.stable_id == feed_stable_id).one_or_none()
-    )
-    if not local_feed:
-        local_feed = model(
-            id=uuid.uuid4(),
-            stable_id=feed_stable_id,
-            data_type=feed_dict["data_type"],
-            feed_name="Test Feed",
-            note="This is a test feed created for reverse geolocation verification.",
-            producer_url="https://files.mobilitydatabase.org/mdb-2014/mdb-2014-202508120303/mdb-2014-202508120303.zip",
-            authentication_type="0",
-            status="active",
-        )
-        db_session.add(local_feed)
-        db_session.commit()
-
-
 if __name__ == "__main__":
     import geopandas as gpd
-    from gcp_storage_emulator.server import create_server
     from flask import Flask, Request
 
     strategy = ReverseGeocodingStrategy.PER_POINT
@@ -245,16 +186,20 @@ def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session =
     data = {
         "stable_id": feed_stable_id,
         "dataset_id": feed_dict["dataset_id"] if "dataset_id" in feed_dict else None,
-        "station_information_url": f"http://{HOST}:{PORT}/{BUCKET_NAME}/{feed_stable_id}/station_information.json"
+        "station_information_url": f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}/{EMULATOR_STORAGE_BUCKET_NAME}"
+        f"/{feed_stable_id}/station_information.json"
         if "station_information_url" in feed_dict
         else None,
-        "vehicle_status_url": f"http://{HOST}:{PORT}/{BUCKET_NAME}/{feed_stable_id}/vehicle_status.json"
+        "vehicle_status_url": f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}/{EMULATOR_STORAGE_BUCKET_NAME}"
+        f"/{feed_stable_id}/vehicle_status.json"
        if "vehicle_status_url" in feed_dict
        else None,
-        "free_bike_status_url": f"http://{HOST}:{PORT}/{BUCKET_NAME}/{feed_stable_id}/free_bike_status.json"
+        "free_bike_status_url": f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}/{EMULATOR_STORAGE_BUCKET_NAME}"
+        f"/{feed_stable_id}/free_bike_status.json"
         if "free_bike_status_url" in feed_dict
         else None,
-        "stops_url": f"http://{HOST}:{PORT}/{BUCKET_NAME}/{feed_stable_id}/stops.txt",
+        "stops_url": f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}/{EMULATOR_STORAGE_BUCKET_NAME}"
+        f"/{feed_stable_id}/stops.txt",
         "strategy": str(strategy.value),
         "data_type": feed_dict["data_type"],
         # "use_cache": False,
@@ -263,14 +208,8 @@ def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session =
     }
 
     try:
-        os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"
-        os.environ["DATASETS_BUCKET_NAME_GBFS"] = BUCKET_NAME
-        os.environ["DATASETS_BUCKET_NAME_GTFS"] = BUCKET_NAME
-        os.environ["DATASTORE_EMULATOR_HOST"] = "localhost:8081"
-        server = create_server(
-            host=HOST, port=PORT, in_memory=False, default_bucket=BUCKET_NAME
-        )
-        server.start()
+        server = setup_local_storage_emulator()
+        datastore_process = start_datastore_emulator()
         verify_reverse_geolocation_process(
             feed_stable_id=feed_stable_id,
             feed_dict=feed_dict,
@@ -280,4 +219,5 @@ def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session =
     except Exception as e:
         logging.error(f"Error verifying download content: {e}")
     finally:
-        server.stop()
+        shutdown_local_storage_emulator(server)
+        shutdown_datastore_emulator(datastore_process)
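The visualization hunk above only shows the start of the read-and-plot step. As a rough sketch (not the script's actual code) of reading the emulator-hosted GeoJSON and rendering it with folium; the feed id "mdb-2014", the zoom level, and the output filename are assumptions:

import folium
import geopandas as gpd

# Read the GeoJSON the processor wrote to the emulator-backed bucket (served over plain HTTP).
url = "http://localhost:9023/verifier/mdb-2014/geolocation.geojson"
gdf = gpd.read_file(url)

# A centroid computed in geographic coordinates is approximate, but fine for centering a preview map.
centroid = gdf.unary_union.centroid
m = folium.Map(location=[centroid.y, centroid.x], zoom_start=9)
folium.GeoJson(gdf).add_to(m)
m.save("geolocation_preview.html")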
