Commit 7c32e05

feat: added location extraction to gbfs process
1 parent 90511e3 commit 7c32e05

11 files changed: +270 -87 lines changed


functions-python/gbfs_validator/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ google-cloud-pubsub
 google-api-core
 google-cloud-firestore
 google-cloud-datastore
+google-cloud-tasks
 cloudevents~=1.10.1
 
 # Configuration

functions-python/gbfs_validator/src/gbfs_data_processor.py

Lines changed: 55 additions & 0 deletions
@@ -5,6 +5,7 @@
 from http import HTTPMethod
 from typing import Dict, Any, Optional, List, Tuple
 from jsonpath_ng import parse
+from google.cloud import tasks_v2
 import requests
 from requests.exceptions import RequestException
 from google.cloud import storage
@@ -16,6 +17,7 @@
     BUCKET_NAME,
     VALIDATOR_URL,
 )
+import os
 from shared.database_gen.sqlacodegen_models import (
     Gbfsnotice,
     Gbfsversion,
@@ -26,6 +28,7 @@
 )
 from sqlalchemy.orm import Session
 from shared.database.database import with_db_session
+from shared.helpers.utils import create_http_task
 
 
 class GBFSDataProcessor:
@@ -53,6 +56,8 @@ def process_gbfs_data(self, autodiscovery_url: str) -> None:
         # Update database entities
         self.update_database_entities()
 
+        self.trigger_location_extraction()
+
     @with_db_session()
     def record_autodiscovery_request(
         self, autodiscovery_url: str, db_session: Session
@@ -376,3 +381,53 @@ def extract_endpoints_for_all_versions(self):
                 self.gbfs_endpoints[version.version] = endpoints
             else:
                 logging.error(f"No endpoints found for version {version.version}.")
+
+    def trigger_location_extraction(self):
+        """Trigger the location extraction process."""
+        latest_version = self.get_latest_version()
+        if not latest_version:
+            logging.error("No latest version found.")
+            return
+        endpoints = self.gbfs_endpoints.get(latest_version, [])
+        # Find the station_information endpoint
+        station_information_url = next(
+            (
+                endpoint.url
+                for endpoint in endpoints
+                if endpoint.name == "station_information"
+            ),
+            None,
+        )
+        # If station_information_url is not found, fall back to vehicle_status_url
+        vehicle_status_url = next(
+            (
+                endpoint.url
+                for endpoint in endpoints
+                if endpoint.name == "vehicle_status"
+            ),
+            None,
+        )
+        if not station_information_url and not vehicle_status_url:
+            logging.error("No station_information_url or vehicle_status_url found.")
+            return
+        client = tasks_v2.CloudTasksClient()
+        body = json.dumps(
+            {
+                "stable_id": self.stable_id,
+                "data_type": "gbfs",
+                "station_information_url": station_information_url,
+                "vehicle_status_url": vehicle_status_url,
+            }
+        ).encode()
+        project_id = os.getenv("PROJECT_ID")
+        gcp_region = os.getenv("GCP_REGION")
+        queue_name = os.getenv("QUEUE_NAME")
+        create_http_task(
+            client,
+            body,
+            f"https://{gcp_region}-{project_id}.cloudfunctions.net/reverse-geolocation-processor",
+            project_id,
+            gcp_region,
+            queue_name,
+        )
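
The task enqueued above carries a JSON body for the reverse-geolocation processor. A minimal sketch of that payload (field names are from the diff; the stable ID and URLs are illustrative, and either URL may be None when the feed lacks that endpoint):

import json

# Illustrative payload as assembled by trigger_location_extraction();
# the URLs below are hypothetical examples, not real feeds.
body = json.dumps(
    {
        "stable_id": "gbfs-example",
        "data_type": "gbfs",
        "station_information_url": "https://example.com/gbfs/en/station_information.json",
        "vehicle_status_url": "https://example.com/gbfs/en/vehicle_status.json",
    }
).encode()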

functions-python/helpers/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ google-cloud-datastore
 cloudevents~=1.10.1
 google-cloud-bigquery
 google-api-core
+google-cloud-tasks
 google-cloud-firestore
 google-cloud-bigquery
functions-python/helpers/utils.py

Lines changed: 26 additions & 1 deletion
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+from google.cloud import tasks_v2
 import hashlib
 import logging
 import os
@@ -131,3 +131,28 @@ def download_and_get_hash(
         if os.path.exists(file_path):
             os.remove(file_path)
         raise e
+
+
+def create_http_task(
+    client: tasks_v2.CloudTasksClient,
+    body: bytes,
+    url: str,
+    project_id: str,
+    gcp_region: str,
+    queue_name: str,
+) -> None:
+    """Creates a GCP Cloud Task."""
+    task = tasks_v2.Task(
+        http_request=tasks_v2.HttpRequest(
+            url=url,
+            http_method=tasks_v2.HttpMethod.POST,
+            oidc_token=tasks_v2.OidcToken(
+                service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
+            ),
+            body=body,
+            headers={"Content-Type": "application/json"},
+        )
+    )
+    client.create_task(
+        parent=client.queue_path(project_id, gcp_region, queue_name), task=task
+    )
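
A short usage sketch of the relocated helper (assumes PROJECT_ID, GCP_REGION, QUEUE_NAME, and SERVICE_ACCOUNT_EMAIL are set in the environment; the target URL and stable ID are illustrative):

import json
import os

from google.cloud import tasks_v2

from shared.helpers.utils import create_http_task

client = tasks_v2.CloudTasksClient()
body = json.dumps({"stable_id": "example-feed"}).encode()

# Enqueues an OIDC-authenticated POST to the target Cloud Function.
create_http_task(
    client,
    body,
    "https://us-central1-my-project.cloudfunctions.net/reverse-geolocation-processor",
    os.getenv("PROJECT_ID"),
    os.getenv("GCP_REGION"),
    os.getenv("QUEUE_NAME"),
)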

functions-python/reverse_geolocation/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ pycountry
 shapely
 gtfs-kit
 matplotlib
+jsonpath_ng
 
 # Configuration
 python-dotenv==1.0.0

functions-python/reverse_geolocation/src/location_group_utils.py

Lines changed: 0 additions & 25 deletions
@@ -1,18 +1,11 @@
-import os
 from typing import List, Optional
 
 import matplotlib.pyplot as plt
 import pycountry
 from geoalchemy2.shape import to_shape
-from google.cloud import tasks_v2
 
 from shared.database_gen.sqlacodegen_models import Geopolygon, Osmlocationgroup
 
-queue_name = os.getenv("QUEUE_NAME")
-project_id = os.getenv("PROJECT_ID")
-gcp_region = os.getenv("GCP_REGION")
-
-
 ERROR_STATUS_CODE = 299  # Custom error code for the function to avoid retries
 
 
@@ -29,24 +22,6 @@ def generate_color(
     return f"rgba({int(rgba[0] * 255)}, {int(rgba[1] * 255)}, {int(rgba[2] * 255)}, {rgba[3]})"
 
 
-def create_http_task(client: tasks_v2.CloudTasksClient, body: bytes, url: str) -> None:
-    """Creates a GCP Cloud Task."""
-    task = tasks_v2.Task(
-        http_request=tasks_v2.HttpRequest(
-            url=url,
-            http_method=tasks_v2.HttpMethod.POST,
-            oidc_token=tasks_v2.OidcToken(
-                service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
-            ),
-            body=body,
-            headers={"Content-Type": "application/json"},
-        )
-    )
-    client.create_task(
-        parent=client.queue_path(project_id, gcp_region, queue_name), task=task
-    )
-
-
 class GeopolygonAggregate:
     """
     A class to represent an aggregate of geopolygon object to represent a location

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import io
2+
import logging
3+
from typing import Tuple, Optional
4+
5+
import flask
6+
import pandas as pd
7+
import requests
8+
from jsonpath_ng import parse
9+
10+
11+
def parse_request_parameters(
12+
request: flask.Request,
13+
) -> Tuple[pd.DataFrame, str, Optional[str], str, str]:
14+
"""
15+
Parse the request parameters and return a DataFrame with the stops data.
16+
@:returns Tuple: A tuple containing the stops DataFrame, stable ID, and dataset ID.
17+
"""
18+
logging.info("Parsing request parameters.")
19+
request_json = request.get_json(silent=True)
20+
logging.info(f"Request JSON: {request_json}")
21+
22+
if (
23+
not request_json
24+
or (
25+
("stops_url" not in request_json or "dataset_id" not in request_json) and
26+
"station_information_url" not in request_json and
27+
"vehicle_status_url" not in request_json
28+
)
29+
or "stable_id" not in request_json
30+
):
31+
raise ValueError(
32+
"Missing required parameters: [stops_url, dataset_id | station_information_url | vehicle_status_url], "
33+
"stable_id."
34+
)
35+
36+
data_type = request_json.get("data_type", "gtfs")
37+
logging.info(f"Data type: {data_type}")
38+
if data_type == "gtfs":
39+
df, stable_id, dataset_id, url = parse_request_parameters_gtfs(request_json)
40+
elif data_type == "gbfs":
41+
df, stable_id, dataset_id, url = parse_request_parameters_gbfs(request_json)
42+
else:
43+
raise ValueError(
44+
f"Invalid data_type '{data_type}'. Supported types are 'gtfs' and 'gbfs'."
45+
)
46+
return df, stable_id, dataset_id, data_type, url
47+
48+
49+
def parse_request_parameters_gtfs(request_json: dict) -> Tuple[pd.DataFrame, str, Optional[str], str]:
50+
""" Parse the request parameters for GTFS data. """
51+
if (
52+
not request_json
53+
or "stops_url" not in request_json
54+
or "stable_id" not in request_json
55+
or "dataset_id" not in request_json
56+
):
57+
raise ValueError(
58+
"Invalid request: missing 'stops_url', 'dataset_id' or 'stable_id' parameter."
59+
)
60+
61+
stable_id = request_json["stable_id"]
62+
dataset_id = request_json["dataset_id"]
63+
64+
# Read the stops from the URL
65+
try:
66+
s = requests.get(request_json["stops_url"]).content
67+
stops_df = pd.read_csv(io.StringIO(s.decode("utf-8")))
68+
except Exception as e:
69+
raise ValueError(
70+
f"Error reading stops from URL {request_json['stops_url']}: {e}"
71+
)
72+
return stops_df, stable_id, dataset_id, request_json["stops_url"]
73+
74+
75+
def parse_station_information_url(station_information_url) -> pd.DataFrame:
76+
""" Parse the station information URL and return a DataFrame with the stops' data. """
77+
response = requests.get(station_information_url)
78+
response.raise_for_status()
79+
data = response.json()
80+
81+
lat_expr = parse('data.stations[*].lat')
82+
lon_expr = parse('data.stations[*].lon')
83+
station_id_expr = parse('data.stations[*].station_id')
84+
85+
lats = [match.value for match in lat_expr.find(data)]
86+
lons = [match.value for match in lon_expr.find(data)]
87+
station_ids = [match.value for match in station_id_expr.find(data)]
88+
89+
stations_info = [
90+
{"station_id": sid, "stop_lat": lat, "stop_lon": lon}
91+
for sid, lat, lon in zip(station_ids, lats, lons)
92+
]
93+
return pd.DataFrame(stations_info)
94+
95+
96+
def parse_vehicle_status_url(vehicle_status_url) -> pd.DataFrame:
97+
""" Parse the vehicle status URL and return a DataFrame with vehicle_id, lat, and lon. """
98+
response = requests.get(vehicle_status_url)
99+
response.raise_for_status()
100+
data = response.json()
101+
102+
lat_expr = parse('data.vehicles[*].lat')
103+
lon_expr = parse('data.vehicles[*].lon')
104+
vehicle_id_expr = parse('data.vehicles[*].vehicle_id')
105+
106+
lats = [match.value for match in lat_expr.find(data)]
107+
lons = [match.value for match in lon_expr.find(data)]
108+
vehicle_ids = [match.value for match in vehicle_id_expr.find(data)]
109+
110+
vehicles_info = [
111+
{"vehicle_id": vid, "stop_lat": lat, "stop_lon": lon}
112+
for vid, lat, lon in zip(vehicle_ids, lats, lons)
113+
]
114+
115+
return pd.DataFrame(vehicles_info)
116+
117+
118+
def parse_request_parameters_gbfs(request_json: dict) -> Tuple[pd.DataFrame, str, Optional[str], str]:
119+
""" Parse the request parameters for GBFS data. """
120+
if (
121+
not request_json
122+
or ("station_information_url" not in request_json and "vehicle_status_url" not in request_json)
123+
or "stable_id" not in request_json
124+
):
125+
raise ValueError(
126+
"Invalid request: missing ['station_information_url' | 'vehicle_status_url'], 'dataset_id' or 'stable_id' "
127+
"parameter."
128+
)
129+
130+
stable_id = request_json["stable_id"]
131+
station_information_url = request_json.get("station_information_url")
132+
vehicle_status_url = request_json.get("vehicle_status_url")
133+
if station_information_url:
134+
logging.info('Parsing station information URL')
135+
stops_df = parse_station_information_url(station_information_url)
136+
else:
137+
logging.info('Parsing vehicle status URL')
138+
stops_df = parse_vehicle_status_url(vehicle_status_url)
139+
return stops_df, stable_id, None, station_information_url or vehicle_status_url
140+
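
To make the GBFS parsing concrete, here is a self-contained sketch of what parse_station_information_url extracts, run against an inline station_information-style payload (hypothetical data; the real function fetches the JSON over HTTP):

import pandas as pd
from jsonpath_ng import parse

# Hypothetical GBFS station_information payload, trimmed to the fields used above.
data = {
    "data": {
        "stations": [
            {"station_id": "s1", "lat": 45.508, "lon": -73.561},
            {"station_id": "s2", "lat": 45.512, "lon": -73.554},
        ]
    }
}

station_ids = [m.value for m in parse("data.stations[*].station_id").find(data)]
lats = [m.value for m in parse("data.stations[*].lat").find(data)]
lons = [m.value for m in parse("data.stations[*].lon").find(data)]

df = pd.DataFrame({"station_id": station_ids, "stop_lat": lats, "stop_lon": lons})
print(df)
#   station_id  stop_lat  stop_lon
# 0         s1    45.508   -73.561
# 1         s2    45.512   -73.554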

functions-python/reverse_geolocation/src/reverse_geolocation.py

Lines changed: 8 additions & 1 deletion
@@ -8,9 +8,9 @@
 from google.cloud import storage
 from google.cloud import tasks_v2
 
-from location_group_utils import create_http_task, project_id, gcp_region
 from shared.helpers.logger import Logger
 from shared.helpers.parser import jsonify_pubsub
+from shared.helpers.utils import create_http_task
 
 
 def init(request: CloudEvent) -> None:
@@ -110,8 +110,15 @@ def create_http_processor_task(
     body = json.dumps(
         {"stable_id": stable_id, "stops_url": stops_url, "dataset_id": dataset_id}
     ).encode()
+    queue_name = os.getenv("QUEUE_NAME")
+    project_id = os.getenv("PROJECT_ID")
+    gcp_region = os.getenv("GCP_REGION")
+
     create_http_task(
         client,
         body,
         f"https://{gcp_region}-{project_id}.cloudfunctions.net/reverse-geolocation-processor",
+        project_id,
+        gcp_region,
+        queue_name,
     )
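
One consequence of this refactor: PROJECT_ID, GCP_REGION, and QUEUE_NAME are now read at call time inside each function rather than as module-level globals in location_group_utils. A sketch of the environment both trigger paths assume (illustrative values only):

import os

# Illustrative values; real deployments set these in the function configuration.
os.environ.setdefault("PROJECT_ID", "my-gcp-project")
os.environ.setdefault("GCP_REGION", "us-central1")
os.environ.setdefault("QUEUE_NAME", "reverse-geolocation-queue")
os.environ.setdefault(
    "SERVICE_ACCOUNT_EMAIL", "tasks-invoker@my-gcp-project.iam.gserviceaccount.com"
)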
