diff --git a/functions-python/helpers/requirements.txt b/functions-python/helpers/requirements.txt index 59b67dd1a..b3b476452 100644 --- a/functions-python/helpers/requirements.txt +++ b/functions-python/helpers/requirements.txt @@ -24,5 +24,5 @@ google-api-core google-cloud-firestore google-cloud-bigquery -#Additional package -pycountry +# Additional package +pycountry \ No newline at end of file diff --git a/functions-python/reverse_geolocation/src/reverse_geolocation_aggregator.py b/functions-python/reverse_geolocation/src/reverse_geolocation_aggregator.py new file mode 100644 index 000000000..0cfdcfcb3 --- /dev/null +++ b/functions-python/reverse_geolocation/src/reverse_geolocation_aggregator.py @@ -0,0 +1,340 @@ +import json +import logging +import os +from collections import defaultdict +from typing import Dict, Tuple, List, Optional +import pycountry + +import flask +from google.cloud import storage +from shapely.geometry import Polygon, mapping +from database_gen.sqlacodegen_models import Geopolygon +from helpers.database import with_db_session +from sqlalchemy.orm import Session +from helpers.logger import Logger +from geoalchemy2.shape import to_shape +from common import ERROR_STATUS_CODE +from shapely.validation import make_valid +import matplotlib.pyplot as plt + +# Initialize logging +logging.basicConfig(level=logging.INFO) + +def generate_color(points_match, max_match, colormap_name="OrRd"): + """ + Generate a color based on the points_match value using a matplotlib colormap. + """ + colormap = plt.get_cmap(colormap_name) + # Restrict normalized_value to the upper half of the spectrum (0.5 to 1) + normalized_value = 0.5 + 0.5 * (points_match / max_match) + rgba = colormap(normalized_value) # Returns RGBA + return f"rgba({int(rgba[0] * 255)}, {int(rgba[1] * 255)}, {int(rgba[2] * 255)}, {rgba[3]})" + + +class ReverseGeolocation: + def __init__( + self, + osm_id: int, + iso_3166_1: Optional[str], + iso_3166_2: Optional[str], + name: str, + admin_level: int, + points_match: int, + ): + self.osm_id = osm_id + self.iso_3166_1 = iso_3166_1 + self.iso_3166_2 = iso_3166_2 + self.name = name + self.admin_level = admin_level + self.points_match = points_match + self.children: List[ReverseGeolocation] = [] + self.parent: Optional[ReverseGeolocation] = None + self.geometry: Optional[Polygon] = None + + def __str__(self) -> str: + return f"{self.osm_id} - {self.name} - {self.points_match}" + + def set_geometry(self, geopolygon: Geopolygon) -> None: + shape = to_shape(geopolygon.geometry) + if shape.is_valid: + self.geometry = shape + else: + self.geometry = make_valid(shape) + + @staticmethod + def from_dict(data: dict, parent: Optional["ReverseGeolocation"] = None) -> List["ReverseGeolocation"]: + nodes = [] + locations = data if isinstance(data, list) else data.get("grouped_matches", []) + for location in locations: + node = ReverseGeolocation( + osm_id=location["osm_id"], + iso_3166_1=location.get("iso_3166_1"), + iso_3166_2=location.get("iso_3166_2"), + name=location["name"], + admin_level=location["admin_level"], + points_match=location["points_match"], + ) + if parent: + node.parent = parent + nodes.append(node) + if "sub_levels" in location: + node.children = ReverseGeolocation.from_dict(location["sub_levels"], parent=node) + return nodes + + def to_dict(self) -> dict: + return { + "osm_id": self.osm_id, + "iso_3166_1": self.iso_3166_1, + "iso_3166_2": self.iso_3166_2, + "name": self.name, + "admin_level": self.admin_level, + "points_match": self.points_match, + "sub_levels": 
[child.to_dict() for child in self.children], + } + + def get_level(self, target_level: int, current_level: int = 0) -> List["ReverseGeolocation"]: + if current_level == target_level: + return [self] + results = [] + for child in self.children: + results.extend(child.get_level(target_level, current_level + 1)) + return results + + def get_leaves(self) -> List["ReverseGeolocation"]: + if not self.children: + return [self] + leaves = [] + for child in self.children: + leaves.extend(child.get_leaves()) + return leaves + + def get_country_code(self) -> str: + if self.iso_3166_1: + return self.iso_3166_1 + if self.parent: + return self.parent.get_country_code() + return "" + + def get_display_name(self) -> str: + display_name = self.name + if self.iso_3166_1: + try: + flag = pycountry.countries.get(alpha_2=self.iso_3166_1).flag + display_name = f"{flag} {display_name}" + except AttributeError: + pass + if self.parent: + display_name = f"{self.parent.get_display_name()}, {display_name}" + return display_name + + def get_geojson_feature(self, max_leaves_points) -> Dict: + if not self.geometry: + return {} + return { + "type": "Feature", + "properties": { + "osm_id": self.osm_id, + "country_code": self.get_country_code(), + "display_name": self.get_display_name(), + "points_match": self.points_match, + "color": generate_color(self.points_match, max_leaves_points), + }, + "geometry": mapping(self.geometry), + } + + +def parse_request_parameters(request: flask.Request) -> Tuple[str, int, int, str]: + """ + Parse the request parameters and return the execution ID, number of batches, and retry count. + """ + logging.info("Parsing request parameters.") + try: + retry_count = int(request.headers.get("X-CloudTasks-TaskRetryCount", 0)) + except ValueError: + logging.error( + f"Error parsing retry count: {request.headers.get('X-CloudTasks-TaskRetryCount')}. Defaulting to 0." + ) + retry_count = 0 + + request_json = request.get_json(silent=True) + if (not request_json + or "execution_id" not in request_json + or "n_batches" not in request_json + or "stable_id" not in request_json): + raise ValueError("Missing required 'execution_id', 'stable_id' or 'n_batches' in the request.") + + return ( + request_json["execution_id"], + int(request_json["n_batches"]), + retry_count, + request_json["stable_id"] + ) + + +def list_blobs(bucket_name: str, prefix: str = "") -> List[storage.Blob]: + """ + List all JSON files in a GCP bucket with the given prefix. + """ + storage_client = storage.Client() + blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix)) + return [blob for blob in blobs if blob.name.endswith(".json")] + + +def merge_reverse_geolocations(locations: List[ReverseGeolocation]) -> List[ReverseGeolocation]: + """ + Recursively merge a list of ReverseGeolocation objects. 
+ """ + if not locations: + return [] + + # Group by osm_id + per_osm_id = defaultdict(list) + for location in locations: + per_osm_id[location.osm_id].append(location) + + merged_results = [] + for osm_id, grouped_locations in per_osm_id.items(): + # Aggregate points_match and merge children + total_points_match = sum(loc.points_match for loc in grouped_locations) + chosen_location = grouped_locations[0] + chosen_location.points_match = total_points_match + + # Merge children recursively + all_children = [child for loc in grouped_locations for child in loc.children] + chosen_location.children = merge_reverse_geolocations(all_children) + + merged_results.append(chosen_location) + + return merged_results + + +def reverse_geolocation_aggregate( + request: flask.Request, +) -> Tuple[str, int] | Tuple[Dict, int]: + """ + Handle reverse geolocation aggregation by merging JSON data into a single result. + """ + Logger.init_logger() + + source_bucket = os.getenv("BUCKET_NAME") + max_retry = int(os.getenv("MAX_RETRY", 10)) + + if not source_bucket: + logging.error("Source bucket name not set.") + return "Source bucket name not set.", ERROR_STATUS_CODE + + try: + execution_id, n_batches, retry_count, stable_id = parse_request_parameters(request) + logging.info(f"Execution ID: {execution_id}, Number of batches: {n_batches}, Retry Count: {retry_count}") + except ValueError as e: + return handle_error("Error parsing request parameters", e, ERROR_STATUS_CODE) + + try: + files = validate_files_ready(source_bucket, execution_id, n_batches, retry_count, max_retry) + except ValueError as e: + return handle_error("Validation error", e) + + try: + aggregated_data, total_points, geojson_data = aggregate_data_from_files(files) + logging.info(f"Aggregated {total_points} points from {len(files)} files.") + except Exception as e: + return handle_error("Error aggregating data", e, ERROR_STATUS_CODE) + + try: + save_aggregated_data(source_bucket, execution_id, aggregated_data, total_points) + save_geojson(os.getenv("DATASETS_BUCKET_NAME"), stable_id, geojson_data) + except Exception as e: + return handle_error("Error saving aggregated data", e, ERROR_STATUS_CODE) + + return "Done", 200 + + +def validate_files_ready( + bucket_name: str, prefix: str, n_batches: int, retry_count: int, max_retry: int +) -> List[storage.Blob]: + """ + Validate that the required number of files is available in the bucket. + """ + files = list_blobs(bucket_name, prefix) + logging.info(f"Found {len(files)} files in the bucket.") + + if len(files) < n_batches: + if retry_count < max_retry: + logging.warning("Files are not ready yet. Retrying...") + raise ValueError("Not yet ready to process") + logging.error("Maximum retries exceeded. Aborting.") + raise ValueError("Maximum retries exceeded.") + return files + +@with_db_session +def aggregate_data_from_files(files: List[storage.Blob], session: Session) -> Tuple[List[Dict], int, Dict]: + """ + Aggregate data from the given list of files. 
+ """ + results: List[ReverseGeolocation] = [] + total_points = 0 + + for file in files: + if file.name.endswith("output.json"): + continue + json_data = json.loads(file.download_as_string()) + results.extend(ReverseGeolocation.from_dict(json_data)) + total_points += json_data.get("summary", {}).get("total_points", 0) + + root_nodes = merge_reverse_geolocations(results) + leaves = [leaf for node in root_nodes for leaf in node.get_leaves()] + max_leaves_points = max(leaf.points_match for leaf in leaves) + osm_ids = [leaf.osm_id for leaf in leaves] + + leaves_geopolygons = ( + session.query(Geopolygon).filter(Geopolygon.osm_id.in_(osm_ids)).all() + ) + geopolygons_map = {geopolygon.osm_id: geopolygon for geopolygon in leaves_geopolygons} + for leaf in leaves: + leaf.set_geometry(geopolygons_map[leaf.osm_id]) + geojson_map = { + "type": "FeatureCollection", + "features": [node.get_geojson_feature(max_leaves_points) for node in leaves], + } + return [node.to_dict() for node in root_nodes], total_points, geojson_map + +def save_geojson(bucket_name: str, stable_id: str, geojson: Dict) -> None: + """ + Save the GeoJSON data as a JSON file in the specified bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(f"{stable_id}/geolocation-1.geojson") + blob.upload_from_string(json.dumps(geojson)) + blob.make_public() + logging.info(f"GeoJSON data saved to {blob.name}") + +def save_aggregated_data(bucket_name: str, + execution_id: str, + aggregated_data: List[Dict], + total_points: int) -> None: + """ + Save the aggregated data as a JSON file in the specified bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(f"{execution_id}/output.json") + matched_points = sum(node["points_match"] for node in aggregated_data) + output = { + "summary" : { + "total_points": total_points, + "matched_points": matched_points, + "unmatched_points": total_points - matched_points, + }, + "locations": aggregated_data, + } + blob.upload_from_string(json.dumps(output, indent=2)) + logging.info(f"Aggregated data saved to {blob.name}") + + +def handle_error(message: str, exception: Exception, error_code: int = 400) -> Tuple[str, int]: + """ + Log and handle an error, returning an appropriate response. 
+ """ + logging.error(f"{message}: {exception}") + return str(exception), error_code diff --git a/functions-python/reverse_geolocation/src/reverse_geolocation_processor.py b/functions-python/reverse_geolocation/src/reverse_geolocation_processor.py new file mode 100644 index 000000000..4e0249b72 --- /dev/null +++ b/functions-python/reverse_geolocation/src/reverse_geolocation_processor.py @@ -0,0 +1,251 @@ +import json +import logging +import os +import uuid +from collections import defaultdict +from typing import List, Dict, Tuple + +import flask +from geoalchemy2 import WKTElement +from geoalchemy2.shape import to_shape +from google.cloud import storage +from shapely.validation import make_valid + +from common import ERROR_STATUS_CODE +from database_gen.sqlacodegen_models import Geopolygon +from helpers.database import Database +from helpers.logger import Logger + +# Initialize logging +logging.basicConfig(level=logging.INFO) + + +def build_response( + proper_match_geopolygons: Dict[str, Dict], total_points: int, unmatched_points: int +) -> Dict: + """Build a structured response from the matched geopolygons.""" + + # Helper function to merge and build hierarchical groups + def merge_hierarchy(root: Dict, geopolygons: List[Geopolygon], count: int) -> None: + if not geopolygons: + return + + # Process the current geopolygon + current = geopolygons[0] + osm_id = current.osm_id + + # Check if the current node already exists in the root + if osm_id not in root: + root[osm_id] = { + "osm_id": current.osm_id, + "iso_3166_1": current.iso_3166_1_code, + "iso_3166_2": current.iso_3166_2_code, + "name": current.name, + "admin_level": current.admin_level, + "points_match": 0, + "sub_levels": defaultdict(dict), + } + + # Increment points_match for the current node + root[osm_id]["points_match"] += count + + # Recursively process the sub-levels + merge_hierarchy(root[osm_id]["sub_levels"], geopolygons[1:], count) + + # Build the hierarchical response + grouped_matches = defaultdict(dict) + + for match_data in proper_match_geopolygons.values(): + geopolygons = match_data["geopolys"] + count = match_data["count"] + + # Merge into the top-level hierarchy + merge_hierarchy(grouped_matches, geopolygons, count) + + # Recursive function to convert defaultdict to a regular dict and clean sub-levels + def clean_hierarchy(root: Dict) -> List[Dict]: + return [ + { + "osm_id": node["osm_id"], + "iso_3166_1": node["iso_3166_1"], + "iso_3166_2": node["iso_3166_2"], + "name": node["name"], + "admin_level": node["admin_level"], + "points_match": node["points_match"], + "sub_levels": clean_hierarchy(node["sub_levels"]) + if node["sub_levels"] + else [], + } + for node in root.values() + ] + + # Construct the final response + response = { + "summary": { + "total_points": total_points, + "matched_points": total_points - unmatched_points, + "unmatched_points": unmatched_points, + }, + "grouped_matches": clean_hierarchy(grouped_matches), + } + + return response + + +def parse_request_parameters( + request: flask.Request, +) -> Tuple[List[WKTElement], WKTElement, str]: + """ + Parse the request parameters and return a list of WKT points and a bounding box. + """ + logging.info("Parsing request parameters.") + request_json = request.get_json(silent=True) + logging.info(f"Request JSON: {request_json}") + + if ( + not request_json + or "points" not in request_json + or "execution_id" not in request_json + ): + raise ValueError( + "Invalid request: missing 'points' or 'execution_id' parameter." 
+ ) + + execution_id = str(request_json["execution_id"]) + points = request_json["points"] + if not points: + raise ValueError("Invalid request: 'points' parameter is empty.") + if not isinstance(points, list): + raise ValueError("Invalid request: 'points' parameter must be a list.") + if not all(isinstance(lat_lon, list) and len(lat_lon) == 2 for lat_lon in points): + raise ValueError( + "Invalid request: 'points' must be a list of lists with two elements each " + "representing latitude and longitude." + ) + + # Create WKT elements for each point + wkt_points = [ + WKTElement(f"POINT({point[0]} {point[1]})", srid=4326) for point in points + ] + + # Generate bounding box + lons, lats = [point[0] for point in points], [point[1] for point in points] + bounding_box_coords = [ + (min(lons), min(lats)), + (max(lons), min(lats)), + (max(lons), max(lats)), + (min(lons), max(lats)), + (min(lons), min(lats)), + ] + bounding_box = WKTElement( + f"POLYGON(({', '.join([f'{lon} {lat}' for lon, lat in bounding_box_coords])}))", + srid=4326, + ) + + return wkt_points, bounding_box, execution_id + + +def reverse_geolocation_process( + request: flask.Request, +) -> Tuple[str, int] | Tuple[Dict, int]: + """ + Main function to handle reverse geolocation population. + """ + Logger.init_logger() + bucket_name = os.getenv("BUCKET_NAME") + + # Parse request parameters + try: + wkt_points, bounding_box, execution_id = parse_request_parameters(request) + except ValueError as e: + logging.error(f"Error parsing request parameters: {e}") + return str(e), ERROR_STATUS_CODE + + # Start the database session + try: + session = Database().start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False) + except Exception as e: + logging.error(f"Error connecting to the database: {e}") + return str(e), 500 + + # Fetch geopolygons within the bounding box + try: + geopolygons = ( + session.query(Geopolygon) + .filter(Geopolygon.geometry.ST_Intersects(bounding_box)) + .all() + ) + geopolygons_ids = [geopolygon.osm_id for geopolygon in geopolygons] + except Exception as e: + logging.error(f"Error fetching geopolygons: {e}") + return str(e), ERROR_STATUS_CODE + + try: + logging.info(f"Found {len(geopolygons)} geopolygons within the bounding box.") + logging.info(f"The osm_ids of the geopolygons are: {geopolygons_ids}") + + # Map geopolygons into shapes + wkb_geopolygons = { + geopolygon.osm_id: { + "polygon": to_shape(geopolygon.geometry), + "object": geopolygon, + } + for geopolygon in geopolygons + } + + # Ensure geometries are valid + for geopolygon in wkb_geopolygons.values(): + if not geopolygon["polygon"].is_valid: + geopolygon["polygon"] = make_valid(geopolygon["polygon"]) + + points = [to_shape(point) for point in wkt_points] + points_match = {} + + # Match points to geopolygons + for point in points: + for osm_id, geopolygon in wkb_geopolygons.items(): + if geopolygon["polygon"].contains(point): + point_id = str(point) + if point_id not in points_match: + points_match[point_id] = [] + points_match[point_id].append(geopolygon["object"]) + + # Clean up duplicate admin levels + proper_match_geopolygons = {} + for point, geopolygons in points_match.items(): + if len(geopolygons) > 1: + admin_levels = {g.admin_level for g in geopolygons} + if len(admin_levels) == len(geopolygons): + valid_iso_3166_1 = any(g.iso_3166_1_code for g in geopolygons) + valid_iso_3166_2 = any(g.iso_3166_2_code for g in geopolygons) + if not valid_iso_3166_1 or not valid_iso_3166_2: + logging.info(f"Invalid ISO codes for point: {point}") + continue + 
geopolygons.sort(key=lambda x: x.admin_level) + geopolygon_as_string = ", ".join([str(g.osm_id) for g in geopolygons]) + if geopolygon_as_string not in proper_match_geopolygons: + proper_match_geopolygons[geopolygon_as_string] = { + "geopolys": geopolygons, + "count": 0, + } + proper_match_geopolygons[geopolygon_as_string]["count"] += 1 + + unmatched_points = len(wkt_points) - sum( + item["count"] for item in proper_match_geopolygons.values() + ) + logging.info(f"Unmatched points: {unmatched_points}") + response = build_response( + proper_match_geopolygons, len(wkt_points), unmatched_points + ) + logging.info(f"Response: {response}") + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(f"{execution_id}/{uuid.uuid4()}.json") + blob.upload_from_string(json.dumps(response, indent=2)) + blob.make_public() + + # Save the response to GCP bucket + return response, 200 + except Exception as e: + logging.error(f"Error processing geopolygons: {e}") + return str(e), ERROR_STATUS_CODE diff --git a/functions-python/reverse_geolocation_populate/.coveragerc b/functions-python/reverse_geolocation_populate/.coveragerc new file mode 100644 index 000000000..d3ef5cbc8 --- /dev/null +++ b/functions-python/reverse_geolocation_populate/.coveragerc @@ -0,0 +1,9 @@ +[run] +omit = + */test*/* + */helpers/* + */database_gen/* + +[report] +exclude_lines = + if __name__ == .__main__.: \ No newline at end of file diff --git a/functions-python/reverse_geolocation_populate/.env.rename_me b/functions-python/reverse_geolocation_populate/.env.rename_me new file mode 100644 index 000000000..0ffc7e6ad --- /dev/null +++ b/functions-python/reverse_geolocation_populate/.env.rename_me @@ -0,0 +1,2 @@ +# Environment variables for the reverse_geolocation_populate function +FEEDS_DATABASE_URL=${{FEEDS_DATABASE_URL}} diff --git a/functions-python/reverse_geolocation_populate/README.md b/functions-python/reverse_geolocation_populate/README.md new file mode 100644 index 000000000..2817c7666 --- /dev/null +++ b/functions-python/reverse_geolocation_populate/README.md @@ -0,0 +1,54 @@ +# Reverse Geolocation Populate + +## Function Workflow + +The `Reverse Geolocation Populate` function is a GCP Cloud Function that initializes and populates the database with administrative boundary polygons and metadata for specified country codes. It fetches data from OpenStreetMap (OSM) using BigQuery, processes it, and stores the results in the Postgres database. + +### Key Steps: +1. **Input Validation**: + - The function validates the `country_code` parameter in the HTTP request. + - Optionally, it accepts a list of administrative levels (`admin_levels`) to filter the data. + +2. **Administrative Level Retrieval**: + - For the given `country_code`, the function retrieves: + - Country administrative levels (e.g., level 2). + - Subdivision administrative levels (e.g., levels 3–8). + +3. **Data Fetching**: + - The function queries OSM data using BigQuery to retrieve boundary polygons and associated metadata for the specified administrative levels. + +4. **Data Processing and Storage**: + - The function processes the retrieved data and saves it into a PostgreSQL database, using the `geoalchemy2` library for spatial data. + +5. **Error Handling**: + - Any errors in data fetching or processing are logged and returned in the HTTP response. 
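+
+### Example Request
+
+A minimal sketch of calling the deployed function from Python with the `requests` package (the URL and token below are illustrative placeholders, not actual deployment values):
+
+```python
+import requests
+
+# Hypothetical endpoint; substitute the deployed function's URL.
+url = "https://<region>-<project>.cloudfunctions.net/reverse-geolocation-populate"
+
+response = requests.post(
+    url,
+    json={"country_code": "CA", "admin_levels": "2,4"},
+    headers={"Authorization": "Bearer <identity-token>"},  # OIDC identity token
+    timeout=1800,
+)
+print(response.status_code, response.text)
+```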
+
+---
+
+## Expected Behavior
+
+### Input
+The function expects an HTTP POST request with a JSON payload containing:
+- **`country_code`** (required): The ISO 3166-1 alpha-2 code of the country (e.g., `"FR"` for France).
+- **`admin_levels`** (optional): A comma-separated list of administrative levels to process (e.g., `"2,4,6"`). If not provided, the function calculates the levels automatically.
+
+### Output
+- If successful, the function initializes or updates the database with the administrative boundary data for the given country and returns: `"Database initialized for <country_code>."`
+- If an error occurs, the function returns an appropriate error message and a `400` or `500` HTTP status code.
+
+---
+
+## Function Configuration
+
+### Environment Variables
+- **`FEEDS_DATABASE_URL`**: Connection string for the PostgreSQL database where geolocation data will be stored.
+- **`reverse_geolocation_populate_url`**: URL of the Cloud Function for batch workflow calls.
+
+### BigQuery Access
+- The function queries the `bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` dataset. Ensure the function has the necessary permissions to access this dataset.
+
+---
+
+## Local Development
+
+Local development of these functions should follow standard practices for GCP serverless functions. For general instructions on setting up the development environment, refer to the main [README.md](../README.md) file.
\ No newline at end of file
diff --git a/functions-python/reverse_geolocation_populate/function_config.json b/functions-python/reverse_geolocation_populate/function_config.json
new file mode 100644
index 000000000..0f74b5bd1
--- /dev/null
+++ b/functions-python/reverse_geolocation_populate/function_config.json
@@ -0,0 +1,21 @@
+{
+  "name": "reverse-geolocation-populate",
+  "description": "Populate the database with reverse geolocation data",
+  "entry_point": "reverse_geolocation_populate",
+  "timeout": 3600,
+  "memory": "8Gi",
+  "trigger_http": true,
+  "include_folders": ["database_gen", "helpers"],
+  "environment_variables": [],
+  "secret_environment_variables": [
+    {
+      "key": "FEEDS_DATABASE_URL"
+    }
+  ],
+  "ingress_settings": "ALLOW_ALL",
+  "max_instance_request_concurrency": 1,
+  "max_instance_count": 10,
+  "min_instance_count": 0,
+  "available_cpu": 2,
+  "available_memory": "4Gi"
+}
diff --git a/functions-python/reverse_geolocation_populate/requirements.txt b/functions-python/reverse_geolocation_populate/requirements.txt
new file mode 100644
index 000000000..36fb55b46
--- /dev/null
+++ b/functions-python/reverse_geolocation_populate/requirements.txt
@@ -0,0 +1,26 @@
+# Common packages
+functions-framework==3.*
+google-cloud-logging
+psycopg2-binary==2.9.6
+aiohttp~=3.10.5
+asyncio~=3.4.3
+urllib3~=2.2.2
+requests~=2.32.3
+attrs~=23.1.0
+pluggy~=1.3.0
+certifi~=2024.7.4
+
+# SQL Alchemy and Geo Alchemy
+SQLAlchemy==2.0.23
+geoalchemy2==0.14.7
+
+# Google specific packages for this function
+google-cloud-pubsub
+google-cloud-datastore
+google-cloud-storage
+google-cloud-bigquery
+cloudevents~=1.10.1
+
+# Additional packages for this function
+gtfs-kit
+pycountry
\ No newline at end of file
diff --git a/functions-python/reverse_geolocation_populate/requirements_dev.txt b/functions-python/reverse_geolocation_populate/requirements_dev.txt
new file mode 100644
index 000000000..800a4ac11
--- /dev/null
+++ b/functions-python/reverse_geolocation_populate/requirements_dev.txt
@@ -0,0 +1,4 @@
+Faker
+pytest~=7.4.3
+urllib3-mock
+requests-mock
\ No newline at end of file
diff --git 
a/functions-python/reverse_geolocation_populate/src/__init__.py b/functions-python/reverse_geolocation_populate/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/functions-python/reverse_geolocation_populate/src/main.py b/functions-python/reverse_geolocation_populate/src/main.py new file mode 100644 index 000000000..bf5f04262 --- /dev/null +++ b/functions-python/reverse_geolocation_populate/src/main.py @@ -0,0 +1,261 @@ +import logging +import os +import pycountry + +from google.cloud import bigquery +from geoalchemy2 import WKTElement +from helpers.database import Database +from helpers.logger import Logger +from database_gen.sqlacodegen_models import Geopolygon +import functions_framework + +# Initialize logging +logging.basicConfig(level=logging.INFO) +client = None # Global BigQuery client + + +def parse_request_parameters(request): + """Parse and validate request parameters, including the country code.""" + logging.info("Parsing request parameters.") + request_json = request.get_json(silent=True) + if not request_json or "country_code" not in request_json: + logging.error("Request missing required country_code parameter.") + raise ValueError("Invalid request parameters: country_code is required.") + + country_code = request_json["country_code"] + if pycountry.countries.get(alpha_2=country_code) is None: + logging.error(f"Invalid country code detected: {country_code}") + raise ValueError(f"Invalid country code: {country_code}") + + admin_levels = request_json.get("admin_levels", None) + try: + if admin_levels: + admin_levels = [int(level) for level in admin_levels.split(",")] + if admin_levels and not all(2 <= level <= 8 for level in admin_levels): + raise ValueError("Invalid admin levels.") + except ValueError: + logging.error(f"Invalid admin levels detected: {admin_levels}") + raise ValueError(f"Invalid admin levels: {admin_levels}") + return country_code, admin_levels + + +def fetch_subdivision_admin_levels(country_code): + """Fetch distinct subdivision admin levels for the given country code.""" + logging.info("Fetching subdivision administrative levels.") + query = f""" + SELECT DISTINCT + CAST((SELECT value FROM UNNEST(all_tags) WHERE key = 'admin_level') AS INT) AS admin_level + FROM + `bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` + WHERE + ('boundary', 'administrative') IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags)) + AND EXISTS ( + SELECT 1 + FROM UNNEST(all_tags) AS tag + WHERE tag.key = 'ISO3166-2' AND tag.value LIKE '{country_code}%' + ) + ORDER BY admin_level; + """ + query_job = client.query(query) + results = query_job.result() + return [row.admin_level for row in results if row.admin_level is not None] + + +def fetch_country_admin_levels(country_code): + """Fetch distinct country admin levels for the given country code.""" + logging.info("Fetching country administrative levels.") + query = f""" + SELECT DISTINCT + CAST((SELECT value FROM UNNEST(all_tags) WHERE key = 'admin_level') AS INT) AS admin_level + FROM + `bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` + WHERE + ('boundary', 'administrative') IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags)) + AND EXISTS ( + SELECT 1 + FROM UNNEST(all_tags) AS tag + WHERE tag.key = 'ISO3166-1' AND tag.value LIKE '{country_code}' + ) + ORDER BY admin_level; + """ + query_job = client.query(query) + results = query_job.result() + return [row.admin_level for row in results if row.admin_level is not None] + + +def generate_query(admin_level, 
country_code, is_lower=False, country_name=None): + """Generate the query for a specific admin level.""" + logging.info( + f"Generating query for admin level: {admin_level}, country code: {country_code}" + ) + country_name_filter = "" + # Define query parameters + query_parameters = [ + bigquery.ScalarQueryParameter("country_code", "STRING", country_code), + bigquery.ScalarQueryParameter("admin_level", "STRING", admin_level), + ] + if country_name: + country_name = country_name.replace("'", "\\'") + country_name_filter = "AND ('name:en', @country_name) IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags))" + query_parameters.append( + bigquery.ScalarQueryParameter("country_name", "STRING", country_name) + ) + + iso_3166_1_condition = ( + f"AND ('ISO3166-1', '{country_code}') IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags))" + if not is_lower + else "" + ) + + query = f""" + WITH bounding_area AS ( + SELECT geometry + FROM `bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` + WHERE + ('ISO3166-1', @country_code) IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags)) + {country_name_filter} + AND ('boundary', 'administrative') IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags)) + AND ('admin_level', '2') IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags)) + ) + SELECT planet_features.* + FROM `bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` planet_features, bounding_area + WHERE + ('boundary', 'administrative') IN (SELECT STRUCT(key, value) FROM UNNEST(planet_features.all_tags)) + AND ('admin_level', @admin_level) IN (SELECT STRUCT(key, value) FROM UNNEST(planet_features.all_tags)) + AND ST_DWithin(bounding_area.geometry, planet_features.geometry, 0) + {iso_3166_1_condition}; + """ + + job_config = bigquery.QueryJobConfig(query_parameters=query_parameters) + return query, job_config + + +def fetch_data(admin_level, country_code, is_lower=False, country_name=None): + """Fetch data for a specific admin level.""" + query, job_config = generate_query( + admin_level, country_code, is_lower, country_name + ) + query_job = client.query(query, job_config=job_config) + results = query_job.result() + logging.info(f"Fetched {results.total_rows} rows for admin level {admin_level}.") + + data = [] + for row in results: + if row["osm_id"] is None: + continue + all_tags = {tag["key"]: tag["value"] for tag in row.all_tags} + data.append( + { + "admin_lvl": admin_level, + "osm_id": row.osm_id, + "iso3166_1": all_tags.get("ISO3166-1"), + "iso3166_2": all_tags.get("ISO3166-2"), + "name": all_tags.get("name"), + "name:en": all_tags.get("name:en"), + "name:fr": all_tags.get("name:fr"), + "geometry": row.geometry, + } + ) + return data + + +def save_to_database(data, session): + """Save data to the database.""" + for row in data: + if not row["name"] or not row["geometry"]: + logging.info(f"Skipping row with missing data: {row['osm_id']}") + continue + + geopolygon = ( + session.query(Geopolygon).filter(Geopolygon.osm_id == row["osm_id"]).first() + ) + if geopolygon: + logging.info(f"Geopolygon with osm_id {row['osm_id']} already exists.") + else: + logging.info(f"Adding geopolygon with osm_id {row['osm_id']}.") + geopolygon = Geopolygon(osm_id=row["osm_id"]) + session.add(geopolygon) + + geopolygon.admin_level = row["admin_lvl"] + geopolygon.iso_3166_1_code = row["iso3166_1"] + geopolygon.iso_3166_2_code = row["iso3166_2"] + geopolygon.name = row["name:en"] if row["name:en"] else row["name"] + geopolygon.geometry = WKTElement(row["geometry"], srid=4326) + 
session.commit() + + +@functions_framework.http +def reverse_geolocation_populate(request): + """Cloud Function entry point to populate the reverse geolocation database.""" + Logger.init_logger() + global client + client = bigquery.Client() + logging.info("Reverse geolocation database population triggered.") + + try: + session = Database().start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False) + except Exception as e: + logging.error(f"Error connecting to the database: {e}") + return str(e), 500 + + try: + country_code, admin_levels = parse_request_parameters(request) + logging.info(f"Country code parsed: {country_code}") + + if ( + not admin_levels + and session.query(Geopolygon) + .filter(Geopolygon.iso_3166_1_code == country_code) + .first() + ): + return f"Database already initialized for {country_code}.", 200 + except ValueError as e: + logging.error(e) + return str(e), 400 + + try: + country_admin_levels = fetch_country_admin_levels(country_code) + if not country_admin_levels: + raise ValueError(f"No admin levels found for country {country_code}") + country_admin_level = country_admin_levels[0] + logging.info(f"Country admin level: {country_admin_level}") + if not admin_levels: + admin_levels = get_admin_levels(country_code, country_admin_level) + logging.info(f"Filtered admin levels: {admin_levels}") + + data = [] + country_name = None + for level in admin_levels: + data.extend( + fetch_data( + level, country_code, level > country_admin_level, country_name + ) + ) + if level == country_admin_level and data: + country_name = data[0]["name:en"] or data[0]["name"] + logging.info(f"Extracted country name: {country_name}") + + save_to_database(data, session) + return f"Database initialized for {country_code}.", 200 + + except Exception as e: + logging.error(f"Error processing {country_code}: {e}") + return str(e), 400 + + +def get_admin_levels(country_code, country_admin_level): + """Get the pertinent admin levels for the given country code.""" + subdivision_levels = fetch_subdivision_admin_levels(country_code) + logging.info(f"Subdivision levels: {subdivision_levels}") + admin_levels = sorted( + { + country_admin_level, + *subdivision_levels, + max(subdivision_levels + [country_admin_level]) + + 1, # Adding a level higher than the highest subdivision level + max(subdivision_levels + [country_admin_level]) + + 2, # Adding another level higher than the highest subdivision level + } + ) + admin_levels = [level for level in admin_levels if level <= 8][:5] + return admin_levels diff --git a/functions-python/reverse_geolocation_populate/tests/test_reverse_geolocation_populate.py b/functions-python/reverse_geolocation_populate/tests/test_reverse_geolocation_populate.py new file mode 100644 index 000000000..556fdc6f6 --- /dev/null +++ b/functions-python/reverse_geolocation_populate/tests/test_reverse_geolocation_populate.py @@ -0,0 +1,259 @@ +import unittest +from unittest.mock import patch, MagicMock + +from faker import Faker + +from helpers.tests.test_database import default_db_url + +faker = Faker() + + +class TestReverseGeolocationPopulate(unittest.TestCase): + def test_parse_request(self): + from reverse_geolocation_populate.src.main import parse_request_parameters + + # Valid request + request = MagicMock() + request.get_json.return_value = { + "country_code": "CA", + "admin_levels": "2, 3, 4", + } + result = parse_request_parameters(request) + self.assertIsNotNone(result[0]) + self.assertEqual(result[0], "CA") + self.assertIsNotNone(result[1]) + self.assertEqual(result[1], [2, 3, 
4]) + + # Invalid country code + request.get_json.return_value = { + "country_code": "USA", + "admin_levels": "2, 3, 4", + } + with self.assertRaises(ValueError): + parse_request_parameters(request) + + # Invalid admin levels + request.get_json.return_value = { + "country_code": "CA", + "admin_levels": "1, 3, 4, 5", + } + with self.assertRaises(ValueError): + parse_request_parameters(request) + request.get_json.return_value = { + "country_code": "CA", + "admin_levels": "1, 3, invalid,", + } + with self.assertRaises(ValueError): + parse_request_parameters(request) + + # Missing country code and admin levels + request.get_json.return_value = {} + with self.assertRaises(ValueError): + parse_request_parameters(request) + + # Missing admin levels + request.get_json.return_value = { + "country_code": "CA", + } + result = parse_request_parameters(request) + self.assertIsNotNone(result[0]) + self.assertEqual(result[0], "CA") + self.assertIsNone(result[1]) + + def test_fetch_subdivision_admin_levels(self): + from reverse_geolocation_populate.src.main import fetch_subdivision_admin_levels + + country_code = "CA" + client_mock = MagicMock() + client_mock.query.return_value.result.return_value = [ + MagicMock(admin_level=2), + MagicMock(admin_level=3), + MagicMock(admin_level=None), + ] + with patch("reverse_geolocation_populate.src.main.client", client_mock): + result = fetch_subdivision_admin_levels(country_code) + self.assertIsNotNone(result) + self.assertEqual(result, [2, 3]) + + def test_fetch_country_admin_levels(self): + from reverse_geolocation_populate.src.main import fetch_country_admin_levels + + country_code = "CA" + client_mock = MagicMock() + client_mock.query.return_value.result.return_value = [ + MagicMock(admin_level=2), + MagicMock(admin_level=3), + MagicMock(admin_level=None), + ] + with patch("reverse_geolocation_populate.src.main.client", client_mock): + result = fetch_country_admin_levels(country_code) + self.assertIsNotNone(result) + self.assertEqual(result, [2, 3]) + + def test_generate_query(self): + from reverse_geolocation_populate.src.main import generate_query + + country_code = "CA" + admin_levels = [2, 3] + result = generate_query(admin_levels, country_code) + self.assertIsNotNone(result) + query_result = result[0] + self.assertNotIn( + "AND ('name:en', @country_name) IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags))", + query_result, + ) + + result = generate_query(admin_levels, country_code, country_name="Canada") + self.assertIsNotNone(result) + query_result = result[0] + self.assertIn( + "AND ('name:en', @country_name) IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags))", + query_result, + ) + + result = generate_query(admin_levels, country_code, is_lower=False) + self.assertIsNotNone(result) + self.assertIn( + "AND ('ISO3166-1', 'CA') IN (SELECT STRUCT(key, value) FROM UNNEST(all_tags))", + result[0], + ) + + @patch("reverse_geolocation_populate.src.main.generate_query") + def test_fetch_data(self, mock_generate_query): + from reverse_geolocation_populate.src.main import fetch_data + + def gen_mock_row(osm_id, name): + row_mock = MagicMock( + all_tags=[{"key": "name:en", "value": name}], + items={"osm_id": osm_id}, + geometry=MagicMock(type="Point", coordinates=[0, 0]), + ) + row_mock.__getitem__.side_effect = lambda x: row_mock.items[x] + return row_mock + + query = faker.sentence() + mock_generate_query.return_value = [query, []] + client_mock = MagicMock() + client_mock.query.return_value.result.return_value = MagicMock( + total_rows=3, + __iter__=lambda x: 
iter( + [ + gen_mock_row(1, "Toronto"), + gen_mock_row(2, "Vancouver"), + gen_mock_row(3, "Montreal"), + gen_mock_row(None, "Invalid"), + ] + ), + ) + with patch("reverse_geolocation_populate.src.main.client", client_mock): + result = fetch_data(3, "CA") + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + + def test_save_to_database(self): + from reverse_geolocation_populate.src.main import save_to_database + + data = [ + { + "osm_id": 1, + "name": "Toronto", + "geometry": "POINT(-73.5673 45.5017)", + "admin_lvl": 4, + "name:en": "Toronto", + "iso3166_1": None, + "iso3166_2": None, + }, + { + "osm_id": 2, + "name": "Ontario", + "geometry": "POINT(-73.5673 45.5017)", + "admin_lvl": 3, + "name:en": "Ontario", + "iso3166_1": None, + "iso3166_2": "CA-ON", + }, + { + "osm_id": 3, + "name": "Canada", + "geometry": "POINT(-73.5673 45.5017)", + "admin_lvl": 2, + "name:en": "Canada", + "iso3166_1": "CA", + "iso3166_2": None, + }, + { + "osm_id": 5, + "name": None, + "geometry": None, + "admin_lvl": 2, + "name:en": "Canada", + "iso3166_1": "CA", + "iso3166_2": None, + }, + ] + mock_session = MagicMock() + mock_session.query.return_value.filter.return_value.first.return_value = None + save_to_database(data, mock_session) + self.assertEqual(mock_session.add.call_count, 3) + + mock_session = MagicMock() + mock_session.query.return_value.filter.return_value.first.return_value = ( + MagicMock() + ) + save_to_database(data, mock_session) + self.assertEqual(mock_session.add.call_count, 0) + + @patch( + "reverse_geolocation_populate.src.main.fetch_subdivision_admin_levels", + return_value=[2, 3], + ) + def test_get_admin_levels(self, _): + from reverse_geolocation_populate.src.main import get_admin_levels + + country_code = "CA" + result = get_admin_levels(country_code, 1) + self.assertIsNotNone(result) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + @patch("reverse_geolocation_populate.src.main.Logger") + @patch("reverse_geolocation_populate.src.main.bigquery") + @patch("reverse_geolocation_populate.src.main.parse_request_parameters") + @patch("reverse_geolocation_populate.src.main.fetch_country_admin_levels") + @patch("reverse_geolocation_populate.src.main.fetch_data") + @patch("reverse_geolocation_populate.src.main.save_to_database") + @patch.dict( + "os.environ", + { + "FEEDS_DATABASE_URL": default_db_url, + "GOOGLE_APPLICATION_CREDENTIALS": faker.file_path(), + }, + ) + def test_reverse_geolocation_populate( + self, + __, + mock_fetch_data, + mock_fetch_country_admin_lvl, + mock_parse_req, + mock_bigquery, + ___, + ): + mock_parse_req.return_value = ("CA", [2]) + mock_bigquery.Client.return_value = MagicMock() + from reverse_geolocation_populate.src.main import reverse_geolocation_populate + + _, response_code = reverse_geolocation_populate(MagicMock()) + self.assertEqual(400, response_code) + mock_fetch_country_admin_lvl.return_value = [2] + mock_fetch_data.return_value = [ + { + "osm_id": 1, + "name": "Canada", + "geometry": "POINT(-73.5673 45.5017)", + "admin_lvl": 2, + "name:en": "Canada", + "iso3166_1": "CA", + "iso3166_2": None, + } + ] + _, response_code = reverse_geolocation_populate(MagicMock()) + self.assertEqual(200, response_code) diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index 77cc1151b..634820cd9 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -44,6 +44,9 @@ locals { function_gbfs_validation_report_config = jsondecode(file("${path.module}/../../functions-python/gbfs_validator/function_config.json")) 
function_gbfs_validation_report_zip = "${path.module}/../../functions-python/gbfs_validator/.dist/gbfs_validator.zip" + function_reverse_geolocation_populate_config = jsondecode(file("${path.module}/../../functions-python/reverse_geolocation_populate/function_config.json")) + function_reverse_geolocation_populate_zip = "${path.module}/../../functions-python/reverse_geolocation_populate/.dist/reverse_geolocation_populate.zip" + function_feed_sync_dispatcher_transitland_config = jsondecode(file("${path.module}/../../functions-python/feed_sync_dispatcher_transitland/function_config.json")) function_feed_sync_dispatcher_transitland_zip = "${path.module}/../../functions-python/feed_sync_dispatcher_transitland/.dist/feed_sync_dispatcher_transitland.zip" @@ -61,7 +64,8 @@ locals { [for x in local.function_tokens_config.secret_environment_variables : x.key], [for x in local.function_extract_location_config.secret_environment_variables : x.key], [for x in local.function_process_validation_report_config.secret_environment_variables : x.key], - [for x in local.function_update_validation_report_config.secret_environment_variables : x.key] + [for x in local.function_update_validation_report_config.secret_environment_variables : x.key], + [for x in local.function_gbfs_validation_report_config.secret_environment_variables : x.key] ) # Convert the list to a set to ensure uniqueness @@ -149,6 +153,13 @@ resource "google_storage_bucket_object" "operations_api_zip" { source = local.function_operations_api_zip } +# 9. Reverse geolocation populate +resource "google_storage_bucket_object" "reverse_geolocation_populate_zip" { + bucket = google_storage_bucket.functions_bucket.name + name = "reverse-geolocation-populate-${substr(filebase64sha256(local.function_reverse_geolocation_populate_zip), 0, 10)}.zip" + source = local.function_reverse_geolocation_populate_zip +} + # Secrets access resource "google_secret_manager_secret_iam_member" "secret_iam_member" { for_each = local.unique_secret_keys @@ -617,6 +628,7 @@ resource "google_cloudfunctions2_function" "gbfs_validator_pubsub" { resource "google_pubsub_topic" "transitland_feeds_dispatch" { name = "transitland-feeds-dispatch" } +# 6.2 Create batch function that publishes to the Pub/Sub topic resource "google_cloudfunctions2_function" "feed_sync_dispatcher_transitland" { name = "${local.function_feed_sync_dispatcher_transitland_config.name}-batch" description = local.function_feed_sync_dispatcher_transitland_config.description @@ -761,6 +773,51 @@ resource "google_cloudfunctions2_function" "feed_sync_process_transitland" { } } +# 9. 
functions/reverse_geolocation_populate cloud function +resource "google_cloudfunctions2_function" "reverse_geolocation_populate" { + name = local.function_reverse_geolocation_populate_config.name + description = local.function_reverse_geolocation_populate_config.description + location = var.gcp_region + depends_on = [google_project_iam_member.event-receiving, google_secret_manager_secret_iam_member.secret_iam_member] + + build_config { + runtime = var.python_runtime + entry_point = local.function_reverse_geolocation_populate_config.entry_point + source { + storage_source { + bucket = google_storage_bucket.functions_bucket.name + object = google_storage_bucket_object.reverse_geolocation_populate_zip.name + } + } + } + service_config { + environment_variables = { + PYTHONNODEBUGRANGES = 0 + DB_REUSE_SESSION = "True" + } + available_memory = local.function_reverse_geolocation_populate_config.available_memory + timeout_seconds = local.function_reverse_geolocation_populate_config.timeout + available_cpu = local.function_reverse_geolocation_populate_config.available_cpu + max_instance_request_concurrency = local.function_reverse_geolocation_populate_config.max_instance_request_concurrency + max_instance_count = local.function_reverse_geolocation_populate_config.max_instance_count + min_instance_count = local.function_reverse_geolocation_populate_config.min_instance_count + service_account_email = google_service_account.functions_service_account.email + ingress_settings = local.function_reverse_geolocation_populate_config.ingress_settings + vpc_connector = data.google_vpc_access_connector.vpc_connector.id + vpc_connector_egress_settings = "PRIVATE_RANGES_ONLY" + dynamic "secret_environment_variables" { + for_each = local.function_reverse_geolocation_populate_config.secret_environment_variables + content { + key = secret_environment_variables.value["key"] + project_id = var.project_id + secret = "${upper(var.environment)}_${secret_environment_variables.value["key"]}" + version = "latest" + } + } + } +} + + # IAM entry for all users to invoke the function resource "google_cloudfunctions2_function_iam_member" "tokens_invoker" { project = var.project_id @@ -811,7 +868,11 @@ resource "google_project_iam_member" "event-receiving" { # Grant read access to the datasets bucket for the service account resource "google_storage_bucket_iam_binding" "bucket_object_viewer" { - bucket = "${var.datasets_bucket_name}-${var.environment}" + for_each = { + datasets_bucket = "${var.datasets_bucket_name}-${var.environment}" + } + bucket = each.value + depends_on = [] role = "roles/storage.objectViewer" members = [ "serviceAccount:${google_service_account.functions_service_account.email}" @@ -819,8 +880,12 @@ resource "google_storage_bucket_iam_binding" "bucket_object_viewer" { } # Grant write access to the gbfs bucket for the service account -resource "google_storage_bucket_iam_binding" "gbfs_bucket_object_creator" { - bucket = google_storage_bucket.gbfs_snapshots_bucket.name +resource "google_storage_bucket_iam_binding" "bucket_object_creator" { + for_each = { + gbfs_snapshots_bucket = google_storage_bucket.gbfs_snapshots_bucket.name + } + depends_on = [google_storage_bucket.gbfs_snapshots_bucket] + bucket = each.value role = "roles/storage.objectCreator" members = [ "serviceAccount:${google_service_account.functions_service_account.email}" @@ -926,4 +991,11 @@ resource "google_project_iam_member" "datastore_owner" { project = var.project_id role = "roles/datastore.owner" member = 
"serviceAccount:${google_service_account.functions_service_account.email}" +} + +# Grant permissions to the service account to create bigquery jobs +resource "google_project_iam_member" "bigquery_job_user" { + project = var.project_id + role = "roles/bigquery.jobUser" + member = "serviceAccount:${google_service_account.functions_service_account.email}" } \ No newline at end of file diff --git a/infra/workflows/main.tf b/infra/workflows/main.tf index 0b3b2f9f1..b8cf8748c 100644 --- a/infra/workflows/main.tf +++ b/infra/workflows/main.tf @@ -14,6 +14,10 @@ # limitations under the License. # +locals { + function_reverse_geolocation_populate_config = jsondecode(file("${path.module}/../../functions-python/reverse_geolocation_populate/function_config.json")) +} + # Service account to execute the workflow resource "google_service_account" "workflows_service_account" { account_id = "workflows-service-account" @@ -130,3 +134,18 @@ resource "google_eventarc_trigger" "gtfs_validator_trigger" { } +# Workflow to populate the db with all countries for reverse geocoding +resource "google_workflows_workflow" "reverse_geocoding_population" { + name = "reverse_geolocation_populate" + region = var.gcp_region + project = var.project_id + description = "Populate the database with all countries for reverse geocoding" + service_account = google_service_account.workflows_service_account.id + user_env_vars = { + reverse_geolocation_populate_url = "https://${var.gcp_region}-${var.project_id}.cloudfunctions.net/${local.function_reverse_geolocation_populate_config.name}" + batch_size = 5 + } + source_contents = file("${path.module}../../../workflows/reverse_geolocation_populate.yml") +} + + diff --git a/liquibase/changelog.xml b/liquibase/changelog.xml index 89dfe9d0d..616d4f9d4 100644 --- a/liquibase/changelog.xml +++ b/liquibase/changelog.xml @@ -33,6 +33,7 @@ + diff --git a/liquibase/changes/feat_823.sql b/liquibase/changes/feat_823.sql new file mode 100644 index 000000000..ab0bdcd58 --- /dev/null +++ b/liquibase/changes/feat_823.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS GeoPolygon ( + osm_id INTEGER PRIMARY KEY, + admin_level INTEGER, + name VARCHAR(255), + iso_3166_1_code VARCHAR(3), + iso_3166_2_code VARCHAR(8), + geometry GEOMETRY(Geometry, 4326) +); + +CREATE INDEX IF NOT EXISTS idx_geopolygon_geometry ON GeoPolygon USING GIST (geometry); diff --git a/web-app/src/app/components/MapGeoJSON.tsx b/web-app/src/app/components/MapGeoJSON.tsx new file mode 100644 index 000000000..dd88d574d --- /dev/null +++ b/web-app/src/app/components/MapGeoJSON.tsx @@ -0,0 +1,96 @@ +import * as React from 'react'; +import 'leaflet/dist/leaflet.css'; +import { MapContainer, TileLayer, GeoJSON } from 'react-leaflet'; +import { type LatLngBoundsExpression } from 'leaflet'; + +export interface MapProps { + latest_dataset_url: string; // The base URL to construct the GeoJSON file path +} + +export const MapGeoJSON = ( + props: React.PropsWithChildren, +): JSX.Element => { + const [geoJsonData, setGeoJsonData] = React.useState(null); + + // Construct the GeoJSON URL based on the latest_dataset_url + const geoJsonUrl = props.latest_dataset_url + .split('/') + .slice(0, -2) + .concat('geolocation.geojson') + .join('/'); + console.log('geoJsonUrl = ', geoJsonUrl); + + React.useEffect(() => { + const fetchGeoJson = async (): Promise => { + try { + const response = await fetch(geoJsonUrl); + if (!response.ok) { + throw new Error(`Failed to fetch GeoJSON: ${response.statusText}`); + } + const data = await response.json(); + 
+
+  React.useEffect(() => {
+    const fetchGeoJson = async (): Promise<void> => {
+      try {
+        const response = await fetch(geoJsonUrl);
+        if (!response.ok) {
+          throw new Error(`Failed to fetch GeoJSON: ${response.statusText}`);
+        }
+        const data = await response.json();
+        setGeoJsonData(data);
+      } catch (error) {
+        console.error(error);
+      }
+    };
+
+    fetchGeoJson().then(
+      () => {
+        console.log('GeoJSON fetched successfully');
+      },
+      (error) => {
+        console.error('Failed to fetch GeoJSON: ', error);
+      },
+    );
+  }, [geoJsonUrl]);
+
+  const getBoundsFromGeoJson = (
+    geoJson: any,
+  ): LatLngBoundsExpression | undefined => {
+    if (!geoJson?.features) return undefined;
+
+    const coordinates = geoJson.features.flatMap((feature: any) =>
+      feature.geometry.coordinates.flat(),
+    );
+    const lats = coordinates.map((coord: [number, number]) => coord[1]);
+    const lngs = coordinates.map((coord: [number, number]) => coord[0]);
+
+    const southWest = [Math.min(...lats), Math.min(...lngs)] as [
+      number,
+      number,
+    ];
+    const northEast = [Math.max(...lats), Math.max(...lngs)] as [
+      number,
+      number,
+    ];
+    return [southWest, northEast] as LatLngBoundsExpression;
+  };
+
+  const bounds = geoJsonData ? getBoundsFromGeoJson(geoJsonData) : undefined;
+
+  return (
+    <MapContainer bounds={bounds} style={{ height: '100%', width: '100%' }}>
+      <TileLayer url='https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png' />
+      {geoJsonData && (
+        <GeoJSON
+          data={geoJsonData}
+          style={(feature) => ({
+            fillColor: feature?.properties.color || '#3388ff', // Default to blue if no color is specified
+            weight: 2,
+            opacity: 1,
+            color: 'black', // Border color
+            fillOpacity: 0.7,
+          })}
+        />
+      )}
+    </MapContainer>
+  );
+};
diff --git a/workflows/reverse_geolocation_populate.yml b/workflows/reverse_geolocation_populate.yml
new file mode 100644
index 000000000..107308c65
--- /dev/null
+++ b/workflows/reverse_geolocation_populate.yml
@@ -0,0 +1,52 @@
+main:
+  steps:
+    - initialize:
+        assign:
+          - countryCodes: ['AW', 'AF', 'AO', 'AI', 'AX', 'AL', 'AD', 'AE', 'AR', 'AM', 'AS', 'AQ', 'TF', 'AG', 'AU', 'AT', 'AZ', 'BI', 'BE', 'BJ', 'BQ', 'BF', 'BD', 'BG', 'BH', 'BS', 'BA', 'BL', 'BY', 'BZ', 'BM', 'BO', 'BR', 'BB', 'BN', 'BT', 'BV', 'BW', 'CF', 'CA', 'CC', 'CH', 'CL', 'CN', 'CI', 'CM', 'CD', 'CG', 'CK', 'CO', 'KM', 'CV', 'CR', 'CU', 'CW', 'CX', 'KY', 'CY', 'CZ', 'DE', 'DJ', 'DM', 'DK', 'DO', 'DZ', 'EC', 'EG', 'ER', 'EH', 'ES', 'EE', 'ET', 'FI', 'FJ', 'FK', 'FR', 'FO', 'FM', 'GA', 'GB', 'GE', 'GG', 'GH', 'GI', 'GN', 'GP', 'GM', 'GW', 'GQ', 'GR', 'GD', 'GL', 'GT', 'GF', 'GU', 'GY', 'HK', 'HM', 'HN', 'HR', 'HT', 'HU', 'ID', 'IM', 'IN', 'IO', 'IE', 'IR', 'IQ', 'IS', 'IL', 'IT', 'JM', 'JE', 'JO', 'JP', 'KZ', 'KE', 'KG', 'KH', 'KI', 'KN', 'KR', 'KW', 'LA', 'LB', 'LR', 'LY', 'LC', 'LI', 'LK', 'LS', 'LT', 'LU', 'LV', 'MO', 'MF', 'MA', 'MC', 'MD', 'MG', 'MV', 'MX', 'MH', 'MK', 'ML', 'MT', 'MM', 'ME', 'MN', 'MP', 'MZ', 'MR', 'MS', 'MQ', 'MU', 'MW', 'MY', 'YT', 'NA', 'NC', 'NE', 'NF', 'NG', 'NI', 'NU', 'NL', 'NO', 'NP', 'NR', 'NZ', 'OM', 'PK', 'PA', 'PN', 'PE', 'PH', 'PW', 'PG', 'PL', 'PR', 'KP', 'PT', 'PY', 'PS', 'PF', 'QA', 'RE', 'RO', 'RU', 'RW', 'SA', 'SD', 'SN', 'SG', 'GS', 'SH', 'SJ', 'SB', 'SL', 'SV', 'SM', 'SO', 'PM', 'RS', 'SS', 'ST', 'SR', 'SK', 'SI', 'SE', 'SZ', 'SX', 'SC', 'SY', 'TC', 'TD', 'TG', 'TH', 'TJ', 'TK', 'TM', 'TL', 'TO', 'TT', 'TN', 'TR', 'TV', 'TW', 'TZ', 'UG', 'UA', 'UM', 'UY', 'US', 'UZ', 'VA', 'VC', 'VE', 'VG', 'VI', 'VN', 'VU', 'WF', 'WS', 'YE', 'ZA', 'ZM', 'ZW']
+          - successList: ""
+          - failureList: ""
+    - process_batch:
+        parallel:
+          concurrency_limit: ${int(sys.get_env("batch_size"))}
+          shared: [successList, failureList]
+          for:
+            value: countryCode
+            in: ${countryCodes}
+            steps:
+              - init_iteration:
+                  assign:
+                    - response: null # Ensure `response` is defined even when the HTTP call fails
+              - make_request:
+                  try:
+                    call: http.post
+                    args:
+                      url: ${sys.get_env("reverse_geolocation_populate_url")}
+                      timeout: 1800 # Max allowed
+                      auth:
+                        type: OIDC
+                      body:
+                        country_code: ${countryCode}
+                    result: response
+                  except:
+                    as: e
+                    steps:
+                      - log_http_error:
+                          call: sys.log
+                          args:
+                            text: '${"HTTP error for country code " + countryCode + ": " + e.message}'
+                            severity: ERROR
+                      - increment_failure:
+                          assign:
+                            - failureList: ${failureList + " " + countryCode}
+              - check_response:
+                  switch:
+                    - condition: ${response != null and response.code == 200}
+                      steps:
+                        - increment_success:
+                            assign:
+                              - successList: ${successList + " " + countryCode}
+                        - log_response:
+                            call: sys.log
+                            args:
+                              text: '${"Country code " + countryCode + " got response " + string(response.code)}'
+    - finalize:
+        return:
+          successList: ${successList}
+          failureList: ${failureList}