Skip to content

Commit 6b4feff

Browse files
committed
add strategy decision for extracting location from feeds
1 parent 6361130 commit 6b4feff

File tree

12 files changed

+306
-85
lines changed

12 files changed

+306
-85
lines changed

functions-python/helpers/locations.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1+
from enum import Enum
12
from typing import Dict, Optional
23
from sqlalchemy.orm import Session
34
import pycountry
45
from shared.database_gen.sqlacodegen_models import Feed, Location
56
import logging
67

78

9+
class ReverseGeocodingStrategy(str, Enum):
    """Supported strategies for reverse geocoding feed locations.

    Inherits from ``str`` so that members compare equal to their plain
    string values (e.g. ``"per-point"``), which keeps request parsing
    and log formatting straightforward.
    """

    # Resolve the location of every stop/point individually.
    PER_POINT = "per-point"
15+
16+
817
def get_country_code(country_name: str) -> Optional[str]:
918
"""
1019
Get ISO 3166 country code from country name
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import time
2+
import tracemalloc
3+
import psutil
4+
import logging
5+
6+
7+
def track_metrics(metrics=("time", "memory", "cpu"), logger=None):
    """Decorator to track specified metrics (time, memory, cpu) during function execution.

    Measurements are logged at DEBUG level through the provided logger, or a
    logger named after the decorated function when none is given. Metrics are
    reported even when the wrapped call raises.

    Args:
        metrics (tuple): Metrics to track. Options are "time", "memory", "cpu".
        logger (logging.Logger): Logger instance to log the metrics. If None, uses a default logger.
    Usage:
        @track_metrics(metrics=("time", "memory", "cpu"), logger=dynamic_logger)
        def example_function():
            data = [i for i in range(10**6)]  # Simulate work
            time.sleep(1)  # Simulate delay
            return sum(data)
    """
    import functools  # local import: avoids touching the module's top-level imports

    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            # Fall back to a per-function logger when none was provided.
            logger_instance = logger if logger else logging.getLogger(func.__name__)

            # Only set up the instrumentation that was actually requested.
            # The psutil process handle is created lazily so the decorator
            # does not require psutil unless "cpu" is tracked.
            process = psutil.Process() if "cpu" in metrics else None
            if "memory" in metrics:
                tracemalloc.start()
            start_time = time.time() if "time" in metrics else None
            cpu_before = (
                process.cpu_percent(interval=None) if "cpu" in metrics else None
            )

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                logger_instance.error(
                    f"Function '{func.__name__}' raised an exception: {e}"
                )
                raise
            finally:
                # Runs on both success and failure so partial metrics are
                # still reported for calls that raised.
                if "time" in metrics:
                    duration = time.time() - start_time
                    logger_instance.debug(
                        f"Function '{func.__name__}' executed in {duration:.2f} seconds."
                    )
                if "memory" in metrics:
                    _, peak = tracemalloc.get_traced_memory()
                    tracemalloc.stop()
                    logger_instance.debug(
                        f"Function '{func.__name__}' peak memory usage: {peak / (1024 ** 2):.2f} MB."
                    )
                if "cpu" in metrics:
                    cpu_after = process.cpu_percent(interval=None)
                    logger_instance.debug(
                        f"Function '{func.__name__}' CPU usage: {cpu_after - cpu_before:.2f}%."
                    )

            return result

        return wrapper

    return decorator

functions-python/helpers/transform.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616
from typing import List, Optional
1717

1818

19-
def to_boolean(value, default_value: Optional[bool] = False) -> Optional[bool]:
    """
    Convert a value to a boolean.

    Booleans are returned unchanged, numbers map to ``value != 0`` and
    strings are matched case-insensitively (ignoring surrounding whitespace)
    against the truthy set {"true", "1", "yes", "y"}. Any other type yields
    ``default_value``.

    Args:
        value: The value to convert.
        default_value: Returned when ``value`` is not a bool, number or string.

    Returns:
        The boolean interpretation of ``value``, or ``default_value``
        (which may be None, hence the Optional return type).
    """
    if isinstance(value, bool):
        return value
    # bool is a subclass of int, so this check must come after the bool check.
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes", "y"}
    # None, lists, dicts, etc. fall through to the caller-provided default.
    return default_value
2830

2931

3032
def get_nested_value(
@@ -53,3 +55,23 @@ def get_nested_value(
5355
result = current_data.strip()
5456
return result if result else default_value
5557
return current_data
58+
59+
60+
def to_enum(value, enum_class=None, default_value=None):
    """
    Convert a value to an enum member of the specified enum class.

    The raw value is tried first, then its string representation, so both
    non-string-valued enums (e.g. IntEnum) and string enums fed from
    JSON/query parameters are supported.

    Args:
        value: The value to convert.
        enum_class: The enum class to convert the value to.
        default_value: The default value to return if conversion fails.

    Returns:
        An enum member if conversion is successful, otherwise the default value.
    """
    if enum_class and isinstance(value, enum_class):
        return value
    # Try the value as-is before falling back to str(value); a None
    # enum_class raises TypeError on call and lands on the default.
    for candidate in (value, str(value)):
        try:
            return enum_class(candidate)
        except (ValueError, TypeError):
            continue
    return default_value

functions-python/reverse_geolocation/.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ omit =
44
*/helpers/*
55
*/database_gen/*
66
*/shared/*
7+
*/scripts/*
78

89
[report]
910
exclude_lines =

functions-python/reverse_geolocation/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ This function performs the core reverse geolocation logic. It processes location
6262
- `vehicle_status_url`: Required if `data_type` is `gbfs` and `station_information_url` and `free_bike_status_url` are omitted. URL of the GBFS `vehicle_status.json` file.
6363
- `free_bike_status_url`: Required if `data_type` is `gbfs` and `station_information_url` and `vehicle_status_url` are omitted. URL of the GBFS `free_bike_status.json` file.
6464
- `data_type`: Optional. Specifies the type of data being processed. Can be `gtfs` or `gbfs`. If not provided, the function will attempt to determine the type based on the URLs provided.
65+
- `strategy`: Optional. Specifies the reverse geolocation strategy to use. Defaults to `per-point`.
66+
- `public`: Optional. Indicates whether the resulting geojson files will be public or private. Defaults to `true`.
6567

6668
### Processing Steps:
6769

functions-python/reverse_geolocation/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ shapely
2929
gtfs-kit
3030
matplotlib
3131
jsonpath_ng
32+
psutil
3233

3334
# Configuration
3435
python-dotenv==1.0.0
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
Faker
22
pytest~=7.4.3
33
urllib3-mock
4-
requests-mock
4+
requests-mock
5+
gcp-storage-emulator
6+
geopandas

functions-python/reverse_geolocation/src/parse_request.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import requests
88
from jsonpath_ng import parse
99

10+
from shared.helpers.locations import ReverseGeocodingStrategy
11+
from shared.helpers.transform import to_boolean, to_enum
12+
1013

1114
def parse_request_parameters(
1215
request: flask.Request,
@@ -45,7 +48,19 @@ def parse_request_parameters(
4548
raise ValueError(
4649
f"Invalid data_type '{data_type}'. Supported types are 'gtfs' and 'gbfs'."
4750
)
48-
return df, stable_id, dataset_id, data_type, urls
51+
public = True
52+
if "public" in request_json:
53+
public = to_boolean(request_json["public"], default_value=True)
54+
strategy = ReverseGeocodingStrategy.PER_POINT
55+
if "strategy" in request_json:
56+
strategy = to_enum(
57+
value=request_json["strategy"],
58+
default_value=ReverseGeocodingStrategy.PER_POINT,
59+
)
60+
else:
61+
logging.info("No strategy provided, using default")
62+
logging.info("Strategy set to: %s.", strategy)
63+
return df, stable_id, dataset_id, data_type, urls, public, strategy
4964

5065

5166
def parse_request_parameters_gtfs(

functions-python/reverse_geolocation/src/reverse_geolocation.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ def parse_resource_data(data: dict) -> Tuple[str, str, str]:
4141
def reverse_geolocation_pubsub(request: CloudEvent) -> None:
4242
"""
4343
Reverse geolocation function triggered by a Pub/Sub message.
44+
@:request: CloudEvent containing the Pub/Sub message data. Example data:
45+
{
46+
"stable_id": "example_stable_id",
47+
"dataset_id": "example_dataset_id",
48+
"url": "https://example.com/path/to/feed.zip"
49+
}
50+
4451
"""
4552
try:
4653
init(request)

functions-python/reverse_geolocation/src/reverse_geolocation_processor.py

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from sqlalchemy import func
1616
from sqlalchemy.orm import Session
1717
from sqlalchemy.orm import joinedload
18+
from shared.helpers.locations import ReverseGeocodingStrategy
1819

1920
from location_group_utils import (
2021
ERROR_STATUS_CODE,
@@ -37,6 +38,7 @@
3738
Gtfsfeed,
3839
)
3940
from shared.helpers.logger import get_logger
41+
from shared.helpers.runtime_metrics import track_metrics
4042

4143

4244
@with_db_session
@@ -231,6 +233,7 @@ def create_geojson_aggregate(
231233
data_type: str,
232234
logger,
233235
extraction_urls: List[str] = None,
236+
public: bool = True,
234237
) -> None:
235238
"""Create a GeoJSON file with the aggregated locations. This file will be uploaded to GCS and used for
236239
visualization."""
@@ -286,7 +289,8 @@ def create_geojson_aggregate(
286289
bucket = storage_client.bucket(bucket_name)
287290
blob = bucket.blob(f"{stable_id}/geolocation.geojson")
288291
blob.upload_from_string(json.dumps(json_data))
289-
blob.make_public()
292+
if public:
293+
blob.make_public()
290294
logger.info("GeoJSON data saved to %s", blob.name)
291295

292296

@@ -316,7 +320,7 @@ def get_or_create_location(
316320

317321

318322
@with_db_session
319-
def extract_location_aggregates(
323+
def extract_location_aggregates_per_point(
320324
feed_id: str,
321325
stops_df: pd.DataFrame,
322326
location_aggregates: Dict[str, GeopolygonAggregate],
@@ -381,6 +385,9 @@ def extract_location_aggregates(
381385

382386

383387
@with_db_session
388+
@track_metrics(
389+
metrics=("time", "memory", "cpu"), logger=get_logger(__name__, "stable_id")
390+
)
384391
def update_dataset_bounding_box(
385392
dataset_id: str, stops_df: pd.DataFrame, db_session: Session
386393
) -> shapely.Polygon:
@@ -418,9 +425,27 @@ def reverse_geolocation_process(
418425
) -> Tuple[str, int] | Tuple[Dict, int]:
419426
"""
420427
Main function to handle reverse geolocation processing.
421-
"""
422-
overall_start = datetime.now()
428+
@:request: Flask request object containing the parameters for the reverse geolocation process.
429+
Example request JSON(GTFS):
430+
{
431+
"stable_id": "example_stable_id",
432+
"dataset_id": "example_dataset_id",
433+
"stops_url": "https://example.com/path/to/stops.csv",
434+
"data_type": "gtfs",
435+
"strategy": "per-point"
436+
}
437+
Example request JSON(GBFS):
438+
{
439+
"stable_id": "example_stable_id",
440+
"dataset_id": "example_dataset_id",
441+
"station_information_url": "https://example.com/path/to/station_information.json",
442+
"vehicle_status_url": "https://example.com/path/to/vehicle_status.json",
443+
"free_bike_status_url": "https://example.com/path/to/free_bike_status.json",
444+
"strategy": "per-point"
445+
}
446+
@:returns: A tuple containing a message and the HTTP status code.
423447
448+
"""
424449
try:
425450
# Parse request parameters
426451
(
@@ -429,6 +454,8 @@ def reverse_geolocation_process(
429454
dataset_id,
430455
data_type,
431456
extraction_urls,
457+
public,
458+
strategy,
432459
) = parse_request_parameters(request)
433460

434461
# Remove duplicate lat/lon points
@@ -447,19 +474,16 @@ def reverse_geolocation_process(
447474
return str(e), ERROR_STATUS_CODE
448475

449476
logger = get_logger(__name__, stable_id)
450-
451477
try:
452478
# Update the bounding box of the dataset
453479
bounding_box = update_dataset_bounding_box(dataset_id, stops_df)
454480

455-
# Get Cached Geopolygons
456-
feed_id, location_groups, stops_df = get_cached_geopolygons(
457-
stable_id, stops_df, logger
481+
location_groups = reverse_geolocation(
482+
strategy,
483+
stable_id,
484+
stops_df,
485+
logger,
458486
)
459-
logger.info("Number of location groups extracted: %s", len(location_groups))
460-
461-
# Extract Location Groups
462-
extract_location_aggregates(feed_id, stops_df, location_groups, logger)
463487

464488
# Create GeoJSON Aggregate
465489
create_geojson_aggregate(
@@ -470,15 +494,12 @@ def reverse_geolocation_process(
470494
data_type=data_type,
471495
extraction_urls=extraction_urls,
472496
logger=logger,
497+
public=public,
473498
)
474-
475-
# Overall Time
476-
overall_duration = (datetime.now() - overall_start).total_seconds()
477-
logger.info(f"Total time taken for the process: {overall_duration:.2f} seconds")
478499
logger.info(
479-
"COMPLETED. Processed %s stops for stable ID %s. "
500+
"COMPLETED. Processed %s stops for stable ID %s with strategy. "
480501
"Retrieved %s locations.",
481-
total_stops,
502+
len(stops_df),
482503
stable_id,
483504
len(location_groups),
484505
)
@@ -493,3 +514,34 @@ def reverse_geolocation_process(
493514
logger.error("Error processing geopolygons: %s", e)
494515
logger.error(traceback.format_exc()) # Log full traceback
495516
return str(e), ERROR_STATUS_CODE
517+
518+
519+
@track_metrics(
    metrics=("time", "memory", "cpu"), logger=get_logger(__name__, "stable_id")
)
def reverse_geolocation(
    strategy,
    stable_id,
    stops_df,
    logger,
):
    """
    Reverse geolocation processing based on the specified strategy.

    Args:
        strategy: ReverseGeocodingStrategy selecting the extraction algorithm.
        stable_id: Stable ID of the feed being processed.
        stops_df: DataFrame of stop locations to reverse-geocode.
        logger: Logger used for progress and error reporting.

    Returns:
        The extracted location groups, or an ``(error message, status code)``
        tuple when the strategy is not supported.
    """
    # Fail fast on an unsupported strategy so the (expensive) geopolygon
    # cache lookup below never runs for a request that cannot be processed.
    if strategy != ReverseGeocodingStrategy.PER_POINT:
        logger.error("Invalid strategy: %s", strategy)
        return f"Invalid strategy: {strategy}", ERROR_STATUS_CODE

    logger.info("Processing geopolygons with per-point strategy.")
    # Get Cached Geopolygons
    feed_id, location_groups, stops_df = get_cached_geopolygons(
        stable_id, stops_df, logger
    )
    logger.info("Number of location groups extracted: %s", len(location_groups))

    # Extract Location Groups
    extract_location_aggregates_per_point(feed_id, stops_df, location_groups, logger)

    return location_groups

0 commit comments

Comments
 (0)