diff --git a/functions-python/README.md b/functions-python/README.md
index 53f7abe08..c94f41a71 100644
--- a/functions-python/README.md
+++ b/functions-python/README.md
@@ -32,6 +32,24 @@ The function configuration file contains the following properties:
 - `min_instance_count`: The minimum number of function instances that can be created in response to a load.
 - `available_cpu_count`: The number of CPU cores that are available to the function.
+# Local Setup
+
+## Requirements
+The requirements for running the functions locally can differ depending on each function's Google Cloud dependencies. Please refer to each function's README to make sure all requirements are met.
+
+## Install the Google Cloud SDK
+To run the functions locally, the Google Cloud SDK must be installed. Please refer to the [Google Cloud SDK documentation](https://cloud.google.com/sdk/docs/install) for more information.
+
+## Install the Google Cloud Emulators
+
+- Install the Datastore emulator
+```bash
+gcloud components install cloud-datastore-emulator
+```
+
+- Install the Pub/Sub emulator
+```bash
+gcloud components install pubsub-emulator
+```
 # Useful scripts
 - To locally execute a function use the following command:
diff --git a/functions-python/feed_sync_dispatcher_transitland/.coveragerc b/functions-python/feed_sync_dispatcher_transitland/.coveragerc
new file mode 100644
index 000000000..89dac199f
--- /dev/null
+++ b/functions-python/feed_sync_dispatcher_transitland/.coveragerc
@@ -0,0 +1,10 @@
+[run]
+omit =
+    */test*/*
+    */database_gen/*
+    */dataset_service/*
+    */helpers/*
+
+[report]
+exclude_lines =
+    if __name__ == .__main__.:
\ No newline at end of file
diff --git a/functions-python/feed_sync_dispatcher_transitland/.env.rename_me b/functions-python/feed_sync_dispatcher_transitland/.env.rename_me
new file mode 100644
index 000000000..3250ba24d
--- /dev/null
+++ b/functions-python/feed_sync_dispatcher_transitland/.env.rename_me
@@ -0,0 +1,5 @@
+# Environment variables for the function to run locally. Delete this line after renaming the file.
+FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/MobilityDatabase
+PROJECT_ID=my-project-id
+PUBSUB_TOPIC_NAME=my-topic
+TRANSITLAND_API_KEY=your-api-key
diff --git a/functions-python/feed_sync_dispatcher_transitland/README.md b/functions-python/feed_sync_dispatcher_transitland/README.md
new file mode 100644
index 000000000..303f1ef35
--- /dev/null
+++ b/functions-python/feed_sync_dispatcher_transitland/README.md
@@ -0,0 +1,79 @@
+# Feed Sync Dispatcher
+This directory contains the GCP serverless function that triggers the feed sync from Transitland.
+The function publishes one Pub/Sub message per Transitland feed to be synced:
+```json
+{
+  "message": {
+    "data": {
+      "external_id": "feeds_onestop_id",
+      "feed_id": "feed_id",
+      "execution_id": "execution_id",
+      "feed_url": "feed_url",
+      "spec": "spec",
+      "auth_info_url": "auth_info_url",
+      "auth_param_name": "auth_param_name",
+      "type": "type",
+      "operator_name": "operator_name",
+      "country": "country",
+      "state_province": "state_province",
+      "city_name": "city_name",
+      "source": "TLD",
+      "payload_type": "new|update"
+    }
+  }
+}
+```
+
+# Function configuration
+The function is configured using the following environment variables:
+- `PUBSUB_TOPIC_NAME`: The Pub/Sub topic to publish the messages to.
+- `PROJECT_ID`: The GCP project ID.
+- `TRANSITLAND_API_KEY`: The Transitland API key (secret).
+
+# Local development
+The local development of this function follows the same steps as the other functions.
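+
+As a quick reference, the snippet below shows one way to publish a message with the payload shape documented above using the standard `google-cloud-pubsub` client. This is a hypothetical sketch for local experiments, not part of the function itself: the project and topic names are placeholders, and it assumes the Pub/Sub emulator described below is running with `PUBSUB_EMULATOR_HOST` exported.
+```python
+# Hypothetical local smoke test: publish one feed-sync message to the emulator.
+import json
+import os
+
+from google.cloud import pubsub_v1
+
+publisher = pubsub_v1.PublisherClient()
+topic_path = publisher.topic_path(
+    os.getenv("PROJECT_ID", "test-project"),
+    os.getenv("PUBSUB_TOPIC_NAME", "feed-sync-transitland"),
+)
+payload = {
+    "external_id": "feeds_onestop_id",
+    "feed_id": "feed_id",
+    "feed_url": "feed_url",
+    "source": "TLD",
+    "payload_type": "new",
+}
+future = publisher.publish(topic_path, data=json.dumps(payload).encode("utf-8"))
+print(f"Published message {future.result()}")  # result() blocks until the publish completes
+```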
+
+Install the Google Pub/Sub emulator; please refer to the [README.md](../README.md) file for more information.
+
+## Python requirements
+
+- Install the requirements
+```bash
+  pip install -r ./functions-python/feed_sync_dispatcher_transitland/requirements.txt
+```
+
+## Test locally with Google Cloud Emulators
+
+- Execute the following commands to start the emulators:
+```bash
+  gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
+```
+
+- Create a Pub/Sub topic in the emulator:
+```bash
+  curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland"
+```
+
+- Start the function
+```bash
+  export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_dispatcher_transitland
+```
+
+- [Optional]: Create a local subscription to print published messages:
+```bash
+./scripts/pubsub_message_print.sh feed-sync-transitland
+```
+
+- Execute the function
+```bash
+  curl http://localhost:8080
+```
+
+- To run/debug from your IDE, use the file `main_local_debug.py`
+
+# Test
+- Run the tests
+```bash
+  ./scripts/api-tests.sh --folder functions-python/feed_sync_dispatcher_transitland
+```
diff --git a/functions-python/feed_sync_dispatcher_transitland/function_config.json b/functions-python/feed_sync_dispatcher_transitland/function_config.json
new file mode 100644
index 000000000..99554a359
--- /dev/null
+++ b/functions-python/feed_sync_dispatcher_transitland/function_config.json
@@ -0,0 +1,19 @@
+{
+  "name": "feed-sync-dispatcher-transitland",
+  "description": "Feed Sync Dispatcher for Transitland",
+  "entry_point": "feed_sync_dispatcher_transitland",
+  "timeout": 540,
+  "memory": "512Mi",
+  "trigger_http": true,
+  "include_folders": ["database_gen", "helpers"],
+  "secret_environment_variables": [
+    {
+      "key": "FEEDS_DATABASE_URL"
+    }
+  ],
+  "ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
+  "max_instance_request_concurrency": 20,
+  "max_instance_count": 10,
+  "min_instance_count": 0,
+  "available_cpu": 1
+}
diff --git a/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py b/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py
new file mode 100644
index 000000000..5cf6d7529
--- /dev/null
+++ b/functions-python/feed_sync_dispatcher_transitland/main_local_debug.py
@@ -0,0 +1,26 @@
+# Code to debug locally without affecting the runtime cloud function
+
+
+# Requirements:
+# - Google Cloud SDK installed
+# - The environment variables used by the function set in your .env.local_test file
+# - Local database in running state
+# - Follow the instructions in the README.md file
+#
+# Usage:
+# - python feed_sync_dispatcher_transitland/main_local_debug.py
+
+from src.main import feed_sync_dispatcher_transitland
+from dotenv import load_dotenv
+
+# Load environment variables from .env.local_test
+load_dotenv(dotenv_path=".env.local_test")
+
+if __name__ == "__main__":
+
+    class RequestObject:
+        def __init__(self, headers):
+            self.headers = headers
+
+    request = RequestObject({"X-Cloud-Trace-Context": "1234567890abcdef"})
+    feed_sync_dispatcher_transitland(request)
diff --git a/functions-python/feed_sync_dispatcher_transitland/requirements.txt b/functions-python/feed_sync_dispatcher_transitland/requirements.txt
new file mode 100644
index 000000000..3d7b3f6ef
--- /dev/null
+++ b/functions-python/feed_sync_dispatcher_transitland/requirements.txt
@@ -0,0 +1,20 @@
+# Common packages
+functions-framework==3.*
+google-cloud-logging
+psycopg2-binary==2.9.6
+aiohttp~=3.10.5
+asyncio~=3.4.3 +urllib3~=2.2.2 +requests~=2.32.3 +attrs~=23.1.0 +pluggy~=1.3.0 +certifi~=2024.8.30 +pandas + +# SQL Alchemy and Geo Alchemy +SQLAlchemy==2.0.23 +geoalchemy2==0.14.7 + +# Google specific packages for this function +google-cloud-pubsub +cloudevents~=1.10.1 diff --git a/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt b/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt new file mode 100644 index 000000000..9ee50adce --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/requirements_dev.txt @@ -0,0 +1,2 @@ +Faker +pytest~=7.4.3 \ No newline at end of file diff --git a/functions-python/feed_sync_dispatcher_transitland/src/__init__.py b/functions-python/feed_sync_dispatcher_transitland/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/functions-python/feed_sync_dispatcher_transitland/src/main.py b/functions-python/feed_sync_dispatcher_transitland/src/main.py new file mode 100644 index 000000000..90592f725 --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/src/main.py @@ -0,0 +1,358 @@ +# +# MobilityData 2024 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import os +import logging +import time +import random +from dataclasses import dataclass, asdict +from typing import Optional, List +import requests +from requests.exceptions import RequestException, HTTPError +import pandas as pd + +import functions_framework +from google.cloud.pubsub_v1.futures import Future +from sqlalchemy.orm import Session +from sqlalchemy import text + +from helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload +from helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher +from helpers.pub_sub import get_pubsub_client, get_execution_id + +# Logging configuration +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + +# Environment variables +PUBSUB_TOPIC_NAME = os.getenv("PUBSUB_TOPIC_NAME") +PROJECT_ID = os.getenv("PROJECT_ID") +FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL") +apikey = os.getenv("TRANSITLAND_API_KEY") +TRANSITLAND_OPERATOR_URL = os.getenv("TRANSITLAND_OPERATOR_URL") +TRANSITLAND_FEED_URL = os.getenv("TRANSITLAND_FEED_URL") +spec = ["gtfs", "gtfs-rt"] + +# session instance to reuse connections +session = requests.Session() + + +@dataclass +class TransitFeedSyncPayload: + """ + Data class for transit feed sync payloads. 
+ """ + + external_id: str + feed_id: str + feed_url: Optional[str] = None + execution_id: Optional[str] = None + spec: Optional[str] = None + auth_info_url: Optional[str] = None + auth_param_name: Optional[str] = None + type: Optional[str] = None + operator_name: Optional[str] = None + country: Optional[str] = None + state_province: Optional[str] = None + city_name: Optional[str] = None + source: Optional[str] = None + payload_type: Optional[str] = None + + def to_dict(self): + return asdict(self) + + def to_json(self): + return json.dumps(self.to_dict()) + + +class TransitFeedSyncProcessor(FeedSyncProcessor): + def check_url_status(self, url: str) -> bool: + """ + Checks if a URL returns a valid response (not 404 or 500). + """ + try: + response = requests.head(url, timeout=25) + return response.status_code not in {404, 500} + except requests.RequestException as e: + logging.warning(f"Failed to reach {url}: {e}") + return False + + def process_sync( + self, db_session: Optional[Session] = None, execution_id: Optional[str] = None + ) -> List[FeedSyncPayload]: + """ + Process data synchronously to fetch, extract, combine, filter and prepare payloads for publishing + to a queue based on conditions related to the data retrieved from TransitLand API. + """ + feeds_data = self.get_data(TRANSITLAND_FEED_URL, apikey, spec, session) + operators_data = self.get_data( + TRANSITLAND_OPERATOR_URL, apikey, session=session + ) + + feeds = self.extract_feeds_data(feeds_data) + operators = self.extract_operators_data(operators_data) + + # Converts operators and feeds to pandas DataFrames + operators_df = pd.DataFrame(operators) + feeds_df = pd.DataFrame(feeds) + + # Merge operators and feeds DataFrames on 'operator_feed_id' and 'feed_id' + combined_df = pd.merge( + operators_df, + feeds_df, + left_on="operator_feed_id", + right_on="feed_id", + how="inner", + ) + + # Filtered out rows where 'feed_url' is missing + combined_df = combined_df[combined_df["feed_url"].notna()] + + # Group by 'feed_id' and concatenate 'operator_name' while keeping first values of other columns + df_grouped = ( + combined_df.groupby("feed_id") + .agg( + { + "operator_name": lambda x: ", ".join(x), + "feeds_onestop_id": "first", + "feed_url": "first", + "operator_feed_id": "first", + "spec": "first", + "country": "first", + "state_province": "first", + "city_name": "first", + "auth_info_url": "first", + "auth_param_name": "first", + "type": "first", + } + ) + .reset_index() + ) + + # Filtered out unwanted countries + countries_not_included = ["France", "Japan"] + filtered_df = df_grouped[ + ~df_grouped["country"] + .str.lower() + .isin([c.lower() for c in countries_not_included]) + ] + + # Filtered out URLs that return undesired status codes + filtered_df = filtered_df[filtered_df["feed_url"].apply(self.check_url_status)] + + # Convert filtered DataFrame to dictionary format + combined_data = filtered_df.to_dict(orient="records") + + payloads = [] + for data in combined_data: + external_id = data["feeds_onestop_id"] + feed_url = data["feed_url"] + source = "TLD" + + if not self.check_external_id(db_session, external_id, source): + payload_type = "new" + else: + mbd_feed_url = self.get_mbd_feed_url(db_session, external_id, source) + if mbd_feed_url != feed_url: + payload_type = "update" + else: + continue + + # prepare payload + payload = TransitFeedSyncPayload( + external_id=external_id, + feed_id=data["feed_id"], + execution_id=execution_id, + feed_url=feed_url, + spec=data["spec"], + auth_info_url=data["auth_info_url"], + 
auth_param_name=data["auth_param_name"],
+                type=data["type"],
+                operator_name=data["operator_name"],
+                country=data["country"],
+                state_province=data["state_province"],
+                city_name=data["city_name"],
+                source="TLD",
+                payload_type=payload_type,
+            )
+            payloads.append(FeedSyncPayload(external_id=external_id, payload=payload))
+
+        return payloads
+
+    def get_data(
+        self,
+        url,
+        apikey,
+        spec=None,
+        session=None,
+        max_retries=3,
+        initial_delay=60,
+        max_delay=120,
+    ):
+        """
+        Retrieves data from the specified Transitland feeds and operators endpoints.
+        Handles rate limits, retries, and error cases.
+        Returns the parsed data as a dictionary containing feeds and operators.
+        """
+        headers = {"apikey": apikey}
+        params = {"spec": spec} if spec else {}
+        all_data = {"feeds": [], "operators": []}
+        delay = initial_delay
+
+        while url:
+            for attempt in range(max_retries):
+                response = None
+                try:
+                    response = session.get(
+                        url, headers=headers, params=params, timeout=30
+                    )
+                    response.raise_for_status()
+                    data = response.json()
+                    all_data["feeds"].extend(data.get("feeds", []))
+                    all_data["operators"].extend(data.get("operators", []))
+                    url = data.get("meta", {}).get("next")
+                    delay = initial_delay
+                    break
+
+                except (RequestException, HTTPError) as e:
+                    logging.error("Attempt %s failed: %s", attempt + 1, e)
+                    # response stays None when the request itself failed,
+                    # so guard before reading the status code
+                    if response is not None and response.status_code == 429:
+                        logging.warning("Rate limit hit. Waiting for %s seconds", delay)
+                        time.sleep(delay + random.uniform(0, 1))
+                        delay = min(delay * 2, max_delay)
+                    elif attempt == max_retries - 1:
+                        logging.error(
+                            "Failed to fetch data after %s attempts.", max_retries
+                        )
+                        return all_data
+                    else:
+                        time.sleep(delay)
+        return all_data
+
+    def extract_feeds_data(self, feeds_data: dict) -> List[dict]:
+        """
+        Extracts the relevant fields from the Transitland feeds endpoint response.
+        Returns a list of dictionaries representing each feed.
+        """
+        feeds = []
+        for feed in feeds_data["feeds"]:
+            feed_url = feed["urls"].get("static_current")
+            feeds.append(
+                {
+                    "feed_id": feed["id"],
+                    "feed_url": feed_url,
+                    "spec": feed["spec"].lower(),
+                    "feeds_onestop_id": feed["onestop_id"],
+                    "auth_info_url": feed["authorization"].get("info_url"),
+                    "auth_param_name": feed["authorization"].get("param_name"),
+                    "type": feed["authorization"].get("type"),
+                }
+            )
+        return feeds
+
+    def extract_operators_data(self, operators_data: dict) -> List[dict]:
+        """
+        Extracts the relevant fields from the Transitland operators endpoint response.
+        Constructs a list of dictionaries containing information about each operator.
+        """
+        operators = []
+        for operator in operators_data["operators"]:
+            if operator.get("agencies") and operator["agencies"][0].get("places"):
+                places = operator["agencies"][0]["places"]
+                place = places[1] if len(places) > 1 else places[0]
+
+                operator_data = {
+                    "operator_name": operator.get("name"),
+                    "operator_feed_id": operator["feeds"][0]["id"]
+                    if operator.get("feeds")
+                    else None,
+                    "country": place.get("adm0_name") if place else None,
+                    "state_province": place.get("adm1_name") if place else None,
+                    "city_name": place.get("city_name") if place else None,
+                }
+                operators.append(operator_data)
+        return operators
+
+    def check_external_id(
+        self, db_session: Session, external_id: str, source: str
+    ) -> bool:
+        """
+        Checks if the external_id exists in the public.externalid table for the given source.
+ :param db_session: SQLAlchemy session + :param external_id: The external_id (feeds_onestop_id) to check + :param source: The source to filter by (e.g., 'TLD' for TransitLand) + :return: True if the feed exists, False otherwise + """ + query = text( + "SELECT 1 FROM public.externalid WHERE associated_id = :external_id AND source = :source LIMIT 1" + ) + result = db_session.execute( + query, {"external_id": external_id, "source": source} + ).fetchone() + return result is not None + + def get_mbd_feed_url( + self, db_session: Session, external_id: str, source: str + ) -> Optional[str]: + """ + Retrieves the feed_url from the public.feed table in the mbd for the given external_id. + :param db_session: SQLAlchemy session + :param external_id: The external_id (feeds_onestop_id) from TransitLand + :param source: The source to filter by (e.g., 'TLD' for TransitLand) + :return: feed_url in mbd if exists, otherwise None + """ + query = text( + """ + SELECT f.producer_url + FROM public.feed f + JOIN public.externalid e ON f.id = e.feed_id + WHERE e.associated_id = :external_id AND e.source = :source + LIMIT 1 + """ + ) + result = db_session.execute( + query, {"external_id": external_id, "source": source} + ).fetchone() + return result[0] if result else None + + def publish_callback( + self, future: Future, payload: FeedSyncPayload, topic_path: str + ): + """ + Callback function for when the message is published to Pub/Sub. + This function logs the result of the publishing operation. + """ + if future.exception(): + print( + f"Error publishing transit land feed {payload.external_id} " + f"to Pub/Sub topic {topic_path}: {future.exception()}" + ) + else: + print(f"Published transit land feed {payload.external_id}.") + + +@functions_framework.http +def feed_sync_dispatcher_transitland(request): + """ + HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed. + """ + publisher = get_pubsub_client() + topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME) + transit_land_feed_sync_processor = TransitFeedSyncProcessor() + execution_id = get_execution_id(request, "feed-sync-dispatcher") + feed_sync_dispatcher(transit_land_feed_sync_processor, topic_path, execution_id) + return "Feed sync dispatcher executed successfully." 
diff --git a/functions-python/feed_sync_dispatcher_transitland/tests/test_feed_sync.py b/functions-python/feed_sync_dispatcher_transitland/tests/test_feed_sync.py new file mode 100644 index 000000000..470ce5115 --- /dev/null +++ b/functions-python/feed_sync_dispatcher_transitland/tests/test_feed_sync.py @@ -0,0 +1,359 @@ +import pytest +from unittest.mock import Mock, patch, call +from requests import Session as RequestsSession +from sqlalchemy.orm import Session as DBSession +from feed_sync_dispatcher_transitland.src.main import ( + TransitFeedSyncProcessor, + FeedSyncPayload, +) +import pandas as pd +from requests.exceptions import HTTPError + + +@pytest.fixture +def processor(): + return TransitFeedSyncProcessor() + + +@patch("feed_sync_dispatcher_transitland.src.main.requests.Session.get") +def test_get_data(mock_get, processor): + mock_response = Mock() + mock_response.json.return_value = { + "feeds": [ + { + "id": "feed1", + "urls": {"static_current": "http://example.com/feed1"}, + "spec": "gtfs", + "onestop_id": "onestop1", + "authorization": {}, + } + ], + "operators": [], + } + mock_response.status_code = 200 + mock_get.return_value = mock_response + + result = processor.get_data( + "https://api.transit.land", "dummy_api_key", session=RequestsSession() + ) + assert "feeds" in result + assert result["feeds"][0]["id"] == "feed1" + + +@patch("feed_sync_dispatcher_transitland.src.main.requests.Session.get") +def test_get_data_rate_limit(mock_get, processor): + mock_response = Mock() + mock_response.status_code = 429 + mock_response.json.return_value = {"feeds": [], "operators": []} + mock_get.return_value = mock_response + + result = processor.get_data( + "https://api.transit.land", + "dummy_api_key", + session=RequestsSession(), + max_retries=1, + ) + assert result == {"feeds": [], "operators": []} + + +def test_extract_feeds_data(processor): + feeds_data = { + "feeds": [ + { + "id": "feed1", + "urls": {"static_current": "http://example.com/feed1"}, + "spec": "gtfs", + "onestop_id": "onestop1", + "authorization": {}, + } + ] + } + result = processor.extract_feeds_data(feeds_data) + assert len(result) == 1 + assert result[0]["feed_id"] == "feed1" + + +def test_extract_operators_data(processor): + operators_data = { + "operators": [ + { + "name": "Operator 1", + "feeds": [{"id": "feed1"}], + "agencies": [{"places": [{"adm0_name": "USA"}]}], + } + ] + } + result = processor.extract_operators_data(operators_data) + assert len(result) == 1 + assert result[0]["operator_name"] == "Operator 1" + + +def test_check_external_id(processor): + mock_db_session = Mock(spec=DBSession) + mock_db_session.execute.return_value.fetchone.return_value = (1,) + result = processor.check_external_id(mock_db_session, "onestop1", "TLD") + assert result is True + + mock_db_session.execute.return_value.fetchone.return_value = None + result = processor.check_external_id(mock_db_session, "onestop2", "TLD") + assert result is False + + +def test_get_mbd_feed_url(processor): + mock_db_session = Mock(spec=DBSession) + mock_db_session.execute.return_value.fetchone.return_value = ( + "http://example.com/feed1", + ) + result = processor.get_mbd_feed_url(mock_db_session, "onestop1", "TLD") + assert result == "http://example.com/feed1" + + mock_db_session.execute.return_value.fetchone.return_value = None + result = processor.get_mbd_feed_url(mock_db_session, "onestop2", "TLD") + assert result is None + + +def test_process_sync_new_feed(processor): + mock_db_session = Mock(spec=DBSession) + feeds_data = { + "feeds": [ + { + 
"id": "feed1", + "urls": {"static_current": "http://example.com/feed1"}, + "spec": "gtfs", + "onestop_id": "onestop1", + "authorization": {}, + } + ], + "operators": [], + } + operators_data = { + "operators": [ + { + "name": "Operator 1", + "feeds": [{"id": "feed1"}], + "agencies": [{"places": [{"adm0_name": "USA"}]}], + } + ], + "feeds": [], + } + + processor.get_data = Mock(side_effect=[feeds_data, operators_data]) + + processor.check_url_status = Mock(return_value=True) + + with patch.object(processor, "check_external_id", return_value=False): + payloads = processor.process_sync( + db_session=mock_db_session, execution_id="exec123" + ) + assert len(payloads) == 1 + assert payloads[0].payload.payload_type == "new" + assert payloads[0].payload.external_id == "onestop1" + + +def test_process_sync_updated_feed(processor): + mock_db_session = Mock(spec=DBSession) + feeds_data = { + "feeds": [ + { + "id": "feed1", + "urls": {"static_current": "http://example.com/feed1_updated"}, + "spec": "gtfs", + "onestop_id": "onestop1", + "authorization": {}, + } + ], + "operators": [], + } + operators_data = { + "operators": [ + { + "name": "Operator 1", + "feeds": [{"id": "feed1"}], + "agencies": [{"places": [{"adm0_name": "USA"}]}], + } + ], + "feeds": [], + } + + processor.get_data = Mock(side_effect=[feeds_data, operators_data]) + + processor.check_url_status = Mock(return_value=True) + + processor.check_external_id = Mock(return_value=True) + + processor.get_mbd_feed_url = Mock(return_value="http://example.com/feed1") + + payloads = processor.process_sync( + db_session=mock_db_session, execution_id="exec123" + ) + + assert len(payloads) == 1 + assert payloads[0].payload.payload_type == "update" + assert payloads[0].payload.external_id == "onestop1" + + +@patch("feed_sync_dispatcher_transitland.src.main.TransitFeedSyncProcessor.get_data") +def test_process_sync_unchanged_feed(mock_get_data, processor): + mock_db_session = Mock(spec=DBSession) + feeds_data = { + "feeds": [ + { + "id": "feed1", + "urls": {"static_current": "http://example.com/feed1"}, + "spec": "gtfs", + "onestop_id": "onestop1", + "authorization": {}, + } + ], + "operators": [], + } + operators_data = { + "operators": [ + { + "name": "Operator 1", + "feeds": [{"id": "feed1"}], + "agencies": [{"places": [{"adm0_name": "USA"}]}], + } + ], + "feeds": [], + } + + processor.get_data = Mock(side_effect=[feeds_data, operators_data]) + processor.check_url_status = Mock(return_value=True) + processor.check_external_id = Mock(return_value=True) + processor.get_mbd_feed_url = Mock(return_value="http://example.com/feed1") + processor.get_mbd_feed_url = Mock(return_value="http://example.com/feed1") + payloads = processor.process_sync( + db_session=mock_db_session, execution_id="exec123" + ) + + assert len(payloads) == 0 + + processor.get_mbd_feed_url.assert_called_once_with( + mock_db_session, "onestop1", "TLD" + ) + + +@patch("feed_sync_dispatcher_transitland.src.main.requests.head") +def test_check_url_status(mock_head, processor): + mock_head.return_value.status_code = 200 + result = processor.check_url_status("http://example.com") + assert result is True + + mock_head.return_value.status_code = 404 + result = processor.check_url_status("http://example.com") + assert result is False + + +def test_merge_and_filter_dataframes(processor): + operators = [ + { + "operator_name": "Operator 1", + "operator_feed_id": "feed1", + "country": "USA", + "state_province": "CA", + "city_name": "San Francisco", + }, + { + "operator_name": "Operator 2", + 
"operator_feed_id": "feed2", + "country": "Japan", + "state_province": "Tokyo", + "city_name": "Tokyo", + }, + ] + feeds = [ + { + "feed_id": "feed1", + "feed_url": "http://example.com/feed1", + "spec": "gtfs", + "feeds_onestop_id": "onestop1", + "auth_info_url": None, + "auth_param_name": None, + "type": None, + }, + { + "feed_id": "feed2", + "feed_url": "http://example.com/feed2", + "spec": "gtfs", + "feeds_onestop_id": "onestop2", + "auth_info_url": None, + "auth_param_name": None, + "type": None, + }, + ] + + operators_df = pd.DataFrame(operators) + feeds_df = pd.DataFrame(feeds) + + combined_df = pd.merge( + operators_df, + feeds_df, + left_on="operator_feed_id", + right_on="feed_id", + how="inner", + ) + combined_df = combined_df[combined_df["feed_url"].notna()] + countries_not_included = ["France", "Japan"] + filtered_df = combined_df[ + ~combined_df["country"] + .str.lower() + .isin([c.lower() for c in countries_not_included]) + ] + + assert len(filtered_df) == 1 + assert filtered_df.iloc[0]["operator_name"] == "Operator 1" + assert filtered_df.iloc[0]["feed_id"] == "feed1" + + +def test_publish_callback_success(processor): + future = Mock() + future.exception.return_value = None + payload = FeedSyncPayload(external_id="onestop1", payload=None) + topic_path = "projects/project-id/topics/topic-name" + + with patch("builtins.print") as mock_print: + processor.publish_callback(future, payload, topic_path) + mock_print.assert_called_with("Published transit land feed onestop1.") + + +def test_publish_callback_failure(processor): + future = Mock() + future.exception.return_value = Exception("Publish error") + payload = FeedSyncPayload(external_id="onestop1", payload=None) + topic_path = "projects/project-id/topics/topic-name" + + with patch("builtins.print") as mock_print: + processor.publish_callback(future, payload, topic_path) + mock_print.assert_called_with( + f"Error publishing transit land feed onestop1 to Pub/Sub topic {topic_path}: Publish error" + ) + + +def test_get_data_retries(processor): + # Mock the requests session + mock_session = Mock(spec=RequestsSession) + + mock_response = Mock() + mock_response.raise_for_status.side_effect = HTTPError() + mock_response.status_code = 500 + + mock_session.get.return_value = mock_response + + with patch("time.sleep", return_value=None) as mock_sleep: + result = processor.get_data( + url="http://example.com", + apikey="dummy_api_key", + session=mock_session, + max_retries=3, + initial_delay=1, + max_delay=2, + ) + + assert mock_session.get.call_count == 3 + + assert mock_sleep.call_count == 2 + + mock_sleep.assert_has_calls([call(1), call(1)]) + + assert result == {"feeds": [], "operators": []} diff --git a/functions-python/feed_sync_process_transitland/.env.rename_me b/functions-python/feed_sync_process_transitland/.env.rename_me new file mode 100644 index 000000000..8324ba1ef --- /dev/null +++ b/functions-python/feed_sync_process_transitland/.env.rename_me @@ -0,0 +1,6 @@ +# Environment variables for tokens function to run locally. Delete this line after rename the file. 
+FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
+PROJECT_ID=my-project-id
+PUBSUB_TOPIC_NAME=my-topic
+TRANSITLAND_API_KEY=your-api-key
+DATASET_BATCH_TOPIC_NAME=dataset_batch_topic
diff --git a/functions-python/feed_sync_process_transitland/README.md b/functions-python/feed_sync_process_transitland/README.md
new file mode 100644
index 000000000..573f59dad
--- /dev/null
+++ b/functions-python/feed_sync_process_transitland/README.md
@@ -0,0 +1,107 @@
+# Feed Sync Process
+
+Subscribed to the topic set in the `feed-sync-dispatcher` function, `feed-sync-process` is triggered for each message published. It handles the processing of feed updates, ensuring data consistency and integrity. The function performs the following operations:
+
+1. **Feed Status Check**: Verifies the current state of the feed in the database using `external_id` and `source`.
+2. **URL Validation**: Checks if the feed URL already exists in the database.
+3. **Feed Processing**: Based on the current state:
+   - If no existing feed is found, creates a new feed entry
+   - If the feed exists with a different URL, creates a new feed and deprecates the old one
+   - If the feed exists with the same URL, no action is taken
+4. **Batch Processing Trigger**: For non-authenticated feeds, publishes events to the dataset batch topic for further processing.
+
+The function maintains feed history through the `redirectingid` table and ensures proper status tracking with 'active' and 'deprecated' states.
+
+# Message Format
+The function expects a Pub/Sub message with the following format:
+```json
+{
+  "message": {
+    "data": {
+      "external_id": "feed-identifier",
+      "feed_id": "unique-feed-id",
+      "feed_url": "http://example.com/feed",
+      "execution_id": "execution-identifier",
+      "spec": "gtfs",
+      "auth_info_url": null,
+      "auth_param_name": null,
+      "type": null,
+      "operator_name": "Transit Agency Name",
+      "country": "Country Name",
+      "state_province": "State/Province",
+      "city_name": "City Name",
+      "source": "TLD",
+      "payload_type": "new|update"
+    }
+  }
+}
+```
+
+# Function Configuration
+The function is configured using the following environment variables:
+- `PROJECT_ID`: The Google Cloud project ID
+- `DATASET_BATCH_TOPIC_NAME`: The name of the topic for batch processing triggers
+- `FEEDS_DATABASE_URL`: The URL of the feeds database
+- `ENV`: [Optional] Environment identifier (e.g., 'dev', 'prod')
+
+# Database Schema
+The function interacts with the following tables:
+1. `feed`: Stores feed information
+   - Contains fields like id, data_type, feed_name, producer_url, etc.
+   - Tracks feed status ('active' or 'deprecated')
+   - Uses CURRENT_TIMESTAMP for created_at
+
+2. `externalid`: Maps external identifiers to feed IDs
+   - Links external_id and source to feed entries
+   - Maintains source tracking
+
+3. `redirectingid`: Tracks feed updates
+   - Maps old feed IDs to new ones
+   - Maintains update history
+
+# Local development
+The local development of this function follows the same steps as the other functions.
+
+Install the Google Pub/Sub emulator; please refer to the [README.md](../README.md) file for more information.
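+
+Note that in an actual Pub/Sub delivery, the `data` object shown under Message Format arrives base64-encoded inside the message envelope. The sketch below mirrors the decoding step that `process_feed_event` in `src/main.py` performs before building the `FeedPayload`:
+```python
+# Sketch of the payload decoding done by process_feed_event (src/main.py).
+import base64
+import json
+
+
+def decode_feed_payload(cloud_event) -> dict:
+    raw = cloud_event.data["message"]["data"]  # base64-encoded JSON string
+    return json.loads(base64.b64decode(raw).decode("utf-8"))
+```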
+
+## Python requirements
+
+- Install the requirements
+```bash
+  pip install -r ./functions-python/feed_sync_process_transitland/requirements.txt
+```
+
+## Test locally with Google Cloud Emulators
+
+- Execute the following commands to start the emulators:
+```bash
+  gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
+```
+
+- Create the Pub/Sub topic the function publishes to (per `DATASET_BATCH_TOPIC_NAME` in the `.env` file) in the emulator:
+```bash
+  curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/dataset_batch_topic"
+```
+
+- Start the function
+```bash
+  export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_process_transitland
+```
+
+- [Optional]: Create a local subscription to print published messages:
+```bash
+./scripts/pubsub_message_print.sh dataset_batch_topic
+```
+
+- Execute the function
+```bash
+  curl http://localhost:8080
+```
+
+- To run/debug from your IDE, use the file `main_local_debug.py`
+
+# Test
+- Run the tests
+```bash
+  ./scripts/api-tests.sh --folder functions-python/feed_sync_process_transitland
+```
diff --git a/functions-python/feed_sync_process_transitland/function_config.json b/functions-python/feed_sync_process_transitland/function_config.json
new file mode 100644
index 000000000..d2c74f639
--- /dev/null
+++ b/functions-python/feed_sync_process_transitland/function_config.json
@@ -0,0 +1,23 @@
+{
+  "name": "feed-sync-process-transitland",
+  "description": "Processes feed sync events from the feed-sync-dispatcher Pub/Sub topic",
+  "entry_point": "process_feed_event",
+  "timeout": 540,
+  "memory": "2Gi",
+  "trigger_http": true,
+  "include_folders": ["database_gen", "helpers"],
+  "secret_environment_variables": [
+    {
+      "key": "FEEDS_DATABASE_URL"
+    },
+    {
+      "key": "FEEDS_CREDENTIALS",
+      "secret": "FEEDS_CREDENTIALS"
+    }
+  ],
+  "ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
+  "max_instance_request_concurrency": 1,
+  "max_instance_count": 10,
+  "min_instance_count": 0,
+  "available_cpu": 1
+}
diff --git a/functions-python/feed_sync_process_transitland/main_local_debug.py b/functions-python/feed_sync_process_transitland/main_local_debug.py
new file mode 100644
index 000000000..d586cc637
--- /dev/null
+++ b/functions-python/feed_sync_process_transitland/main_local_debug.py
@@ -0,0 +1,104 @@
+# Code to debug locally without affecting the runtime cloud function
+#
+# Requirements:
+# - Google Cloud SDK installed
+# - The environment variables used by the function set in your .env.local_test file
+# - Local database in running state
+# - Follow the instructions in the README.md file
+#
+# Usage:
+# - python feed_sync_process_transitland/main_local_debug.py
+
+import base64
+import json
+import logging
+from dataclasses import dataclass
+from dotenv import load_dotenv
+from feed_sync_process_transitland.src.main import process_feed_event
+import src.main
+
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+
+# Create logger instance
+logger = logging.getLogger("feed_processor")
+handler = logging.StreamHandler()
+handler.setFormatter(
+    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+)
+logger.addHandler(handler)
+logger.setLevel(logging.INFO)
+
+
+src.main.logger = logger
+
+# Load environment variables from .env.local_test
+load_dotenv(dotenv_path=".env.local_test")
+
+
+@dataclass
+class CloudEvent:
+    attributes: dict
+    data: dict
+
+
+if __name__ == "__main__":
+    logger.info("Starting local debug session...")
+
+    # Define cloud event attributes
+
attributes = { + "type": "com.google.cloud.pubsub.topic.publish", + "source": "//pubsub.googleapis.com/projects/sample-project/topics/sample-topic", + } + + feed_payload = { + "external_id": "test-feed-1", + "feed_id": "feed1", + "feed_url": "http://example.com/test-feed", + "execution_id": "local-debug-123", + "spec": "gtfs", + "auth_info_url": None, + "auth_param_name": None, + "type": None, + "operator_name": "Test Operator", + "country": "USA", + "state_province": "CA", + "city_name": "Test City", + "source": "TLD", + "payload_type": "new", + } + + data = { + "message": { + "data": base64.b64encode(json.dumps(feed_payload).encode("utf-8")).decode( + "utf-8" + ) + } + } + + # Create and process cloud event + cloud_event = CloudEvent(attributes, data) + logger.info("\nProcessing new feed event:") + logger.info("-" * 50) + process_feed_event(cloud_event) + + # Test update scenario + logger.info("\nProcessing update feed event:") + logger.info("-" * 50) + update_payload = feed_payload.copy() + update_payload["feed_url"] = "http://example.com/test-feed-updated" + update_payload["payload_type"] = "update" + + update_data = { + "message": { + "data": base64.b64encode(json.dumps(update_payload).encode("utf-8")).decode( + "utf-8" + ) + } + } + + cloud_event_update = CloudEvent(attributes, update_data) + process_feed_event(cloud_event_update) + + logger.info("Local debug session completed.") diff --git a/functions-python/feed_sync_process_transitland/requirements.txt b/functions-python/feed_sync_process_transitland/requirements.txt new file mode 100644 index 000000000..3d7b3f6ef --- /dev/null +++ b/functions-python/feed_sync_process_transitland/requirements.txt @@ -0,0 +1,20 @@ +# Common packages +functions-framework==3.* +google-cloud-logging +psycopg2-binary==2.9.6 +aiohttp~=3.10.5 +asyncio~=3.4.3 +urllib3~=2.2.2 +requests~=2.32.3 +attrs~=23.1.0 +pluggy~=1.3.0 +certifi~=2024.8.30 +pandas + +# SQL Alchemy and Geo Alchemy +SQLAlchemy==2.0.23 +geoalchemy2==0.14.7 + +# Google specific packages for this function +google-cloud-pubsub +cloudevents~=1.10.1 diff --git a/functions-python/feed_sync_process_transitland/requirements_dev.txt b/functions-python/feed_sync_process_transitland/requirements_dev.txt new file mode 100644 index 000000000..3b499f462 --- /dev/null +++ b/functions-python/feed_sync_process_transitland/requirements_dev.txt @@ -0,0 +1,4 @@ +Faker +pytest~=7.4.3 +pytest-cov>=4.1.0 +pytest-mock>=3.10.0 \ No newline at end of file diff --git a/functions-python/feed_sync_process_transitland/src/__init__.py b/functions-python/feed_sync_process_transitland/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/functions-python/feed_sync_process_transitland/src/main.py b/functions-python/feed_sync_process_transitland/src/main.py new file mode 100644 index 000000000..2b108d97e --- /dev/null +++ b/functions-python/feed_sync_process_transitland/src/main.py @@ -0,0 +1,509 @@ +# +# MobilityData 2024 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import base64 +import json +import logging +import os +import uuid +from dataclasses import dataclass +from typing import Optional, Tuple + +import functions_framework +from google.cloud import pubsub_v1 +from sqlalchemy import text +from sqlalchemy.orm import Session + +from helpers.database import start_db_session, close_db_session + +# Configure logging +logger = logging.getLogger("feed_processor") +if not logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s " + "- %(levelname)s - %(message)s") + ) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + +# Environment variables +PROJECT_ID = os.getenv("PROJECT_ID") +DATASET_BATCH_TOPIC = os.getenv("DATASET_BATCH_TOPIC_NAME") +FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL") + + +@dataclass +class FeedPayload: + """Data class for feed processing payload""" + + external_id: str + feed_id: str + feed_url: str + execution_id: Optional[str] + spec: str + auth_info_url: Optional[str] + auth_param_name: Optional[str] + type: Optional[str] + operator_name: Optional[str] + country: Optional[str] + state_province: Optional[str] + city_name: Optional[str] + source: str + payload_type: str + + +class FeedProcessor: + """Handles feed processing operations including database interactions""" + + def __init__(self, db_session: Session): + self.session = db_session + self.publisher = pubsub_v1.PublisherClient() + + def process_feed(self, payload: FeedPayload) -> None: + """ + Processes feed idempotently based on database state, not payload type. + This function determines the action by checking + the current state in the database. + + Args: + payload (FeedPayload): The feed payload to process + """ + try: + logger.info( + f"Starting feed processing " + f"for external_id: {payload.external_id}" + ) + + # Check current state in database + current_feed_id, current_url = self.get_current_feed_info( + payload.external_id, payload.source + ) + + if current_feed_id is None: + logger.info("Processing new feed") + # If no existing feed found - checks if URL exists in any feed + if self.check_feed_url_exists(payload.feed_url): + logger.info(f"Feed URL already exists: {payload.feed_url}") + return + # Create new feed + self.process_new_feed(payload) + else: + # If Feed exists - check if URL has changed + if current_url != payload.feed_url: + logger.info("Processing feed update") + logger.debug( + f"Found existing feed: " + f"{current_feed_id} with different URL" + ) + # URL changed - handle update + self.process_feed_update(payload, current_feed_id) + else: + logger.info( + f"Feed already exists with " + f"same URL: {payload.external_id}" + ) + return + + self.session.commit() + logger.debug("Database transaction committed successfully") + + # Publish to dataset batch topic if not authenticated + if not payload.auth_info_url: + self.publish_to_batch_topic(payload) + + except Exception as e: + error_msg = (f"Error processing " + f"feed {payload.external_id}: {str(e)}") + logger.error(error_msg) + self.session.rollback() + logger.debug("Database transaction rolled back due to error") + raise + + def process_new_feed(self, payload: FeedPayload) -> None: + """ + Process creation of a new feed + + Args: + payload (FeedPayload): The feed payload for new feed + """ + logger.info( + f"Starting new feed creation " + f"for external_id: {payload.external_id}" + ) + + # Checks if feed with same URL exists + if self.check_feed_url_exists(payload.feed_url): + logger.info(f"Feed URL already 
exists: {payload.feed_url}")
+            return
+
+        # Generate new feed ID and stable ID
+        feed_id = str(uuid.uuid4())
+        stable_id = f"{payload.source}-{payload.external_id}"
+
+        logger.debug(f"Generated new feed_id: "
+                     f"{feed_id} and stable_id: {stable_id}")
+
+        try:
+            # Insert new feed
+            feed_query = text(
+                """
+                INSERT INTO public.feed (
+                    id,
+                    data_type,
+                    feed_name,
+                    producer_url,
+                    authentication_type,
+                    authentication_info_url,
+                    api_key_parameter_name,
+                    stable_id,
+                    status,
+                    feed_contact_email,
+                    provider,
+                    created_at
+                ) VALUES (
+                    :feed_id,
+                    :data_type,
+                    :feed_name,
+                    :producer_url,
+                    CASE
+                        WHEN :auth_type IS NOT NULL THEN
+                            cast(:auth_type as authenticationtype)
+                        ELSE '0'::authenticationtype
+                    END,
+                    :auth_info_url,
+                    :api_key_parameter_name,
+                    :stable_id,
+                    'active'::status,
+                    NULL,
+                    :provider,
+                    CURRENT_TIMESTAMP
+                )
+                """
+            )
+
+            self.session.execute(
+                feed_query,
+                {
+                    "feed_id": feed_id,
+                    "data_type": payload.spec,
+                    "feed_name": f"Feed from {payload.operator_name}"
+                    if payload.operator_name
+                    else "Unnamed Feed",
+                    "producer_url": payload.feed_url,
+                    "auth_type": payload.type,
+                    "auth_info_url": payload.auth_info_url,
+                    "api_key_parameter_name": payload.auth_param_name,
+                    "stable_id": stable_id,
+                    "provider": payload.operator_name,
+                },
+            )
+
+            logger.debug(
+                f"Successfully inserted new feed record for feed_id: {feed_id}"
+            )
+
+            # Create external ID mapping
+            external_id_query = text(
+                """
+                INSERT INTO public.externalid (feed_id, associated_id, source)
+                VALUES (:feed_id, :external_id, :source)
+                """
+            )
+
+            self.session.execute(
+                external_id_query,
+                {
+                    "feed_id": feed_id,
+                    "external_id": payload.external_id,
+                    "source": payload.source,
+                },
+            )
+
+            logger.debug(
+                f"Successfully created external ID "
+                f"mapping for feed_id: {feed_id}"
+            )
+            logger.info(
+                f"Created new feed with ID: {feed_id} for "
+                f"external_id: {payload.external_id}"
+            )
+
+        except Exception as e:
+            logger.error(
+                f"Error creating new feed for "
+                f"external_id {payload.external_id}: {str(e)}"
+            )
+            raise
+
+    def process_feed_update(
+        self, payload: FeedPayload, old_feed_id: str
+    ) -> None:
+        """
+        Process feed update when URL has changed
+
+        Args:
+            payload (FeedPayload): The feed payload for update
+            old_feed_id (str): The ID of the existing feed to be updated
+        """
+        logger.info(
+            f"Starting feed update process for "
+            f"external_id: {payload.external_id}"
+        )
+        logger.debug(f"Old feed_id: {old_feed_id}, "
+                     f"New URL: {payload.feed_url}")
+
+        try:
+            # Create new feed with updated URL
+            new_feed_id = str(uuid.uuid4())
+            stable_id = f"{payload.source}-{payload.external_id}"
+
+            logger.debug(f"Generated new feed_id: {new_feed_id} for update")
+
+            # Insert new feed
+            new_feed_query = text(
+                """
+                INSERT INTO public.feed (
+                    id,
+                    data_type,
+                    feed_name,
+                    producer_url,
+                    authentication_type,
+                    authentication_info_url,
+                    api_key_parameter_name,
+                    stable_id,
+                    status,
+                    feed_contact_email,
+                    provider,
+                    created_at
+                ) VALUES (
+                    :feed_id,
+                    :data_type,
+                    :feed_name,
+                    :producer_url,
+                    CASE
+                        WHEN :auth_type IS NOT NULL THEN
+                            cast(:auth_type as authenticationtype)
+                        ELSE '0'::authenticationtype
+                    END,
+                    :auth_info_url,
+                    :api_key_parameter_name,
+                    :stable_id,
+                    'active'::status,
+                    NULL,
+                    :provider,
+                    CURRENT_TIMESTAMP
+                )
+                """
+            )
+
+            self.session.execute(
+                new_feed_query,
+                {
+                    "feed_id": new_feed_id,
+                    "data_type": payload.spec,
+                    "feed_name": f"Feed from {payload.operator_name}"
+                    if payload.operator_name
+                    else "Unnamed Feed",
+                    "producer_url": payload.feed_url,
+                    "auth_type":
payload.type, + "auth_info_url": payload.auth_info_url, + "api_key_parameter_name": payload.auth_param_name, + "stable_id": stable_id, + "provider": payload.operator_name, + }, + ) + + logger.debug( + f"Successfully inserted new feed " + f"record for feed_id: {new_feed_id}" + ) + + # Update old feed status to deprecated + logger.debug(f"Deprecating old feed ID: {old_feed_id}") + deprecate_query = text( + """ + UPDATE public.feed + SET status = 'deprecated'::status + WHERE id = :old_feed_id + """ + ) + self.session.execute(deprecate_query, {"old_feed_id": old_feed_id}) + + # Update external ID mapping + logger.debug(f"Updating external ID mapping " + f"to new feed_id: {new_feed_id}") + update_external_id_query = text( + """ + UPDATE public.externalid + SET feed_id = :new_feed_id + WHERE associated_id = :external_id AND source = :source + """ + ) + self.session.execute( + update_external_id_query, + { + "new_feed_id": new_feed_id, + "external_id": payload.external_id, + "source": payload.source, + }, + ) + + # Add entry to redirecting ID table + logger.debug(f"Creating redirect from " + f"{old_feed_id} to {new_feed_id}") + redirect_query = text( + """ + INSERT INTO public.redirectingid (source_id, target_id) + VALUES (:old_feed_id, :new_feed_id) + """ + ) + self.session.execute( + redirect_query, {"old_feed_id": old_feed_id, + "new_feed_id": new_feed_id} + ) + + logger.info( + f"Updated feed for external_id: {payload.external_id}, " + f"new feed_id: {new_feed_id}" + ) + + except Exception as e: + logger.error( + f"Error updating feed for " + f"external_id {payload.external_id}: {str(e)}" + ) + raise + + def check_feed_url_exists(self, feed_url: str) -> bool: + """ + Check if a feed with the given URL already exists + + Args: + feed_url (str): The URL to check + + Returns: + bool: True if URL exists, False otherwise + """ + query = text( + """ + SELECT 1 FROM public.feed + WHERE producer_url = :feed_url AND status = 'active'::status + LIMIT 1 + """ + ) + result = self.session.execute(query, {"feed_url": feed_url}).fetchone() + + if result is not None: + logger.debug(f"Found existing feed with URL: {feed_url}") + return True + + logger.debug(f"No existing feed found with URL: {feed_url}") + return False + + def get_current_feed_info( + self, external_id: str, source: str + ) -> Tuple[Optional[str], Optional[str]]: + """ + Get current feed ID and URL for given external ID + + Args: + external_id (str): The external ID to look up + source (str): The source of the feed + + Returns: + Tuple[Optional[str], Optional[str]]: Tuple of (feed_id, feed_url) + """ + query = text( + """ + SELECT f.id, f.producer_url + FROM public.feed f + JOIN public.externalid e ON f.id = e.feed_id + WHERE e.associated_id = :external_id + AND e.source = :source + AND f.status = 'active'::status + LIMIT 1 + """ + ) + result = self.session.execute( + query, {"external_id": external_id, "source": source} + ).fetchone() + if result: + logger.debug(f"Retrieved current feed " + f"info for external_id: {external_id}") + return result[0], result[1] + + logger.debug(f"No existing feed found for external_id: {external_id}") + return None, None + + def publish_to_batch_topic(self, payload: FeedPayload) -> None: + """ + Publish feed to dataset batch topic + + Args: + payload (FeedPayload): The feed payload to publish + """ + topic_path = self.publisher.topic_path(PROJECT_ID, DATASET_BATCH_TOPIC) + logger.debug(f"Publishing to topic: {topic_path}") + + data = json.dumps( + {"feed_id": payload.feed_id, "execution_id": 
payload.execution_id} + ).encode("utf-8") + + try: + logger.debug(f"Preparing to publish feed_id: {payload.feed_id}") + future = self.publisher.publish(topic_path, data=data) + future.result() + logger.info(f"Published feed {payload.feed_id} " + f"to dataset batch topic") + except Exception as e: + error_msg = f"Error publishing to dataset batch topic: {str(e)}" + logger.error(error_msg) + raise Exception(error_msg) + + +@functions_framework.cloud_event +def process_feed_event(cloud_event): + """ + Cloud Function to process feed events from Pub/Sub + + Args: + cloud_event (CloudEvent): The cloud event + containing the Pub/Sub message + """ + try: + # Decode payload from Pub/Sub message + pubsub_message = ( + base64.b64decode(cloud_event.data["message"]["data"]).decode()) + message_data = json.loads(pubsub_message) + + payload = FeedPayload(**message_data) + + db_session = start_db_session(FEEDS_DATABASE_URL) + + try: + + processor = FeedProcessor(db_session) + processor.process_feed(payload) + + logger.info(f"Successfully processed feed: {payload.external_id}") + return "Success", 200 + + finally: + close_db_session(db_session) + + except Exception as e: + error_msg = f"Error processing feed event: {str(e)}" + logger.error(error_msg) + return error_msg, 500 diff --git a/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py b/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py new file mode 100644 index 000000000..bb58e6d14 --- /dev/null +++ b/functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py @@ -0,0 +1,338 @@ +import base64 +import json +import uuid +from unittest.mock import Mock, patch, call +import pytest +from google.cloud import pubsub_v1 +from sqlalchemy.orm import Session as DBSession + +from feed_sync_process_transitland.src.main import ( + FeedProcessor, + FeedPayload, + process_feed_event, +) + + +@pytest.fixture(autouse=True) +def mock_logger(): + with patch("feed_sync_process_transitland.src.main.logger") as mock_log: + mock_log.info = Mock() + mock_log.error = Mock() + mock_log.warning = Mock() + mock_log.debug = Mock() + yield mock_log + + +@pytest.fixture +def feed_payload(): + """Fixture for a standard feed payload""" + return FeedPayload( + external_id="onestop1", + feed_id="feed1", + feed_url="http://example.com/feed1", + execution_id="exec123", + spec="gtfs", + auth_info_url=None, + auth_param_name=None, + type=None, + operator_name="Test Operator", + country="USA", + state_province="CA", + city_name="Test City", + source="TLD", + payload_type="new", + ) + + +@pytest.fixture +def db_session(): + """Fixture for database session""" + return Mock(spec=DBSession) + + +@pytest.fixture +def processor(db_session): + """Fixture for FeedProcessor with mocked dependencies""" + return FeedProcessor(db_session) + + +def test_get_current_feed_info(processor, db_session, mock_logger): + """Test retrieving current feed information""" + # Test when feed exists + db_session.execute.return_value.fetchone.return_value = ( + "feed-uuid", + "http://example.com/feed", + ) + feed_id, url = processor.get_current_feed_info("onestop1", "TLD") + assert feed_id == "feed-uuid" + assert url == "http://example.com/feed" + mock_logger.debug.assert_called_with( + "Retrieved current feed info for external_id: onestop1" + ) + + # Test when feed doesn't exist + db_session.execute.return_value.fetchone.return_value = None + feed_id, url = processor.get_current_feed_info("onestop2", "TLD") + assert feed_id is None + assert url 
is None + mock_logger.debug.assert_called_with( + "No existing feed found for external_id: onestop2" + ) + + +def test_check_feed_url_exists(processor, db_session, mock_logger): + """Test checking if feed URL already exists""" + test_url = "http://example.com/feed" + + # Test URL exists + db_session.execute.return_value.fetchone.return_value = (1,) + assert processor.check_feed_url_exists(test_url) is True + mock_logger.debug.assert_called_with(f"Found existing feed with URL: {test_url}") + + # Test URL doesn't exist + db_session.execute.return_value.fetchone.return_value = None + assert processor.check_feed_url_exists(test_url) is False + mock_logger.debug.assert_called_with(f"No existing feed found with URL: {test_url}") + + +def test_process_new_feed(processor, db_session, feed_payload, mock_logger): + """Test processing new feed creation""" + feed_id = "12345678-1234-5678-1234-567812345678" + + # We need to patch both uuid and check_feed_url_exists + with patch("uuid.uuid4", return_value=uuid.UUID(feed_id)), patch.object( + processor, "check_feed_url_exists", return_value=False + ): + processor.process_new_feed(feed_payload) + + # Verify database operations + assert db_session.execute.call_count == 2 + + # Verify the specific calls + execute_calls = db_session.execute.call_args_list + assert len(execute_calls) == 2 + + # First call should be inserting into feed table + feed_query = execute_calls[0][0][0].text + assert "INSERT INTO public.feed" in feed_query + + # Second call should be inserting into externalid table + external_id_query = execute_calls[1][0][0].text + assert "INSERT INTO public.externalid" in external_id_query + + # Verify logging sequence + expected_logs = [ + # Initial log + call( + f"Starting new feed creation for external_id: {feed_payload.external_id}" + ), + # Final success log + call( + f"Created new feed with ID: {feed_id} for external_id: {feed_payload.external_id}" + ), + ] + + expected_debug_logs = [ + call( + f"Generated new feed_id: {feed_id} and stable_id: {feed_payload.source}-{feed_payload.external_id}" + ), + call(f"Successfully inserted new feed record for feed_id: {feed_id}"), + call(f"Successfully created external ID mapping for feed_id: {feed_id}"), + ] + + # Check that all expected logs were made + mock_logger.info.assert_has_calls(expected_logs, any_order=False) + mock_logger.debug.assert_has_calls(expected_debug_logs, any_order=False) + + +def test_process_feed_update(processor, db_session, feed_payload, mock_logger): + """Test processing feed updates""" + old_feed_id = "old-uuid" + new_feed_id = "12345678-1234-5678-1234-567812345678" + + with patch("uuid.uuid4", return_value=uuid.UUID(new_feed_id)): + processor.process_feed_update(feed_payload, old_feed_id) + + # Verify logging + mock_logger.info.assert_called_with( + f"Updated feed for external_id: {feed_payload.external_id}, new feed_id: {new_feed_id}" + ) + mock_logger.debug.assert_any_call(f"Deprecating old feed ID: {old_feed_id}") + + +def test_process_feed_full(processor, db_session, feed_payload, mock_logger): + """Test full feed processing flow""" + # Setup mocks for new feed scenario + processor.get_current_feed_info = Mock(return_value=(None, None)) + processor.check_feed_url_exists = Mock(return_value=False) + processor.process_new_feed = Mock() + processor.publish_to_batch_topic = Mock() + + processor.process_feed(feed_payload) + + processor.get_current_feed_info.assert_called_once_with("onestop1", "TLD") + 
+    processor.check_feed_url_exists.assert_called_once_with("http://example.com/feed1")
+    processor.process_new_feed.assert_called_once_with(feed_payload)
+    processor.publish_to_batch_topic.assert_called_once_with(feed_payload)
+    db_session.commit.assert_called_once()
+
+    # Verify logging
+    mock_logger.info.assert_called_with("Processing new feed")
+
+
+def test_process_feed_update_full(processor, db_session, feed_payload, mock_logger):
+    """Test processing a feed update"""
+    # Setup mocks for update scenario
+    processor.get_current_feed_info = Mock(
+        return_value=("old-uuid", "http://example.com/old")
+    )
+    processor.process_feed_update = Mock()
+    processor.publish_to_batch_topic = Mock()
+
+    processor.process_feed(feed_payload)
+
+    mock_logger.info.assert_called_with("Processing feed update")
+    mock_logger.debug.assert_any_call(
+        "Found existing feed: old-uuid with different URL"
+    )
+
+
+def test_process_feed_no_change(processor, db_session, feed_payload, mock_logger):
+    """Test processing a feed with no changes needed"""
+    processor.get_current_feed_info = Mock(
+        return_value=("existing-uuid", "http://example.com/feed1")
+    )
+    processor.process_feed_update = Mock()
+    processor.publish_to_batch_topic = Mock()
+
+    processor.process_feed(feed_payload)
+
+    processor.process_feed_update.assert_not_called()
+    processor.publish_to_batch_topic.assert_not_called()
+    db_session.commit.assert_not_called()
+
+    # Verify logging
+    mock_logger.info.assert_called_with(
+        f"Feed already exists with same URL: {feed_payload.external_id}"
+    )
+
+
+def test_process_feed_error_handling(processor, db_session, feed_payload, mock_logger):
+    """Test error handling during feed processing"""
+    # Setup mock to raise an exception
+    error_msg = "Database error"
+    processor.get_current_feed_info = Mock(side_effect=Exception(error_msg))
+
+    # Process feed and verify error handling
+    with pytest.raises(Exception):
+        processor.process_feed(feed_payload)
+
+    db_session.rollback.assert_called_once()
+    mock_logger.error.assert_called_with(
+        f"Error processing feed {feed_payload.external_id}: {error_msg}"
+    )
+
+
+def test_publish_to_batch_topic(processor, feed_payload, mock_logger):
+    """Test publishing to batch topic"""
+    mock_publisher = Mock(spec=pubsub_v1.PublisherClient)
+    processor.publisher = mock_publisher
+    mock_future = Mock()
+    mock_publisher.publish.return_value = mock_future
+
+    test_topic = "test_topic"
+    mock_publisher.topic_path.return_value = test_topic
+
+    processor.publish_to_batch_topic(feed_payload)
+
+    # Verify logging sequence
+    mock_logger.info.assert_called_with(
+        f"Published feed {feed_payload.feed_id} to dataset batch topic"
+    )
+
+
+def test_publish_to_batch_topic_error(processor, feed_payload, mock_logger):
+    """Test error handling in batch topic publishing"""
+    mock_publisher = Mock(spec=pubsub_v1.PublisherClient)
+    processor.publisher = mock_publisher
+    error_msg = "Publishing error"
+    mock_publisher.publish.side_effect = Exception(error_msg)
+
+    with pytest.raises(Exception) as exc_info:
+        processor.publish_to_batch_topic(feed_payload)
+
+    assert (
+        str(exc_info.value) == f"Error publishing to dataset batch topic: {error_msg}"
+    )
+    mock_logger.error.assert_called_with(
+        f"Error publishing to dataset batch topic: {error_msg}"
+    )
+
+
+def test_process_feed_event_success(mock_logger):
+    """Test successful Cloud Function execution"""
+    # Mock necessary dependencies
+    with patch(
+        "feed_sync_process_transitland.src.main.start_db_session"
+    ) as mock_start_db, patch(
+        "feed_sync_process_transitland.src.main.close_db_session"
+    ), patch(
+        "feed_sync_process_transitland.src.main.FeedProcessor"
+    ):
+        # Setup mock database session
+        mock_db_session = Mock(spec=DBSession)
+        mock_start_db.return_value = mock_db_session
+
+        # Create mock cloud event with valid payload
+        test_payload = {
+            "external_id": "test1",
+            "feed_id": "feed1",
+            "feed_url": "http://example.com/feed1",
+            "execution_id": "exec123",
+            "spec": "gtfs",
+            "auth_info_url": None,
+            "auth_param_name": None,
+            "type": None,
+            "operator_name": "Test Operator",
+            "country": "USA",
+            "state_province": "CA",
+            "city_name": "Test City",
+            "source": "TLD",
+            "payload_type": "new",
+        }
+        feed_payload = FeedPayload(**test_payload)
+        cloud_event = Mock()
+        cloud_event.data = {
+            "message": {
+                "data": base64.b64encode(json.dumps(feed_payload.__dict__).encode())
+            }
+        }
+
+        # Process event
+        result = process_feed_event(cloud_event)
+
+        # Verify result and logging
+        assert result == ("Success", 200)
+        mock_logger.info.assert_called_with(
+            f"Successfully processed feed: {feed_payload.external_id}"
+        )
+
+
+def test_process_feed_event_error(mock_logger):
+    """Test error handling in Cloud Function entry point"""
+    with patch("feed_sync_process_transitland.src.main.start_db_session"):
+        cloud_event = Mock()
+        cloud_event.data = {
+            "message": {"data": base64.b64encode("invalid json".encode("utf-8"))}
+        }
+
+        # Process event
+        result = process_feed_event(cloud_event)
+
+        # Verify error handling
+        assert result[1] == 500
+        assert "Error processing feed event" in result[0]
+
+        assert mock_logger.error.called
+        error_msg = mock_logger.error.call_args[0][0]
+        assert "Error processing feed event" in error_msg
diff --git a/functions-python/helpers/feed_sync/feed_sync_common.py b/functions-python/helpers/feed_sync/feed_sync_common.py
new file mode 100644
index 000000000..a738f3b55
--- /dev/null
+++ b/functions-python/helpers/feed_sync/feed_sync_common.py
@@ -0,0 +1,59 @@
+#
+# MobilityData 2024
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import dataclass
+from typing import Any
+
+from google.cloud.pubsub_v1.publisher.futures import Future
+from sqlalchemy.orm import Session
+
+
+@dataclass
+class FeedSyncPayload:
+    """
+    Data class for feed sync payloads.
+    """
+
+    external_id: str
+    payload: Any
+
+
+class FeedSyncProcessor:
+    """
+    Abstract class for feed sync processors.
+    """
+
+    def process_sync(
+        self, session: Session, execution_id: str
+    ) -> list[FeedSyncPayload]:
+        """
+        Abstract method to process feed sync.
+        :param session: database session
+        :param execution_id: execution ID, used for logging and debugging purposes
+        :return: list of FeedSyncPayload
+        """
+        raise NotImplementedError
+
+    def publish_callback(
+        self, future: Future, payload: FeedSyncPayload, topic_path: str
+    ):
+        """
+        Abstract method for publishing callback.
+        :param future: Future object
+        :param payload: FeedSyncPayload object
+        :param topic_path: Pub/Sub topic path
+        """
+        raise NotImplementedError
diff --git a/functions-python/helpers/feed_sync/feed_sync_dispatcher.py b/functions-python/helpers/feed_sync/feed_sync_dispatcher.py
new file mode 100644
index 000000000..bb296968b
--- /dev/null
+++ b/functions-python/helpers/feed_sync/feed_sync_dispatcher.py
@@ -0,0 +1,60 @@
+#
+# MobilityData 2024
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import os
+import logging
+
+from helpers.database import start_db_session, close_db_session
+from helpers.feed_sync.feed_sync_common import FeedSyncProcessor
+from helpers.pub_sub import get_pubsub_client, publish
+
+
+def feed_sync_dispatcher(
+    feed_sync_processor: FeedSyncProcessor, pubsub_topic_path: str, execution_id: str
+):
+    """
+    Processes a feed sync and publishes one event per feed to a Pub/Sub topic.
+    :param feed_sync_processor: FeedSyncProcessor implementation to use
+    :param pubsub_topic_path: path of the Pub/Sub topic to publish to
+    :param execution_id: execution ID
+    """
+    publisher = get_pubsub_client()
+    session = None
+    try:
+        session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
+        payloads = feed_sync_processor.process_sync(session, execution_id)
+    except Exception as error:
+        logging.error(f"Error processing feeds sync: {error}")
+        raise Exception(f"Error processing feeds sync: {error}") from error
+    finally:
+        if session is not None:
+            close_db_session(session)
+
+    logging.info(f"Total feeds to add/update: {len(payloads)}.")
+
+    for payload in payloads:
+        data_str = json.dumps(payload.payload.__dict__)
+        logging.info(f"Publishing {data_str} to {pubsub_topic_path}.")
+        future = publish(publisher, pubsub_topic_path, data_str.encode("utf-8"))
+        # Bind the current future and payload as lambda defaults; otherwise every
+        # callback would see only the last values of the loop variables.
+        future.add_done_callback(
+            lambda f, p=payload: feed_sync_processor.publish_callback(
+                f, p, pubsub_topic_path
+            )
+        )
+
+    logging.info(
+        f"Publish completed. Published {len(payloads)} feeds to {pubsub_topic_path}."
+    )
diff --git a/functions-python/helpers/pub_sub.py b/functions-python/helpers/pub_sub.py
new file mode 100644
index 000000000..76184b947
--- /dev/null
+++ b/functions-python/helpers/pub_sub.py
@@ -0,0 +1,45 @@
+#
+# MobilityData 2024
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import uuid
+
+from google.cloud import pubsub_v1
+from google.cloud.pubsub_v1 import PublisherClient
+from google.cloud.pubsub_v1.publisher.futures import Future
+
+
+def get_pubsub_client() -> PublisherClient:
+    """
+    Returns a Pub/Sub client.
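+    If the PUBSUB_EMULATOR_HOST environment variable is set, the client
+    library automatically targets the local emulator instead of the live
+    service, which is what the local-testing instructions rely on.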
+    """
+    return pubsub_v1.PublisherClient()
+
+
+def publish(publisher: PublisherClient, topic_path: str, data_bytes: bytes) -> Future:
+    """
+    Publishes the given data to the Pub/Sub topic.
+    """
+    return publisher.publish(topic_path, data=data_bytes)
+
+
+def get_execution_id(request, prefix: str) -> str:
+    """
+    Returns the execution ID for the request if available, otherwise generates a new one.
+    :param request: HTTP request object
+    :param prefix: prefix for the execution ID. Example: "batch-datasets"
+    """
+    trace_id = request.headers.get("X-Cloud-Trace-Context")
+    execution_id = f"{prefix}-{trace_id}" if trace_id else f"{prefix}-{uuid.uuid4()}"
+    return execution_id
diff --git a/functions-python/helpers/requirements.txt b/functions-python/helpers/requirements.txt
index 0c58ed566..ae500c0b2 100644
--- a/functions-python/helpers/requirements.txt
+++ b/functions-python/helpers/requirements.txt
@@ -1,18 +1,24 @@
+# Common packages
 functions-framework==3.*
-google-cloud-storage
-google-cloud-pubsub
 google-cloud-logging
-google-cloud-bigquery
-google-api-core
-google-cloud-firestore
-google-cloud-datastore
-google-cloud-bigquery
 psycopg2-binary==2.9.6
-aiohttp
-asyncio
-urllib3~=2.1.0
+aiohttp~=3.10.5
+asyncio~=3.4.3
+urllib3~=2.2.2
+requests~=2.32.3
+attrs~=23.1.0
+pluggy~=1.3.0
+certifi~=2024.7.4
+
+# SQL Alchemy and Geo Alchemy
 SQLAlchemy==2.0.23
 geoalchemy2==0.14.7
-requests~=2.31.0
+
+# Google specific packages for this function
+google-cloud-pubsub
+google-cloud-storage
+google-cloud-datastore
 cloudevents~=1.10.1
-requests_mock
\ No newline at end of file
+google-cloud-bigquery
+google-api-core
+google-cloud-firestore
diff --git a/scripts/pubsub_message_print.sh b/scripts/pubsub_message_print.sh
new file mode 100755
index 000000000..23196d7ca
--- /dev/null
+++ b/scripts/pubsub_message_print.sh
@@ -0,0 +1,126 @@
+#!/bin/bash
+
+#
+# MobilityData 2023
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+# This script uses the Pub/Sub emulator to create a subscription and print the messages published to a topic.
+# If you are debugging locally with a consumer of the topic, do not run this
+# script if that consumer shares the same subscription name, as this script
+# and any consumer sharing the same subscription will be competing for the
+# messages in the topic. If consumers have different subscription names,
+# they will each receive a copy of the messages.
+#
+# Requires the jq command-line JSON processor: https://stedolan.github.io/jq/
+#
+# Usage: ./pubsub_message_print.sh <topic_name>
+
+export PUBSUB_EMULATOR_HOST="localhost:8043"
+
+PROJECT="test-project"
+SUBSCRIPTION_NAME="my-subscription"
+
+TOPIC_NAME="$1"
+
+if [ -z "$TOPIC_NAME" ]; then
+  echo "Error: No topic name provided."
+  echo "Usage: ./pubsub_message_print.sh <topic_name>"
+  exit 1
+fi
+
+create_subscription() {
+  echo "Creating subscription: $SUBSCRIPTION_NAME"
+  SUBSCRIPTION_URL="http://$PUBSUB_EMULATOR_HOST/v1/projects/$PROJECT/subscriptions/$SUBSCRIPTION_NAME"
+  TOPIC_URL="projects/$PROJECT/topics/$TOPIC_NAME"
+
+  BODY=$(cat <