Skip to content

Commit 3ea7771

Browse files
authored
feat: allow duplicate versions for gbfs feeds with different sources (#1222)
1 parent ba92900 commit 3ea7771

File tree

13 files changed

+145 -98 lines changed

13 files changed

+145 -98 lines changed

api/src/feeds/impl/models/gbfs_version_impl.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ def from_orm(cls, version: GbfsVersionOrm | None) -> GbfsVersion | None:
2828
version=version.version,
2929
created_at=version.created_at,
3030
last_updated_at=latest_report.validated_at if latest_report else None,
31-
latest=version.latest,
31+
source=version.source,
3232
endpoints=[GbfsEndpointImpl.from_orm(item) for item in version.gbfsendpoints]
3333
if version.gbfsendpoints
3434
else [],

api/src/scripts/populate_db_test_data.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -134,9 +134,7 @@ def populate_test_datasets(self, filepath, db_session: "Session"):
134134
if not gbfs_feed:
135135
self.logger.error(f"No feed found with stable_id: {version['feed_id']}")
136136
continue
137-
gbfs_version = Gbfsversion(
138-
id=version["id"], version=version["version"], url=version["url"], latest=version["latest"]
139-
)
137+
gbfs_version = Gbfsversion(id=version["id"], version=version["version"], url=version["url"])
140138
if version.get("endpoints"):
141139
for endpoint in version["endpoints"]:
142140
gbfs_endpoint = Gbfsendpoint(

api/tests/test_data/extra_test_data.json

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -766,7 +766,6 @@
766766
"id": "gbfs-system_id_1-2.3",
767767
"version": "2.3",
768768
"url": "https://www.example.com/gbfs_feed_1/2.3/gbfs.json",
769-
"latest": false,
770769
"endpoints": [
771770
{
772771
"id": "gbfs-system_id_1-2.3-system_information",
@@ -789,7 +788,6 @@
789788
"id": "gbfs-system_id_1-3.0",
790789
"version": "3.0",
791790
"url": "https://www.example.com/gbfs_feed_1/3.0/gbfs.json",
792-
"latest": true,
793791
"endpoints": [
794792
{
795793
"id": "gbfs-system_id_1-3.0-system_information",
@@ -810,7 +808,6 @@
810808
"id": "gbfs-system_id_2-2.3",
811809
"version": "2.3",
812810
"url": "https://www.example.com/gbfs_feed_1/2.3/gbfs.json",
813-
"latest": false,
814811
"endpoints": [
815812
{
816813
"id": "gbfs-system_id_2-2.3-system_information",

docs/DatabaseCatalogAPI.yaml

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -527,11 +527,16 @@ components:
527527
type: string
528528
format: date-time
529529
example: 2023-07-10T22:06:00Z
530-
latest:
530+
source:
531531
description: >
532-
A boolean value indicating if this is the latest version of the GBFS feed.
533-
type: boolean
534-
example: true
532+
Indicates the origin of the version information. Possible values are:
533+
* `autodiscovery`: Retrieved directly from the main GBFS autodiscovery URL.
534+
* `gbfs_versions`: Retrieved from the `gbfs_versions` endpoint.
535+
type: string
536+
enum:
537+
- autodiscovery
538+
- gbfs_versions
539+
535540
endpoints:
536541
description: >
537542
A list of endpoints that are available in the version.

functions-python/gbfs_validator/src/gbfs_data_processor.py

Lines changed: 85 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -100,8 +100,7 @@ def record_autodiscovery_request(
100100
raise ValueError(f"Error fetching {autodiscovery_url}")
101101

102102
def extract_gbfs_endpoints(
103-
self,
104-
gbfs_json_url: str,
103+
self, gbfs_json_url: str, extracted_from: str, latency: bool = True
105104
) -> Tuple[Optional[List[GBFSEndpoint]], GBFSVersion]:
106105
"""
107106
Extract GBFS endpoints from the GBFS JSON URL.
@@ -115,9 +114,11 @@ def extract_gbfs_endpoints(
115114
self.logger.warning(
116115
"No version found in the GBFS data. Defaulting to version 1.0."
117116
)
118-
gbfs_version = GBFSVersion("1.0", gbfs_json_url)
117+
gbfs_version = GBFSVersion("1.0", gbfs_json_url, extracted_from)
119118
else:
120-
gbfs_version = GBFSVersion(version_match[0].value, gbfs_json_url)
119+
gbfs_version = GBFSVersion(
120+
version_match[0].value, gbfs_json_url, extracted_from
121+
)
121122
if not feeds_matches:
122123
self.logger.error(
123124
"No feeds found in the GBFS data for version %s.", gbfs_version.version
@@ -133,9 +134,9 @@ def extract_gbfs_endpoints(
133134
)
134135
except AttributeError:
135136
language = None
136-
endpoints += GBFSEndpoint.from_dict(feed_match.value, language)
137+
endpoints += GBFSEndpoint.from_dict(feed_match.value, language, latency)
137138

138-
# If the autodiscovery endpoint is not listed then add it
139+
# If the autodiscovery endpoint is not listed, then add it
139140
if not any(endpoint.name == "gbfs" for endpoint in endpoints):
140141
endpoints += GBFSEndpoint.from_dict(
141142
[{"name": "gbfs", "url": gbfs_json_url}], None
@@ -147,6 +148,11 @@ def extract_gbfs_endpoints(
147148
for endpoint in endpoints
148149
}.values()
149150
)
151+
if len(unique_endpoints) != len(endpoints):
152+
self.logger.warning(
153+
"Duplicate endpoints found. This is a spec violation. Duplicates have been ignored."
154+
)
155+
150156
self.logger.info("Found version %s.", gbfs_version.version)
151157
self.logger.info(
152158
"Found endpoints %s.", ", ".join([endpoint.name for endpoint in endpoints])
@@ -155,10 +161,13 @@ def extract_gbfs_endpoints(
155161

156162
def extract_gbfs_versions(self, gbfs_json_url: str) -> Optional[List[GBFSVersion]]:
157163
"""Extract GBFS versions from the autodiscovery URL"""
158-
all_endpoints, version = self.extract_gbfs_endpoints(gbfs_json_url)
164+
all_endpoints, version = self.extract_gbfs_endpoints(
165+
gbfs_json_url, "autodiscovery"
166+
)
159167
if not all_endpoints or not version:
160168
return None
161-
self.gbfs_endpoints[version.version] = all_endpoints
169+
version_id = f"{self.stable_id}_{version.version}_{version.extracted_from}"
170+
self.gbfs_endpoints[version_id] = all_endpoints
162171

163172
# Fetch GBFS Versions
164173
gbfs_versions_endpoint = next(
@@ -172,7 +181,22 @@ def extract_gbfs_versions(self, gbfs_json_url: str) -> Optional[List[GBFSVersion
172181
gbfs_versions_json = fetch_gbfs_data(gbfs_versions_endpoint.url)
173182
versions_matches = parse("$..versions").find(gbfs_versions_json)
174183
if versions_matches:
175-
gbfs_versions = GBFSVersion.from_dict(versions_matches[0].value)
184+
extracted_versions = GBFSVersion.from_dict(
185+
versions_matches[0].value, "gbfs_versions"
186+
)
187+
autodiscovery_url_in_extracted = any(
188+
version.url == gbfs_json_url for version in extracted_versions
189+
)
190+
if len(extracted_versions) > 0 and not autodiscovery_url_in_extracted:
191+
self.logger.warning(
192+
"The autodiscovery URL is not included in gbfs_versions. There could be duplication"
193+
" of versions."
194+
)
195+
gbfs_versions = [
196+
version
197+
for version in extracted_versions
198+
if version.url != gbfs_json_url
199+
] + [version]
176200
self.logger.info(
177201
"Found versions %s",
178202
", ".join([version.version for version in gbfs_versions]),
@@ -186,29 +210,6 @@ def extract_gbfs_versions(self, gbfs_json_url: str) -> Optional[List[GBFSVersion
186210
version
187211
] # If no gbfs_versions endpoint, return the version from the autodiscovery URL
188212

189-
def get_latest_version(self) -> Optional[str]:
190-
"""Get the latest GBFS version."""
191-
max_version = max(
192-
(
193-
version
194-
for version in self.gbfs_versions
195-
if not version.version.lower().endswith("RC")
196-
),
197-
key=lambda version: version.version,
198-
default=None,
199-
)
200-
if not max_version:
201-
self.logger.error(
202-
"No non-RC versions found. Trying to set the latest to a RC version."
203-
)
204-
max_version = max(
205-
self.gbfs_versions, key=lambda version: version.version, default=None
206-
)
207-
if not max_version:
208-
self.logger.error("No versions found.")
209-
return None
210-
return max_version.version
211-
212213
@with_db_session()
213214
def update_database_entities(self, db_session: Session) -> None:
214215
"""Update the database entities with the processed GBFS data."""
@@ -222,9 +223,6 @@ def update_database_entities(self, db_session: Session) -> None:
222223
self.logger.error("GBFS feed with ID %s not found.", self.feed_id)
223224
return
224225
gbfs_versions_orm = []
225-
latest_version = self.get_latest_version()
226-
if not latest_version:
227-
return
228226

229227
# Deactivate versions that are not in the current feed
230228
active_versions = [version.version for version in self.gbfs_versions]
@@ -236,28 +234,29 @@ def update_database_entities(self, db_session: Session) -> None:
236234
# Update or create GBFS versions and endpoints
237235
for gbfs_version in self.gbfs_versions:
238236
gbfs_version_orm = self.update_or_create_gbfs_version(
239-
db_session, gbfs_version, latest_version
237+
db_session, gbfs_version
240238
)
241239
gbfs_versions_orm.append(gbfs_version_orm)
242240

243-
gbfs_endpoints = self.gbfs_endpoints.get(gbfs_version.version, [])
241+
gbfs_endpoints = self.gbfs_endpoints.get(gbfs_version_orm.id, [])
244242
gbfs_endpoints_orm = []
245-
features = self.validation_reports.get(gbfs_version.version, {}).get(
243+
features = self.validation_reports.get(gbfs_version_orm.id, {}).get(
246244
"features", []
247245
)
248246
for endpoint in gbfs_endpoints:
249247
gbfs_endpoint_orm = self.update_or_create_gbfs_endpoint(
250-
db_session, gbfs_version.version, endpoint, features
248+
db_session, gbfs_version_orm.id, endpoint, features
251249
)
252-
gbfs_endpoint_orm.httpaccesslogs.append(
253-
Httpaccesslog(
254-
request_method=HTTPMethod.GET.value,
255-
request_url=endpoint.url,
256-
status_code=endpoint.status_code,
257-
latency_ms=endpoint.latency,
258-
response_size_bytes=endpoint.response_size_bytes,
250+
if endpoint.status_code is not None:
251+
gbfs_endpoint_orm.httpaccesslogs.append(
252+
Httpaccesslog(
253+
request_method=HTTPMethod.GET.value,
254+
request_url=endpoint.url,
255+
status_code=endpoint.status_code,
256+
latency_ms=endpoint.latency,
257+
response_size_bytes=endpoint.response_size_bytes,
258+
)
259259
)
260-
)
261260
gbfs_endpoints_orm.append(gbfs_endpoint_orm)
262261

263262
# Deactivate endpoints that are not in the current feed
@@ -269,41 +268,42 @@ def update_database_entities(self, db_session: Session) -> None:
269268
gbfs_version_orm.gbfsendpoints = gbfs_endpoints_orm
270269

271270
validation_report_orm = self.create_validation_report_entities(
272-
gbfs_version_orm, self.validation_reports.get(gbfs_version.version, {})
271+
gbfs_version_orm, self.validation_reports.get(gbfs_version_orm.id, {})
273272
)
274273
if validation_report_orm:
275274
gbfs_version_orm.gbfsvalidationreports.append(validation_report_orm)
276275
gbfs_feed.gbfsversions = gbfs_versions_orm
277276
db_session.commit()
278277

279278
def update_or_create_gbfs_version(
280-
self, db_session: Session, gbfs_version: GBFSVersion, latest_version: str
279+
self, db_session: Session, gbfs_version: GBFSVersion
281280
) -> Gbfsversion:
282281
"""Update or create a GBFS version entity."""
283-
formatted_id = f"{self.stable_id}_{gbfs_version.version}"
282+
formatted_id = (
283+
f"{self.stable_id}_{gbfs_version.version}_{gbfs_version.extracted_from}"
284+
)
284285
gbfs_version_orm = (
285286
db_session.query(Gbfsversion).filter(Gbfsversion.id == formatted_id).first()
286287
)
287288
if not gbfs_version_orm:
288289
gbfs_version_orm = Gbfsversion(
289-
id=formatted_id, version=gbfs_version.version
290+
id=formatted_id,
291+
version=gbfs_version.version,
292+
source=gbfs_version.extracted_from,
290293
)
291294

292295
gbfs_version_orm.url = gbfs_version.url # Update the URL
293-
gbfs_version_orm.latest = (
294-
gbfs_version.version == latest_version
295-
) # Update the latest flag
296296
return gbfs_version_orm
297297

298298
def update_or_create_gbfs_endpoint(
299299
self,
300300
db_session: Session,
301-
version: str,
301+
version_id: str,
302302
endpoint: GBFSEndpoint,
303303
features: List[str],
304304
) -> Gbfsendpoint:
305305
"""Update or create a GBFS endpoint entity."""
306-
formatted_id = f"{self.stable_id}_{version}_{endpoint.name}"
306+
formatted_id = f"{version_id}_{endpoint.name}"
307307
if endpoint.language:
308308
formatted_id += f"_{endpoint.language}"
309309
gbfs_endpoint_orm = (
@@ -346,7 +346,8 @@ def validate_gbfs_feed_versions(self) -> None:
346346
json.dumps(json_report_summary), content_type="application/json"
347347
)
348348
report_summary_blob.make_public()
349-
self.validation_reports[version.version] = {
349+
version_id = f"{self.stable_id}_{version.version}_{version.extracted_from}"
350+
self.validation_reports[version_id] = {
350351
"report_summary_url": report_summary_blob.public_url,
351352
"json_report_summary": json_report_summary,
352353
"validation_time": date_time_utc,
@@ -356,6 +357,9 @@ def validate_gbfs_feed_versions(self) -> None:
356357
if not obj.get("required", True) and obj.get("exists", False)
357358
],
358359
}
360+
self.logger.info(
361+
f"Validated GBFS feed version: {version.version} with URL: {version.url}"
362+
)
359363

360364
def create_validation_report_entities(
361365
self, gbfs_version_orm: Gbfsversion, validation_report_data: Dict
@@ -373,7 +377,7 @@ def create_validation_report_entities(
373377
return None
374378

375379
validation_report_id = (
376-
f"{self.stable_id}_v{gbfs_version_orm.version}_{validation_time}"
380+
f"{self.stable_id}_v{gbfs_version_orm.id}_{validation_time}"
377381
)
378382
validation_report = Gbfsvalidationreport(
379383
id=validation_report_id,
@@ -401,21 +405,37 @@ def create_validation_report_entities(
401405
def extract_endpoints_for_all_versions(self):
402406
"""Extract endpoints for all versions of the GBFS feed."""
403407
for version in self.gbfs_versions:
404-
if version.version in self.gbfs_endpoints:
408+
version_id = f"{self.stable_id}_{version.version}_{version.extracted_from}"
409+
if version_id in self.gbfs_endpoints:
405410
continue
406-
endpoints, _ = self.extract_gbfs_endpoints(version.url)
411+
self.logger.info(f"Extracting endpoints for version {version.version}.")
412+
# Avoid fetching latency data for 'gbfs_versions' endpoint
413+
endpoints, _ = self.extract_gbfs_endpoints(
414+
version.url, "gbfs_versions", latency=False
415+
)
407416
if endpoints:
408-
self.gbfs_endpoints[version.version] = endpoints
417+
self.gbfs_endpoints[version_id] = endpoints
409418
else:
410419
self.logger.error("No endpoints found for version %s.", version.version)
411420

412421
def trigger_location_extraction(self):
413422
"""Trigger the location extraction process."""
414-
latest_version = self.get_latest_version()
415-
if not latest_version:
416-
self.logger.error("No latest version found.")
423+
autodiscovery_version = next(
424+
(
425+
version
426+
for version in self.gbfs_versions
427+
if version.extracted_from == "autodiscovery"
428+
),
429+
None,
430+
)
431+
if not autodiscovery_version:
432+
self.logger.error(
433+
"No autodiscovery version found. Cannot trigger location extraction."
434+
)
417435
return
418-
endpoints = self.gbfs_endpoints.get(latest_version, [])
436+
version_id = f"{self.stable_id}_{autodiscovery_version.version}_{autodiscovery_version.extracted_from}"
437+
endpoints = self.gbfs_endpoints.get(version_id, [])
438+
419439
# Find the station_information_url endpoint
420440
station_information_url = next(
421441
(

0 commit comments

Comments
 (0)