1515#
1616
1717import json
18- import os
1918import logging
20- import time
19+ import os
2120import random
21+ import time
2222from dataclasses import dataclass , asdict
2323from typing import Optional , List
24- import requests
25- from requests .exceptions import RequestException , HTTPError
26- import pandas as pd
2724
2825import functions_framework
26+ import pandas as pd
27+ import requests
2928from google .cloud .pubsub_v1 .futures import Future
29+ from requests .exceptions import RequestException , HTTPError
3030from sqlalchemy .orm import Session
31- from sqlalchemy import text
3231
32+ from database_gen .sqlacodegen_models import Gtfsfeed
3333from helpers .feed_sync .feed_sync_common import FeedSyncProcessor , FeedSyncPayload
3434from helpers .feed_sync .feed_sync_dispatcher import feed_sync_dispatcher
35+ from helpers .logger import Logger
3536from helpers .pub_sub import get_pubsub_client , get_execution_id
3637
3738# Logging configuration
38- logging .basicConfig (
39- level = logging .INFO , format = "%(asctime)s - %(levelname)s - %(message)s"
40- )
39+ logging .basicConfig (level = logging .INFO )
4140
4241# Environment variables
4342PUBSUB_TOPIC_NAME = os .getenv ("PUBSUB_TOPIC_NAME" )
4443PROJECT_ID = os .getenv ("PROJECT_ID" )
4544FEEDS_DATABASE_URL = os .getenv ("FEEDS_DATABASE_URL" )
46- apikey = os .getenv ("TRANSITLAND_API_KEY" )
45+ TRANSITLAND_API_KEY = os .getenv ("TRANSITLAND_API_KEY" )
4746TRANSITLAND_OPERATOR_URL = os .getenv ("TRANSITLAND_OPERATOR_URL" )
4847TRANSITLAND_FEED_URL = os .getenv ("TRANSITLAND_FEED_URL" )
4948spec = ["gtfs" , "gtfs-rt" ]
@@ -83,11 +82,16 @@ def to_json(self):
8382class TransitFeedSyncProcessor (FeedSyncProcessor ):
def check_url_status(self, url: str) -> bool:
    """
    Check whether a URL answers a HEAD request with a non-error status code.

    :param url: The URL to probe. May be None or empty, in which case no
        request is made.
    :return: True if the response status code is below 400. False if the URL
        is missing or empty, if the status code is 400 or above, or if the
        request raises a requests.RequestException.
    """
    logging.info("Checking URL: %s", url)

    # Truthiness covers both None and "" and, unlike len(), does not raise
    # for values that have no length.
    if not url:
        logging.warning("URL is empty. Skipping check.")
        return False

    # Only the network call can raise RequestException, so keep the try
    # body limited to it.
    try:
        # NOTE(review): requests.head() does not follow redirects unless
        # allow_redirects=True is passed, so a 3xx answer counts as valid
        # here -- confirm this is the intended behaviour.
        response = requests.head(url, timeout=25)
    except requests.RequestException as e:
        logging.warning("Failed to reach %s: %s", url, e)
        return False

    logging.info("URL status code: %s", response.status_code)
    return response.status_code < 400
