13 changes: 12 additions & 1 deletion .github/workflows/api-deployer.yml
@@ -12,6 +12,9 @@ on:
OAUTH2_CLIENT_SECRET:
description: OAuth client secret part of the Identity Aware Proxy configuration
required: true
OP_SERVICE_ACCOUNT_TOKEN:
description: 1Password service account token
required: true
inputs:
ENVIRONMENT:
description: API environment. Possible values prod, staging and dev
@@ -280,10 +283,18 @@ jobs:
echo "GLOBAL_RATE_LIMIT_REQ_PER_MINUTE=${{ inputs.GLOBAL_RATE_LIMIT_REQ_PER_MINUTE }}" >> $GITHUB_ENV
echo "VALIDATOR_ENDPOINT=${{ inputs.VALIDATOR_ENDPOINT }}" >> $GITHUB_ENV

- name: Load secret from 1Password
uses: 1password/load-secrets-action@v2
with:
export-env: true
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
TRANSITLAND_API_KEY: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/TansitLand API Key/credential"

- name: Populate Variables
run: |
scripts/replace-variables.sh -in_file infra/backend.conf.rename_me -out_file infra/backend.conf -variables BUCKET_NAME,OBJECT_PREFIX
scripts/replace-variables.sh -in_file infra/vars.tfvars.rename_me -out_file infra/vars.tfvars -variables PROJECT_ID,REGION,ENVIRONMENT,DEPLOYER_SERVICE_ACCOUNT,FEED_API_IMAGE_VERSION,OAUTH2_CLIENT_ID,OAUTH2_CLIENT_SECRET,GLOBAL_RATE_LIMIT_REQ_PER_MINUTE,ARTIFACT_REPO_NAME,VALIDATOR_ENDPOINT
scripts/replace-variables.sh -in_file infra/vars.tfvars.rename_me -out_file infra/vars.tfvars -variables PROJECT_ID,REGION,ENVIRONMENT,DEPLOYER_SERVICE_ACCOUNT,FEED_API_IMAGE_VERSION,OAUTH2_CLIENT_ID,OAUTH2_CLIENT_SECRET,GLOBAL_RATE_LIMIT_REQ_PER_MINUTE,ARTIFACT_REPO_NAME,VALIDATOR_ENDPOINT,TRANSITLAND_API_KEY

- uses: hashicorp/setup-terraform@v3
with:
1 change: 1 addition & 0 deletions .github/workflows/api-dev.yml
@@ -26,6 +26,7 @@ jobs:
GCP_MOBILITY_FEEDS_SA_KEY: ${{ secrets.DEV_GCP_MOBILITY_FEEDS_SA_KEY }}
OAUTH2_CLIENT_ID: ${{ secrets.DEV_MOBILITY_FEEDS_OAUTH2_CLIENT_ID}}
OAUTH2_CLIENT_SECRET: ${{ secrets.DEV_MOBILITY_FEEDS_OAUTH2_CLIENT_SECRET}}
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}

integration-tests:
if: ${{ github.event.inputs.run_integration_tests == 'true' }}
1 change: 1 addition & 0 deletions .github/workflows/api-prod.yml
@@ -22,3 +22,4 @@ jobs:
GCP_MOBILITY_FEEDS_SA_KEY: ${{ secrets.PROD_GCP_MOBILITY_FEEDS_SA_KEY }}
OAUTH2_CLIENT_ID: ${{ secrets.PROD_MOBILITY_FEEDS_OAUTH2_CLIENT_ID}}
OAUTH2_CLIENT_SECRET: ${{ secrets.PROD_MOBILITY_FEEDS_OAUTH2_CLIENT_SECRET}}
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/api-qa.yml
@@ -22,4 +22,4 @@ jobs:
GCP_MOBILITY_FEEDS_SA_KEY: ${{ secrets.QA_GCP_MOBILITY_FEEDS_SA_KEY }}
OAUTH2_CLIENT_ID: ${{ secrets.DEV_MOBILITY_FEEDS_OAUTH2_CLIENT_ID}}
OAUTH2_CLIENT_SECRET: ${{ secrets.DEV_MOBILITY_FEEDS_OAUTH2_CLIENT_SECRET}}

OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
1 change: 1 addition & 0 deletions functions-python/README.md
@@ -31,6 +31,7 @@ The function configuration file contains the following properties:
- `max_instance_count`: The maximum number of function instances that can be created in response to a load.
- `min_instance_count`: The minimum number of function instances that can be created in response to a load.
- `available_cpu_count`: The number of CPU cores that are available to the function.
- `available_memory`: The amount of memory available to the function.
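
For illustration only, a minimal configuration sketch using the properties above might look like the following; the function name and values here are hypothetical and not part of this change:

```json
{
  "name": "example-function",
  "description": "Illustrative function configuration",
  "entry_point": "example_entry_point",
  "timeout": 540,
  "trigger_http": true,
  "max_instance_count": 1,
  "min_instance_count": 0,
  "available_cpu_count": 1,
  "available_memory": "512Mi"
}
```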

# Local Setup

@@ -2,7 +2,7 @@
"name": "feed-sync-dispatcher-transitland",
"description": "Feed Sync Dispatcher for Transitland",
"entry_point": "feed_sync_dispatcher_transitland",
"timeout": 540,
"timeout": 3600,
"memory": "512Mi",
"trigger_http": true,
"include_folders": ["database_gen", "helpers"],
@@ -11,9 +11,10 @@
"key": "FEEDS_DATABASE_URL"
}
],
"ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
"max_instance_request_concurrency": 20,
"max_instance_count": 10,
"ingress_settings": "ALLOW_ALL",
"max_instance_request_concurrency": 1,
"max_instance_count": 1,
"min_instance_count": 0,
"available_cpu": 1
"available_cpu": 1,
"available_memory": "512Mi"
}
99 changes: 63 additions & 36 deletions functions-python/feed_sync_dispatcher_transitland/src/main.py
@@ -15,35 +15,34 @@
#

import json
import os
import logging
import time
import os
import random
import time
from dataclasses import dataclass, asdict
from typing import Optional, List
import requests
from requests.exceptions import RequestException, HTTPError
import pandas as pd

import functions_framework
import pandas as pd
import requests
from google.cloud.pubsub_v1.futures import Future
from requests.exceptions import RequestException, HTTPError
from sqlalchemy.orm import Session
from sqlalchemy import text

from database_gen.sqlacodegen_models import Gtfsfeed
from helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload
from helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher
from helpers.logger import Logger
from helpers.pub_sub import get_pubsub_client, get_execution_id

# Logging configuration
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.basicConfig(level=logging.INFO)

# Environment variables
PUBSUB_TOPIC_NAME = os.getenv("PUBSUB_TOPIC_NAME")
PROJECT_ID = os.getenv("PROJECT_ID")
FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL")
apikey = os.getenv("TRANSITLAND_API_KEY")
TRANSITLAND_API_KEY = os.getenv("TRANSITLAND_API_KEY")
TRANSITLAND_OPERATOR_URL = os.getenv("TRANSITLAND_OPERATOR_URL")
TRANSITLAND_FEED_URL = os.getenv("TRANSITLAND_FEED_URL")
spec = ["gtfs", "gtfs-rt"]
@@ -83,11 +82,16 @@ def to_json(self):
class TransitFeedSyncProcessor(FeedSyncProcessor):
def check_url_status(self, url: str) -> bool:
"""
Checks if a URL returns a valid response (not 404 or 500).
Checks if a URL returns a valid response status code.
"""
try:
logging.info(f"Checking URL: {url}")
if url is None or len(url) == 0:
logging.warning("URL is empty. Skipping check.")
return False
response = requests.head(url, timeout=25)
return response.status_code not in {404, 500}
logging.info(f"URL status code: {response.status_code}")
return response.status_code < 400
except requests.RequestException as e:
logging.warning(f"Failed to reach {url}: {e}")
return False
@@ -99,9 +103,17 @@ def process_sync(
Process data synchronously to fetch, extract, combine, filter and prepare payloads for publishing
to a queue based on conditions related to the data retrieved from TransitLand API.
"""
feeds_data = self.get_data(TRANSITLAND_FEED_URL, apikey, spec, session)
feeds_data = self.get_data(
TRANSITLAND_FEED_URL, TRANSITLAND_API_KEY, spec, session
)
logging.info("Fetched %s feeds from TransitLand API", len(feeds_data["feeds"]))

operators_data = self.get_data(
TRANSITLAND_OPERATOR_URL, apikey, session=session
TRANSITLAND_OPERATOR_URL, TRANSITLAND_API_KEY, session=session
)
logging.info(
"Fetched %s operators from TransitLand API",
len(operators_data["operators"]),
)

feeds = self.extract_feeds_data(feeds_data)
@@ -151,12 +163,25 @@ def process_sync(
.str.lower()
.isin([c.lower() for c in countries_not_included])
]
logging.info(
"Filtered out %s feeds from countries: %s",
len(df_grouped) - len(filtered_df),
countries_not_included,
)

# Filter out URLs that return undesired status codes
filtered_df = filtered_df.drop_duplicates(
subset=["feed_url"]
) # Drop duplicates
filtered_df = filtered_df[filtered_df["feed_url"].apply(self.check_url_status)]
logging.info(
"Filtered out %s feeds with invalid URLs",
len(df_grouped) - len(filtered_df),
)

# Convert filtered DataFrame to dictionary format
combined_data = filtered_df.to_dict(orient="records")
logging.info("Prepared %s feeds for publishing", len(combined_data))

payloads = []
for data in combined_data:
@@ -197,7 +222,7 @@ def process_sync(
def get_data(
self,
url,
apikey,
api_key,
spec=None,
session=None,
max_retries=3,
@@ -209,11 +234,13 @@ def get_data(
Handles rate limits, retries, and error cases.
Returns the parsed data as a dictionary containing feeds and operators.
"""
headers = {"apikey": apikey}
headers = {"apikey": api_key}
params = {"spec": spec} if spec else {}
all_data = {"feeds": [], "operators": []}
delay = initial_delay
response = None

logging.info("Fetching data from %s", url)
while url:
for attempt in range(max_retries):
try:
@@ -225,12 +252,17 @@
all_data["feeds"].extend(data.get("feeds", []))
all_data["operators"].extend(data.get("operators", []))
url = data.get("meta", {}).get("next")
logging.info(
"Fetched %s feeds and %s operators",
len(all_data["feeds"]),
len(all_data["operators"]),
)
logging.info("Next URL: %s", url)
delay = initial_delay
break

except (RequestException, HTTPError) as e:
logging.error("Attempt %s failed: %s", attempt + 1, e)
if response.status_code == 429:
if response is not None and response.status_code == 429:
logging.warning("Rate limit hit. Waiting for %s seconds", delay)
time.sleep(delay + random.uniform(0, 1))
delay = min(delay * 2, max_delay)
@@ -240,7 +272,9 @@
)
return all_data
else:
logging.info("Retrying in %s seconds", delay)
time.sleep(delay)
logging.info("Finished fetching data.")
return all_data

def extract_feeds_data(self, feeds_data: dict) -> List[dict]:
@@ -297,13 +331,12 @@ def check_external_id(
:param source: The source to filter by (e.g., 'TLD' for TransitLand)
:return: True if the feed exists, False otherwise
"""
query = text(
"SELECT 1 FROM public.externalid WHERE associated_id = :external_id AND source = :source LIMIT 1"
results = (
db_session.query(Gtfsfeed)
.filter(Gtfsfeed.externalids.any(associated_id=external_id))
.all()
)
result = db_session.execute(
query, {"external_id": external_id, "source": source}
).fetchone()
return result is not None
return results is not None and len(results) > 0

def get_mbd_feed_url(
self, db_session: Session, external_id: str, source: str
@@ -315,19 +348,12 @@ def get_mbd_feed_url(
:param source: The source to filter by (e.g., 'TLD' for TransitLand)
:return: feed_url in mbd if exists, otherwise None
"""
query = text(
"""
SELECT f.producer_url
FROM public.feed f
JOIN public.externalid e ON f.id = e.feed_id
WHERE e.associated_id = :external_id AND e.source = :source
LIMIT 1
"""
results = (
db_session.query(Gtfsfeed)
.filter(Gtfsfeed.externalids.any(associated_id=external_id))
.all()
)
result = db_session.execute(
query, {"external_id": external_id, "source": source}
).fetchone()
return result[0] if result else None
return results[0].producer_url if results else None

def publish_callback(
self, future: Future, payload: FeedSyncPayload, topic_path: str
@@ -350,6 +376,7 @@ def feed_sync_dispatcher_transitland(request):
"""
HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed.
"""
Logger.init_logger()
publisher = get_pubsub_client()
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME)
transit_land_feed_sync_processor = TransitFeedSyncProcessor()
@@ -2,6 +2,8 @@
from unittest.mock import Mock, patch, call
from requests import Session as RequestsSession
from sqlalchemy.orm import Session as DBSession

from database_gen.sqlacodegen_models import Gtfsfeed
from feed_sync_dispatcher_transitland.src.main import (
TransitFeedSyncProcessor,
FeedSyncPayload,
@@ -90,24 +92,24 @@ def test_extract_operators_data(processor):

def test_check_external_id(processor):
mock_db_session = Mock(spec=DBSession)
mock_db_session.execute.return_value.fetchone.return_value = (1,)
mock_db_session.query.return_value.filter.return_value.all.return_value = (1,)
result = processor.check_external_id(mock_db_session, "onestop1", "TLD")
assert result is True

mock_db_session.execute.return_value.fetchone.return_value = None
mock_db_session.query.return_value.filter.return_value.all.return_value = None
result = processor.check_external_id(mock_db_session, "onestop2", "TLD")
assert result is False


def test_get_mbd_feed_url(processor):
mock_db_session = Mock(spec=DBSession)
mock_db_session.execute.return_value.fetchone.return_value = (
"http://example.com/feed1",
)
mock_db_session.query.return_value.filter.return_value.all.return_value = [
Gtfsfeed(producer_url="http://example.com/feed1")
]
result = processor.get_mbd_feed_url(mock_db_session, "onestop1", "TLD")
assert result == "http://example.com/feed1"

mock_db_session.execute.return_value.fetchone.return_value = None
mock_db_session.query.return_value.filter.return_value.all.return_value = None
result = processor.get_mbd_feed_url(mock_db_session, "onestop2", "TLD")
assert result is None

@@ -343,7 +345,7 @@ def test_get_data_retries(processor):
with patch("time.sleep", return_value=None) as mock_sleep:
result = processor.get_data(
url="http://example.com",
apikey="dummy_api_key",
api_key="dummy_api_key",
session=mock_session,
max_retries=3,
initial_delay=1,
4 changes: 2 additions & 2 deletions functions-python/helpers/feed_sync/feed_sync_dispatcher.py
@@ -35,7 +35,7 @@ def feed_sync_dispatcher(
"""
publisher = get_pubsub_client()
try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"), echo=False)
payloads = feed_sync_processor.process_sync(session, execution_id)
except Exception as error:
logging.error(f"Error processing feeds sync: {error}")
@@ -47,7 +47,7 @@

for payload in payloads:
data_str = json.dumps(payload.payload.__dict__)
print(f"Publishing {data_str} to {pubsub_topic_path}.")
logging.info(f"Publishing {data_str} to {pubsub_topic_path}.")
future = publish(publisher, pubsub_topic_path, data_str.encode("utf-8"))
future.add_done_callback(
lambda _: feed_sync_processor.publish_callback(