
Commit e9e9c9e

feat: added location extraction to gbfs process (#1155)
1 parent 88940c8 commit e9e9c9e

File tree

15 files changed, +424 -117 lines

functions-python/gbfs_validator/requirements.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -20,6 +20,7 @@ google-cloud-pubsub
 google-api-core
 google-cloud-firestore
 google-cloud-datastore
+google-cloud-tasks
 cloudevents~=1.10.1
 
 # Configuration
```

functions-python/gbfs_validator/src/gbfs_data_processor.py

Lines changed: 54 additions & 1 deletion
```diff
@@ -5,6 +5,7 @@
 from http import HTTPMethod
 from typing import Dict, Any, Optional, List, Tuple
 from jsonpath_ng import parse
+from google.cloud import tasks_v2
 import requests
 from requests.exceptions import RequestException
 from google.cloud import storage
@@ -16,6 +17,7 @@
     BUCKET_NAME,
     VALIDATOR_URL,
 )
+import os
 from shared.database_gen.sqlacodegen_models import (
     Gbfsnotice,
     Gbfsversion,
@@ -26,6 +28,7 @@
 )
 from sqlalchemy.orm import Session
 from shared.database.database import with_db_session
+from shared.helpers.utils import create_http_task
 
 
 class GBFSDataProcessor:
@@ -53,6 +56,8 @@ def process_gbfs_data(self, autodiscovery_url: str) -> None:
         # Update database entities
         self.update_database_entities()
 
+        self.trigger_location_extraction()
+
     @with_db_session()
     def record_autodiscovery_request(
         self, autodiscovery_url: str, db_session: Session
@@ -112,7 +117,6 @@ def extract_gbfs_endpoints(
                 )
             except AttributeError:
                 language = None
-                print(language)
             endpoints += GBFSEndpoint.from_dict(feed_match.value, language)
         unique_endpoints = list(
             {
@@ -376,3 +380,52 @@ def extract_endpoints_for_all_versions(self):
                 self.gbfs_endpoints[version.version] = endpoints
             else:
                 logging.error(f"No endpoints found for version {version.version}.")
+
+    def trigger_location_extraction(self):
+        """Trigger the location extraction process."""
+        latest_version = self.get_latest_version()
+        if not latest_version:
+            logging.error("No latest version found.")
+            return
+        endpoints = self.gbfs_endpoints.get(latest_version, [])
+        # Find the station_information_url endpoint
+        station_information_url = next(
+            (
+                endpoint.url
+                for endpoint in endpoints
+                if endpoint.name == "station_information"
+            ),
+            None,
+        )
+        # If station_information_url is not found, use vehicle_status_url
+        vehicle_status_url = next(
+            (
+                endpoint.url
+                for endpoint in endpoints
+                if endpoint.name == "vehicle_status"
+            ),
+            None,
+        )
+        if not station_information_url and not vehicle_status_url:
+            logging.warning("No station_information_url or vehicle_status_url found.")
+            return
+        client = tasks_v2.CloudTasksClient()
+        body = json.dumps(
+            {
+                "stable_id": self.stable_id,
+                "data_type": "gbfs",
+                "station_information_url": station_information_url,
+                "vehicle_status_url": vehicle_status_url,
+            }
+        ).encode()
+        project_id = os.getenv("PROJECT_ID")
+        gcp_region = os.getenv("GCP_REGION")
+        queue_name = os.getenv("QUEUE_NAME")
+        create_http_task(
+            client,
+            body,
+            f"https://{gcp_region}-{project_id}.cloudfunctions.net/reverse-geolocation-processor",
+            project_id,
+            gcp_region,
+            queue_name,
+        )
```
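For context, the Cloud Task enqueued by `trigger_location_extraction` above carries a small JSON body that the reverse-geolocation processor later decodes. A minimal sketch of that round trip using the field names from the diff (the URLs and stable ID below are placeholder values, not from this commit):

```python
import json

# Payload shaped as in trigger_location_extraction above; example values are hypothetical.
body = json.dumps(
    {
        "stable_id": "gbfs-example_system",
        "data_type": "gbfs",
        "station_information_url": "https://example.com/gbfs/station_information.json",
        "vehicle_status_url": None,  # only one of the two URLs needs to be present
    }
).encode()

# The receiving Cloud Function would decode it back into a dict.
payload = json.loads(body.decode())
assert payload["data_type"] == "gbfs"
```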

functions-python/gbfs_validator/tests/test_gbfs_data_processor.py

Lines changed: 10 additions & 2 deletions
```diff
@@ -68,6 +68,8 @@ def setUp(self):
         )
 
     @with_db_session(db_url=default_db_url)
+    @patch("gbfs_data_processor.create_http_task")
+    @patch("gbfs_data_processor.tasks_v2")
     @patch(
         "gbfs_data_processor.GBFSEndpoint.get_request_metadata",
         side_effect=mock_get_request_metadata,
@@ -76,9 +78,15 @@ def setUp(self):
     @patch("gbfs_data_processor.fetch_gbfs_data", side_effect=mock_fetch_gbfs_data)
     @patch("requests.post")
     @patch("requests.get")
-    @patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url})
+    @patch.dict(
+        os.environ,
+        {
+            "FEEDS_DATABASE_URL": default_db_url,
+            "GOOGLE_APPLICATION_CREDENTIALS": "test",
+        },
+    )
     def test_fetch_gbfs_files(
-        self, _, mock_post, __, mock_cloud_storage_client, ___, db_session
+        self, _, mock_post, __, mock_cloud_storage_client, ___, ____, _____, db_session
     ):
         autodiscovery_url = "http://example.com/gbfs.json"
         # Add GBFS feed to the database
```
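The two extra placeholder arguments (`____`, `_____`) in the test signature follow from how stacked `@patch` decorators inject mocks: the decorator closest to the function supplies the first mock argument, so decorators added above the existing ones extend the end of the argument list. A standalone sketch of that ordering (the names here are illustrative, not from this repository):

```python
import os
from unittest.mock import patch


@patch("os.getcwd")  # outermost decorator -> last mock argument
@patch("os.path.exists")  # innermost decorator -> first mock argument
def check(mock_exists, mock_getcwd):
    # Mocks are passed bottom-up, which is why adding @patch decorators
    # above existing ones lengthens the tail of the signature.
    mock_exists.return_value = True
    mock_getcwd.return_value = "/tmp"
    return os.path.exists("anything"), os.getcwd()


print(check())  # (True, '/tmp')
```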

functions-python/helpers/requirements.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -22,6 +22,7 @@ google-cloud-datastore
 cloudevents~=1.10.1
 google-cloud-bigquery
 google-api-core
+google-cloud-tasks
 google-cloud-firestore
 google-cloud-bigquery
```
2728

functions-python/helpers/tests/test_helpers.py

Lines changed: 15 additions & 0 deletions
```diff
@@ -206,3 +206,18 @@ def test_download_url_content_failure(self, mock_get):
             "Failed to download",
             "Should raise the correct exception",
         )
+
+    @patch.dict(
+        os.environ,
+        {
+            "GOOGLE_APPLICATION_CREDENTIALS": "test",
+        },
+    )
+    def test_create_http_task(self):
+        from utils import create_http_task
+
+        client = MagicMock()
+        body = b"test"
+        url = "test"
+        create_http_task(client, body, url, "test", "test", "test")
+        client.create_task.assert_called_once()
```

functions-python/helpers/utils.py

Lines changed: 27 additions & 1 deletion
```diff
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 import hashlib
 import logging
 import os
@@ -131,3 +130,30 @@ def download_and_get_hash(
         if os.path.exists(file_path):
             os.remove(file_path)
         raise e
+
+
+def create_http_task(
+    client,  # type: tasks_v2.CloudTasksClient
+    body: bytes,
+    url: str,
+    project_id: str,
+    gcp_region: str,
+    queue_name: str,
+) -> None:
+    """Creates a GCP Cloud Task."""
+    from google.cloud import tasks_v2
+
+    task = tasks_v2.Task(
+        http_request=tasks_v2.HttpRequest(
+            url=url,
+            http_method=tasks_v2.HttpMethod.POST,
+            oidc_token=tasks_v2.OidcToken(
+                service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
+            ),
+            body=body,
+            headers={"Content-Type": "application/json"},
+        )
+    )
+    client.create_task(
+        parent=client.queue_path(project_id, gcp_region, queue_name), task=task
+    )
```
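A usage sketch of the relocated `create_http_task` helper, mirroring the call added in `gbfs_data_processor.py` above (the environment defaults and target URL are illustrative assumptions; a real call also needs valid GCP credentials and a `SERVICE_ACCOUNT_EMAIL` environment variable):

```python
import json
import os

from google.cloud import tasks_v2

from utils import create_http_task  # helper added in this commit

# Assumed configuration; the variable names mirror those used in the diffs above.
project_id = os.getenv("PROJECT_ID", "my-project")
gcp_region = os.getenv("GCP_REGION", "us-central1")
queue_name = os.getenv("QUEUE_NAME", "reverse-geolocation-queue")

client = tasks_v2.CloudTasksClient()
body = json.dumps({"stable_id": "gbfs-example_system", "data_type": "gbfs"}).encode()
create_http_task(
    client,
    body,
    f"https://{gcp_region}-{project_id}.cloudfunctions.net/reverse-geolocation-processor",
    project_id,
    gcp_region,
    queue_name,
)
```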

functions-python/reverse_geolocation/README.md

Lines changed: 23 additions & 12 deletions
```diff
@@ -52,20 +52,25 @@ Currently, storing `stops.txt` in GCP is a temporary implementation and may be m
 
 ## 3. `reverse_geolocation_process` Function
 
-This function performs the core reverse geolocation logic. It processes each stop in `stops.txt` and determines its geographic location.
+This function performs the core reverse geolocation logic. It processes location data from GTFS or GBFS feeds to determine their geographic context and stores it accordingly.
 
 ### Parameters:
-- `stable_id`: Identifies the GTFS feed.
-- `dataset_id`: Identifies the dataset being processed.
-- `stops_url`: URL of the `stops.txt` file.
+- `stable_id`: Identifies the feed (GTFS or GBFS).
+- `dataset_id`: Required if `data_type` is not provided or is `gtfs`. Identifies the dataset being processed.
+- `stops_url`: Required if `data_type` is not provided or is `gtfs`. URL of the GTFS `stops.txt` file.
+- `station_information_url`: Required if `data_type` is `gbfs` and `vehicle_status_url` is omitted. URL of the GBFS `station_information.json` file.
+- `vehicle_status_url`: Required if `data_type` is `gbfs` and `station_information_url` is omitted. URL of the GBFS `vehicle_status.json` file.
+- `data_type`: Optional. Specifies the type of data being processed. Can be `gtfs` or `gbfs`. If not provided, the function will attempt to determine the type from the URLs supplied.
 
 ### Processing Steps:
 
-1. **Load Stop Data**
-   - The function reads `stops.txt` into a Pandas DataFrame, ensuring unique longitude-latitude combinations to avoid redundant processing.
+1. **Load Location Data**
+   - For GTFS: the function reads `stops.txt` into a Pandas DataFrame, ensuring unique longitude-latitude pairs.
+   - For GBFS: location data is extracted from `station_information.json` (preferred) or `vehicle_status.json` (fallback), also ensuring uniqueness.
 
-2. **Update Dataset Bounding Box**
-   - The dataset's bounding box is updated using only the extreme coordinate values, forming a rectangular boundary.
+2. **Update Bounding Box**
+   - For GTFS: the bounding box is derived from stop coordinates. The dataset's bounding box is updated in the database.
+   - For GBFS: it is based on extracted station or vehicle coordinates. No database update is performed. We will use the term `stop` to refer to both GTFS stops and GBFS stations/vehicles.
 
 3. **Check for Previously Processed Stops**
    - Stops are matched against existing `Stop` entities in PostgreSQL using geographic coordinates (not `stop_id`).
@@ -79,11 +84,17 @@ This function performs the core reverse geolocation logic. It processes each sto
 5. **Store Results in PostgreSQL**
    - Unique location aggregates are identified, and stop counts per location are recorded.
    - `Location` entities are created based on the extracted administrative hierarchy.
+
 6. **GeoJSON Generation**
-   - The function creates a **GeoJSON file** containing location aggregates and their corresponding stop counts.
-   - The file is stored in **GCP Storage** following a consistent path format:
-     - **`<feed_stable_id>/geolocation.geojson`**
-   - This file always reflects the **latest dataset** results and is used for **location heatmap visualization on the front end**.
+   - A **GeoJSON file** is created representing the aggregated locations and their counts.
+   - It is stored in the GCS GTFS or GBFS bucket, depending on the data type, under:
+     - **`<stable_id>/geolocation.geojson`**
+   - The file includes:
+     - extracted locations,
+     - the timestamp of extraction,
+     - the URL used for data extraction.
+   - This file always reflects the most recent dataset/GBFS version results and powers the **location heatmap visualization** on the front end.
+
 ### Location Mapping:
 - **`country_code` / `country`**: Taken from the `Geopolygon` with an ISO 3166-1 code.
 - **`subdivision_name`**: Derived from the lowest administrative `Geopolygon` with an ISO 3166-2 code, but no ISO 3166-1 code.
```
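To make the parameter combinations in the README concrete, here is a minimal sketch of the two request bodies the function would accept, inferred from the parameter list above (all IDs and URLs are placeholder values):

```python
import json

# GTFS invocation: dataset_id and stops_url are required when data_type is omitted or "gtfs".
gtfs_payload = {
    "stable_id": "mdb-123",
    "data_type": "gtfs",
    "dataset_id": "mdb-123-202501010000",
    "stops_url": "https://example.com/datasets/mdb-123/stops.txt",
}

# GBFS invocation: at least one of station_information_url or vehicle_status_url is required.
gbfs_payload = {
    "stable_id": "gbfs-example_system",
    "data_type": "gbfs",
    "station_information_url": "https://example.com/gbfs/station_information.json",
}

print(json.dumps(gtfs_payload, indent=2))
print(json.dumps(gbfs_payload, indent=2))
```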

functions-python/reverse_geolocation/requirements.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -27,6 +27,7 @@ pycountry
 shapely
 gtfs-kit
 matplotlib
+jsonpath_ng
 
 # Configuration
 python-dotenv==1.0.0
```

functions-python/reverse_geolocation/src/location_group_utils.py

Lines changed: 0 additions & 25 deletions
```diff
@@ -1,18 +1,11 @@
-import os
 from typing import List, Optional
 
 import matplotlib.pyplot as plt
 import pycountry
 from geoalchemy2.shape import to_shape
-from google.cloud import tasks_v2
 
 from shared.database_gen.sqlacodegen_models import Geopolygon, Osmlocationgroup
 
-queue_name = os.getenv("QUEUE_NAME")
-project_id = os.getenv("PROJECT_ID")
-gcp_region = os.getenv("GCP_REGION")
-
-
 ERROR_STATUS_CODE = 299  # Custom error code for the function to avoid retries
 
 
@@ -29,24 +22,6 @@ def generate_color(
     return f"rgba({int(rgba[0] * 255)}, {int(rgba[1] * 255)}, {int(rgba[2] * 255)}, {rgba[3]})"
 
 
-def create_http_task(client: tasks_v2.CloudTasksClient, body: bytes, url: str) -> None:
-    """Creates a GCP Cloud Task."""
-    task = tasks_v2.Task(
-        http_request=tasks_v2.HttpRequest(
-            url=url,
-            http_method=tasks_v2.HttpMethod.POST,
-            oidc_token=tasks_v2.OidcToken(
-                service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
-            ),
-            body=body,
-            headers={"Content-Type": "application/json"},
-        )
-    )
-    client.create_task(
-        parent=client.queue_path(project_id, gcp_region, queue_name), task=task
-    )
-
-
 class GeopolygonAggregate:
     """
     A class to represent an aggregate of geopolygon object to represent a location
```