@@ -99,9 +103,17 @@ def process_sync(
99103 Process data synchronously to fetch, extract, combine, filter and prepare payloads for publishing
100104 to a queue based on conditions related to the data retrieved from TransitLand API.
101105 """
102- feeds_data = self .get_data (TRANSITLAND_FEED_URL , apikey , spec , session )
106+ feeds_data = self .get_data (
107+ TRANSITLAND_FEED_URL , TRANSITLAND_API_KEY , spec , session
108+ )
109+ logging .info ("Fetched %s feeds from TransitLand API" , len (feeds_data ["feeds" ]))
110+
103111 operators_data = self .get_data (
104- TRANSITLAND_OPERATOR_URL , apikey , session = session
112+ TRANSITLAND_OPERATOR_URL , TRANSITLAND_API_KEY , session = session
113+ )
114+ logging .info (
115+ "Fetched %s operators from TransitLand API" ,
116+ len (operators_data ["operators" ]),
105117 )
106118
107119 feeds = self .extract_feeds_data (feeds_data )
@@ -151,12 +163,25 @@ def process_sync(
151163 .str .lower ()
152164 .isin ([c .lower () for c in countries_not_included ])
153165 ]
166+ logging .info (
167+ "Filtered out %s feeds from countries: %s" ,
168+ len (df_grouped ) - len (filtered_df ),
169+ countries_not_included ,
170+ )
154171
155172 # Filtered out URLs that return undesired status codes
173+ filtered_df = filtered_df .drop_duplicates (
174+ subset = ["feed_url" ]
175+ ) # Drop duplicates
156176 filtered_df = filtered_df [filtered_df ["feed_url" ].apply (self .check_url_status )]
177+ logging .info (
178+ "Filtered out %s feeds with invalid URLs" ,
179+ len (df_grouped ) - len (filtered_df ),
180+ )
157181
158182 # Convert filtered DataFrame to dictionary format
159183 combined_data = filtered_df .to_dict (orient = "records" )
184+ logging .info ("Prepared %s feeds for publishing" , len (combined_data ))
160185
161186 payloads = []
162187 for data in combined_data :
@@ -197,7 +222,7 @@ def process_sync(
197222 def get_data (
198223 self ,
199224 url ,
200- apikey ,
225+ api_key ,
201226 spec = None ,
202227 session = None ,
203228 max_retries = 3 ,
@@ -209,11 +234,13 @@ def get_data(
209234 Handles rate limits, retries, and error cases.
210235 Returns the parsed data as a dictionary containing feeds and operators.
211236 """
212- headers = {"apikey" : apikey }
237+ headers = {"apikey" : api_key }
213238 params = {"spec" : spec } if spec else {}
214239 all_data = {"feeds" : [], "operators" : []}
215240 delay = initial_delay
241+ response = None
216242
243+ logging .info ("Fetching data from %s" , url )
217244 while url :
218245 for attempt in range (max_retries ):
219246 try :
@@ -225,12 +252,17 @@ def get_data(
225252 all_data ["feeds" ].extend (data .get ("feeds" , []))
226253 all_data ["operators" ].extend (data .get ("operators" , []))
227254 url = data .get ("meta" , {}).get ("next" )
255+ logging .info (
256+ "Fetched %s feeds and %s operators" ,
257+ len (all_data ["feeds" ]),
258+ len (all_data ["operators" ]),
259+ )
260+ logging .info ("Next URL: %s" , url )
228261 delay = initial_delay
229262 break
230-
231263 except (RequestException , HTTPError ) as e :
232264 logging .error ("Attempt %s failed: %s" , attempt + 1 , e )
233- if response .status_code == 429 :
265+ if response is not None and response .status_code == 429 :
234266 logging .warning ("Rate limit hit. Waiting for %s seconds" , delay )
235267 time .sleep (delay + random .uniform (0 , 1 ))
236268 delay = min (delay * 2 , max_delay )
@@ -240,7 +272,9 @@ def get_data(
240272 )
241273 return all_data
242274 else :
275+ logging .info ("Retrying in %s seconds" , delay )
243276 time .sleep (delay )
277+ logging .info ("Finished fetching data." )
244278 return all_data
245279
246280 def extract_feeds_data (self , feeds_data : dict ) -> List [dict ]:
@@ -297,13 +331,12 @@ def check_external_id(
297331 :param source: The source to filter by (e.g., 'TLD' for TransitLand)
298332 :return: True if the feed exists, False otherwise
299333 """
300- query = text (
301- "SELECT 1 FROM public.externalid WHERE associated_id = :external_id AND source = :source LIMIT 1"
334+ results = (
335+ db_session .query (Gtfsfeed )
336+ .filter (Gtfsfeed .externalids .any (associated_id = external_id ))
337+ .all ()
302338 )
303- result = db_session .execute (
304- query , {"external_id" : external_id , "source" : source }
305- ).fetchone ()
306- return result is not None
339+ return results is not None and len (results ) > 0
307340
308341 def get_mbd_feed_url (
309342 self , db_session : Session , external_id : str , source : str
@@ -315,19 +348,12 @@ def get_mbd_feed_url(
315348 :param source: The source to filter by (e.g., 'TLD' for TransitLand)
316349 :return: feed_url in mbd if exists, otherwise None
317350 """
318- query = text (
319- """
320- SELECT f.producer_url
321- FROM public.feed f
322- JOIN public.externalid e ON f.id = e.feed_id
323- WHERE e.associated_id = :external_id AND e.source = :source
324- LIMIT 1
325- """
351+ results = (
352+ db_session .query (Gtfsfeed )
353+ .filter (Gtfsfeed .externalids .any (associated_id = external_id ))
354+ .all ()
326355 )
327- result = db_session .execute (
328- query , {"external_id" : external_id , "source" : source }
329- ).fetchone ()
330- return result [0 ] if result else None
356+ return results [0 ].producer_url if results else None
331357
332358 def publish_callback (
333359 self , future : Future , payload : FeedSyncPayload , topic_path : str
@@ -350,6 +376,7 @@ def feed_sync_dispatcher_transitland(request):
350376 """
351377 HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed.
352378 """
379+ Logger .init_logger ()
353380 publisher = get_pubsub_client ()
354381 topic_path = publisher .topic_path (PROJECT_ID , PUBSUB_TOPIC_NAME )
355382 transit_land_feed_sync_processor = TransitFeedSyncProcessor ()
0 commit comments