From 03f4715eab0c85f5a16528c5d75b3206eb40e3fd Mon Sep 17 00:00:00 2001 From: jcpitre Date: Tue, 18 Nov 2025 09:28:36 -0500 Subject: [PATCH 1/5] Modified to reduce memory usage in export_csv function. --- api/src/shared/common/db_utils.py | 164 ++++++++++++------ functions-python/export_csv/src/main.py | 30 +++- functions-python/export_csv/tests/conftest.py | 63 ++++--- .../export_csv/tests/test_export_csv_main.py | 6 +- 4 files changed, 181 insertions(+), 82 deletions(-) diff --git a/api/src/shared/common/db_utils.py b/api/src/shared/common/db_utils.py index 8e1bf698b..ade72ebee 100644 --- a/api/src/shared/common/db_utils.py +++ b/api/src/shared/common/db_utils.py @@ -1,11 +1,11 @@ +import logging import os from typing import Iterator, List, Dict, Optional from geoalchemy2 import WKTElement from sqlalchemy import or_ from sqlalchemy import select -from sqlalchemy.orm import joinedload, Session, contains_eager, load_only -from sqlalchemy.orm.query import Query +from sqlalchemy.orm import Session, contains_eager, load_only, selectinload, with_loader_criteria, Query from sqlalchemy.orm.strategy_options import _AbstractLoad from sqlalchemy import func from sqlalchemy.sql import and_ @@ -48,7 +48,7 @@ def get_gtfs_feeds_query( is_official: bool | None = None, published_only: bool = True, include_options_for_joinedload: bool = True, -) -> Query[any]: +) -> Query: """Get the DB query to use to retrieve the GTFS feeds..""" gtfs_feed_filter = GtfsFeedFilter( stable_id=stable_id, @@ -61,7 +61,9 @@ def get_gtfs_feeds_query( subquery = apply_bounding_filtering( subquery, dataset_latitudes, dataset_longitudes, bounding_filter_method ).subquery() - feed_query = db_session.query(Gtfsfeed).filter(Gtfsfeed.id.in_(subquery)) + # Type checkers prefer an explicit scalar select from the subquery; this is equivalent + # to "id IN (SELECT id FROM (subquery))" and avoids complaints about passing a Subquery. + feed_query = db_session.query(Gtfsfeed).filter(Gtfsfeed.id.in_(select(subquery.c.id))) if country_code or subdivision_name or municipality: location_filter = LocationFilter( @@ -78,11 +80,24 @@ def get_gtfs_feeds_query( feed_query = add_official_filter(feed_query, is_official) if include_options_for_joinedload: + # IMPORTANT: We use selectinload (batched IN-queries) for collections instead of + # joinedload (single big JOIN). This avoids huge "row multiplication" when a feed has + # many related rows (e.g., datasets -> reports -> features). It keeps memory usage and + # query size reasonable, and works well with streaming/yielding results. + # + # The chained calls below mean: for all feeds, load their latest_dataset; for these + # datasets, load their validation_reports; for these reports, load their features. + # All of that happens in a few compact, batched queries. feed_query = feed_query.options( - joinedload(Gtfsfeed.latest_dataset) - .joinedload(Gtfsdataset.validation_reports) - .joinedload(Validationreport.features), - joinedload(Gtfsfeed.visualization_dataset), + selectinload(Gtfsfeed.latest_dataset) + .selectinload(Gtfsdataset.validation_reports) + .selectinload(Validationreport.features), + # visualization_dataset is a single related row; selectinload is still fine here + # and keeps the pattern consistent. + selectinload(Gtfsfeed.visualization_dataset), + # Ensure the bounding_box_dataset is present to avoid lazy-loads after the session + # is cleared during batch processing. 
+ selectinload(Gtfsfeed.bounding_box_dataset), *get_joinedload_options(), ).order_by(Gtfsfeed.provider, Gtfsfeed.stable_id) @@ -159,36 +174,70 @@ def get_all_gtfs_feeds( :return: The GTFS feeds in an iterator. """ - batch_size = int(os.getenv("BATCH_SIZE", "500")) - batch_query = db_session.query(Gtfsfeed).order_by(Gtfsfeed.stable_id).yield_per(batch_size) + batch_size = int(os.getenv("BATCH_SIZE", "50")) + + # We fetch in small batches and stream results to avoid loading the whole table in memory. + # stream_results=True lets SQLAlchemy iterate rows without buffering them all at once. + # We also clear the session cache between batches (see expunge_all() below) to prevent + # memory from growing indefinitely when many ORM objects are loaded. + batch_query = db_session.query(Gtfsfeed).order_by(Gtfsfeed.stable_id).execution_options(stream_results=True) if published_only: batch_query = batch_query.filter(Gtfsfeed.operational_status == "published") - for batch in batched(batch_query, batch_size): - stable_ids = (f.stable_id for f in batch) + processed = 0 + + for batch_num, batch in enumerate(batched(batch_query, batch_size), start=1): + start_index = processed + 1 + end_index = processed + len(batch) + logging.info("Processing feeds %d - %d", start_index, end_index) + + # Convert to a list intentionally: we want to "materialize" IDs now to make any cost + # visible here (and keep the logic simple). This also avoids subtle lazy-evaluation + # effects that can hide where time/memory is really spent. + stable_ids = [f.stable_id for f in batch] + if not stable_ids: + processed += len(batch) + continue + if w_extracted_locations_only: feed_query = apply_most_common_location_filter(db_session.query(Gtfsfeed), db_session) - yield from ( - feed_query.filter(Gtfsfeed.stable_id.in_(stable_ids)).options( - joinedload(Gtfsfeed.latest_dataset) - .joinedload(Gtfsdataset.validation_reports) - .joinedload(Validationreport.features), - *get_joinedload_options(include_extracted_location_entities=True), - ) + inner_q = feed_query.filter(Gtfsfeed.stable_id.in_(stable_ids)).options( + # See note above: selectinload is chosen for collections to keep memory and row + # counts under control when streaming. + selectinload(Gtfsfeed.latest_dataset) + .selectinload(Gtfsdataset.validation_reports) + .selectinload(Validationreport.features), + selectinload(Gtfsfeed.bounding_box_dataset), + *get_joinedload_options(include_extracted_location_entities=True), ) else: - yield from ( + inner_q = ( db_session.query(Gtfsfeed) .outerjoin(Gtfsfeed.gtfsdatasets) .filter(Gtfsfeed.stable_id.in_(stable_ids)) .options( - joinedload(Gtfsfeed.latest_dataset) - .joinedload(Gtfsdataset.validation_reports) - .joinedload(Validationreport.features), + selectinload(Gtfsfeed.latest_dataset) + .selectinload(Gtfsdataset.validation_reports) + .selectinload(Validationreport.features), + selectinload(Gtfsfeed.bounding_box_dataset), *get_joinedload_options(include_extracted_location_entities=False), ) ) + # Iterate and stream rows out; the options above ensure related data is preloaded in + # a few small queries per batch, rather than one giant join. + for item in inner_q.execution_options(stream_results=True): + yield item + + # Clear the Session identity map so objects from this batch can be GC'd. Without this, + # the Session will keep references and memory usage will grow with each batch. 
+ try: + db_session.expunge_all() + except Exception: + logging.getLogger("get_all_gtfs_feeds").exception("Failed to expunge session after batch %d", batch_num) + + processed += len(batch) + def get_gtfs_rt_feeds_query( limit: int | None, @@ -238,8 +287,11 @@ def get_gtfs_rt_feeds_query( feed_query = feed_query.filter(Gtfsrealtimefeed.operational_status == "published") feed_query = feed_query.options( - joinedload(Gtfsrealtimefeed.entitytypes), - joinedload(Gtfsrealtimefeed.gtfs_feeds), + selectinload(Gtfsrealtimefeed.entitytypes), + selectinload(Gtfsrealtimefeed.gtfs_feeds), + # Only include GTFS feeds that are "active" in the realtime feed's gtfs_feeds list. + # This filters the related collection at load time (no extra filtering needed later). + with_loader_criteria(Gtfsfeed, Gtfsfeed.status == "active", include_aliases=True), *get_joinedload_options(), ) feed_query = add_official_filter(feed_query, is_official) @@ -278,7 +330,9 @@ def get_all_gtfs_rt_feeds( :return: The GTFS realtime feeds in an iterator. """ batched_query = ( - db_session.query(Gtfsrealtimefeed.stable_id).order_by(Gtfsrealtimefeed.stable_id).yield_per(batch_size) + db_session.query(Gtfsrealtimefeed.stable_id) + .order_by(Gtfsrealtimefeed.stable_id) + .execution_options(stream_results=True) ) if published_only: batched_query = batched_query.filter(Gtfsrealtimefeed.operational_status == "published") @@ -290,8 +344,8 @@ def get_all_gtfs_rt_feeds( yield from ( feed_query.filter(Gtfsrealtimefeed.stable_id.in_(stable_ids)) .options( - joinedload(Gtfsrealtimefeed.entitytypes), - joinedload(Gtfsrealtimefeed.gtfs_feeds), + selectinload(Gtfsrealtimefeed.entitytypes), + selectinload(Gtfsrealtimefeed.gtfs_feeds), *get_joinedload_options(include_extracted_location_entities=True), ) .order_by(Gtfsfeed.stable_id) @@ -301,8 +355,8 @@ def get_all_gtfs_rt_feeds( db_session.query(Gtfsrealtimefeed) .filter(Gtfsrealtimefeed.stable_id.in_(stable_ids)) .options( - joinedload(Gtfsrealtimefeed.entitytypes), - joinedload(Gtfsrealtimefeed.gtfs_feeds), + selectinload(Gtfsrealtimefeed.entitytypes), + selectinload(Gtfsrealtimefeed.gtfs_feeds), *get_joinedload_options(include_extracted_location_entities=False), ) ) @@ -319,10 +373,10 @@ def apply_bounding_filtering( if not bounding_latitudes or not bounding_longitudes or not bounding_filter_method: return query - if ( - len(bounding_latitudes_tokens := bounding_latitudes.split(",")) != 2 - or len(bounding_longitudes_tokens := bounding_longitudes.split(",")) != 2 - ): + # Parse tokens explicitly to satisfy static analyzers and keep error messages clear. + bounding_latitudes_tokens = bounding_latitudes.split(",") + bounding_longitudes_tokens = bounding_longitudes.split(",") + if len(bounding_latitudes_tokens) != 2 or len(bounding_longitudes_tokens) != 2: raise_internal_http_validation_error( invalid_bounding_coordinates.format(bounding_latitudes, bounding_longitudes) ) @@ -366,23 +420,31 @@ def apply_bounding_filtering( raise_internal_http_validation_error(invalid_bounding_method.format(bounding_filter_method)) -def get_joinedload_options(include_extracted_location_entities: bool = False) -> [_AbstractLoad]: +def get_joinedload_options(include_extracted_location_entities: bool = False) -> List[_AbstractLoad]: """ Returns common joinedload options for feeds queries. :param include_extracted_location_entities: Whether to include extracted location entities. :return: A list of joinedload options. 
""" - joinedload_options = [] + # NOTE: For collections we prefer selectinload to avoid row explosion and high memory usage + # during streaming. When callers explicitly join some paths (e.g., most common locations), + # we use contains_eager on that specific path to tell SQLAlchemy the data came from a JOIN. + loaders = [] if include_extracted_location_entities: - joinedload_options = [contains_eager(Feed.feedosmlocationgroups).joinedload(Feedosmlocationgroup.group)] - return joinedload_options + [ - joinedload(Feed.locations), - joinedload(Feed.externalids), - joinedload(Feed.feedrelatedlinks), - joinedload(Feed.redirectingids).joinedload(Redirectingid.target), - joinedload(Feed.officialstatushistories), - ] + loaders.append(contains_eager(Feed.feedosmlocationgroups).joinedload(Feedosmlocationgroup.group)) + + # collections -> selectinload; scalar relationships can remain joinedload + loaders.extend( + [ + selectinload(Feed.locations), + selectinload(Feed.externalids), + selectinload(Feed.feedrelatedlinks), + selectinload(Feed.redirectingids).selectinload(Redirectingid.target), + selectinload(Feed.officialstatushistories), + ] + ) + return loaders def get_gbfs_feeds_query( @@ -414,7 +476,7 @@ def get_gbfs_feeds_query( if version else None, ) - # Subquery: latest report per version + # We compute the latest validation report per GBFS version (so we only join one report per version) latest_report_subq = ( db_session.query( Gbfsvalidationreport.gbfs_version_id.label("gbfs_version_id"), @@ -424,7 +486,10 @@ def get_gbfs_feeds_query( .subquery() ) - # Join validation reports filtered by latest `validated_at` + # We explicitly JOIN versions and their latest validation report. Because we already joined + # these tables, we tell SQLAlchemy to "trust" the join for those paths using contains_eager. + # We avoid adding another loader on the same path (like selectinload/joinedload) to prevent + # strategy conflicts and unnecessary row multiplication. 
query = gbfs_feed_filter.filter( db_session.query(Gbfsfeed) .outerjoin(Location, Gbfsfeed.locations) @@ -439,10 +504,9 @@ def get_gbfs_feeds_query( ) .options( contains_eager(Gbfsfeed.gbfsversions).contains_eager(Gbfsversion.gbfsvalidationreports), - contains_eager(Gbfsfeed.gbfsversions).joinedload(Gbfsversion.gbfsendpoints), - joinedload(Feed.locations), - joinedload(Feed.externalids), - joinedload(Feed.redirectingids).joinedload(Redirectingid.target), + selectinload(Gbfsfeed.locations), + selectinload(Gbfsfeed.externalids), + selectinload(Feed.redirectingids).selectinload(Redirectingid.target), ) ) return query diff --git a/functions-python/export_csv/src/main.py b/functions-python/export_csv/src/main.py index 85c23bc8d..5949485c9 100644 --- a/functions-python/export_csv/src/main.py +++ b/functions-python/export_csv/src/main.py @@ -15,7 +15,6 @@ # import argparse import csv -import logging import os import re from typing import Dict, Iterator, Optional @@ -28,6 +27,7 @@ from google.cloud import storage from geoalchemy2.shape import to_shape +from shared.helpers.runtime_metrics import track_metrics from shared.database.database import with_db_session from shared.helpers.logger import init_logger from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsrealtimefeed, Feed @@ -39,6 +39,8 @@ from shared.database_gen.sqlacodegen_models import Geopolygon +import logging + load_dotenv() csv_default_file_path = "./output.csv" init_logger() @@ -124,6 +126,7 @@ def export_and_upload_csv(_): return "Export successful" +@track_metrics(metrics=("time", "memory", "cpu")) def export_csv(csv_file_path: str): """ Write feed data to a local CSV file. @@ -318,7 +321,7 @@ def get_feed_csv_data( ) -> Dict: """ This function takes a generic feed and returns a dictionary with the data to be written to the CSV file. - Any specific data (for GTFS or GTFS_RT has to be added after this call. + Any specific data (for GTFS or GTFS_RT) has to be added after this call. """ redirect_ids = [] @@ -409,15 +412,24 @@ def get_gtfs_rt_feed_csv_data( static_references = "" first_feed_reference = None if feed.gtfs_feeds: - valid_feed_references = [ - feed_reference.stable_id.strip() - for feed_reference in feed.gtfs_feeds - if feed_reference and feed_reference.stable_id + # Prefer active feeds first using a stable sort so original relative order + # within active and inactive groups is preserved. + def _is_active(fr): + try: + return getattr(fr, "status", None) == "active" + except Exception: + return False + + # Filter to valid references, then stable sort by active flag (True > False) + valid_refs = [ + fr for fr in feed.gtfs_feeds if fr and getattr(fr, "stable_id", None) ] + sorted_refs = sorted(valid_refs, key=_is_active, reverse=True) + + valid_feed_references = [fr.stable_id.strip() for fr in sorted_refs] static_references = "|".join(valid_feed_references) - # If there is more than one GTFS feeds associated with this RT feed (why?) - # We will arbitrarily use the first one in the list for the bounding box. - first_feed_reference = feed.gtfs_feeds[0] if feed.gtfs_feeds else None + # First reference (after sort) will be an active one if any are present + first_feed_reference = sorted_refs[0] if sorted_refs else None data["static_reference"] = static_references # For the RT feed, we use the bounding box of the associated GTFS feed, if any. 
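The reordering above leans on Python's stable sort: sorting by the boolean "is active" key with reverse=True moves active references ahead of inactive ones while keeping the original relative order within each group, so the first element of sorted_refs is an active feed whenever one exists. A minimal, self-contained sketch of that idea (FeedRef and the sample data are illustrative, not part of the codebase):

    from dataclasses import dataclass

    @dataclass
    class FeedRef:
        stable_id: str
        status: str

    refs = [
        FeedRef("deprecated-gtfs-0", "deprecated"),
        FeedRef("gtfs-0", "active"),
        FeedRef("gtfs-1", "active"),
    ]

    # Stable sort: active references first; relative order within each group is preserved.
    sorted_refs = sorted(refs, key=lambda r: r.status == "active", reverse=True)
    print([r.stable_id for r in sorted_refs])  # ['gtfs-0', 'gtfs-1', 'deprecated-gtfs-0']
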
diff --git a/functions-python/export_csv/tests/conftest.py b/functions-python/export_csv/tests/conftest.py index 45532d838..13593e1ac 100644 --- a/functions-python/export_csv/tests/conftest.py +++ b/functions-python/export_csv/tests/conftest.py @@ -50,6 +50,30 @@ def populate_database(db_session): fake = Faker() feeds = [] + + # Put the deprecated feeds before the active feeds in the DB so they will be listed first + # in GtfsRealtimeFeed.gtfs_feeds (the RT feed references). This allows testing that active feeds + # are put first in GtfsRealtimeFeed.gtfs_feeds. Admittedly, it's a bit weak but it works for now. + for i in range(2): + feed = Gtfsfeed( + id=fake.uuid4(), + data_type="gtfs", + feed_name=f"deprecated-gtfs-{i} Some fake name", + note=f"deprecated-gtfs-{i} Some fake note", + producer_url=f"https://deprecated-gtfs-{i}_some_fake_producer_url", + authentication_type="0" if (i == 0) else "1", + authentication_info_url=None, + api_key_parameter_name=None, + license_url=f"https://gtfs-{i}_some_fake_license_url", + stable_id=f"deprecated-gtfs-{i}", + status="deprecated", + feed_contact_email=f"deprecated-gtfs-{i}_some_fake_email@fake.com", + provider=f"deprecated-gtfs-{i} Some fake company", + operational_status="published", + official=True, + ) + db_session.add(feed) + # We create 3 feeds. The first one is active. The third one is inactive and redirected to the first one. # The second one is active but not redirected. # First fill the generic parameters @@ -97,25 +121,6 @@ def populate_database(db_session): for feed in feeds: db_session.add(feed) db_session.flush() - for i in range(2): - feed = Gtfsfeed( - id=fake.uuid4(), - data_type="gtfs", - feed_name=f"gtfs-deprecated-{i} Some fake name", - note=f"gtfs-deprecated-{i} Some fake note", - producer_url=f"https://gtfs-deprecated-{i}_some_fake_producer_url", - authentication_type="0" if (i == 0) else "1", - authentication_info_url=None, - api_key_parameter_name=None, - license_url=f"https://gtfs-{i}_some_fake_license_url", - stable_id=f"gtfs-deprecated-{i}", - status="deprecated", - feed_contact_email=f"gtfs-deprecated-{i}_some_fake_email@fake.com", - provider=f"gtfs-deprecated-{i} Some fake company", - operational_status="published", - official=True, - ) - db_session.add(feed) location_entity = Location(id="CA-quebec-montreal") @@ -273,10 +278,28 @@ def populate_database(db_session): entitytypes=[vp_entitytype, tu_entitytype] if i == 0 else [vp_entitytype], operational_status="published", official=True, - gtfs_feeds=[active_gtfs_feeds[0]] if i == 0 else [], + # Do not attach GTFS feeds at creation; we'll set them in a controlled order below + # gtfs_feeds=[], ) gtfs_rt_feeds.append(feed) + db_session.add_all(gtfs_rt_feeds) + + # --- Attach both a deprecated GTFS feed and an active GTFS feed to the first RT feed + try: + deprecated_feeds = ( + db_session.query(Gtfsfeed) + .filter(Gtfsfeed.status == "deprecated") + .order_by(Gtfsfeed.stable_id) + .all() + ) + if deprecated_feeds: + gtfs_rt_feeds[0].gtfs_feeds = [deprecated_feeds[0], active_gtfs_feeds[0]] + db_session.flush() + except Exception: + # Best effort in test setup; if it fails the rest of the tests will surface the issue. 
+ pass + # Add redirecting IDs (from main branch logic) gtfs_rt_feeds[1].redirectingids = [ Redirectingid( diff --git a/functions-python/export_csv/tests/test_export_csv_main.py b/functions-python/export_csv/tests/test_export_csv_main.py index c8150da2a..2c0328a64 100644 --- a/functions-python/export_csv/tests/test_export_csv_main.py +++ b/functions-python/export_csv/tests/test_export_csv_main.py @@ -24,12 +24,12 @@ # the data is correct. expected_csv = """ id,data_type,entity_type,location.country_code,location.subdivision_name,location.municipality,provider,is_official,name,note,feed_contact_email,static_reference,urls.direct_download,urls.authentication_type,urls.authentication_info,urls.api_key_parameter_name,urls.latest,urls.license,location.bounding_box.minimum_latitude,location.bounding_box.maximum_latitude,location.bounding_box.minimum_longitude,location.bounding_box.maximum_longitude,location.bounding_box.extracted_on,status,features,redirect.id,redirect.comment +deprecated-gtfs-0,gtfs,,,,,deprecated-gtfs-0 Some fake company,True,deprecated-gtfs-0 Some fake name,deprecated-gtfs-0 Some fake note,deprecated-gtfs-0_some_fake_email@fake.com,,https://deprecated-gtfs-0_some_fake_producer_url,0,,,,https://gtfs-0_some_fake_license_url,,,,,,deprecated,,, +deprecated-gtfs-1,gtfs,,,,,deprecated-gtfs-1 Some fake company,True,deprecated-gtfs-1 Some fake name,deprecated-gtfs-1 Some fake note,deprecated-gtfs-1_some_fake_email@fake.com,,https://deprecated-gtfs-1_some_fake_producer_url,1,,,,https://gtfs-1_some_fake_license_url,,,,,,deprecated,,, gtfs-0,gtfs,,CA,Quebec,Laval,gtfs-0 Some fake company,True,gtfs-0 Some fake name,gtfs-0 Some fake note,gtfs-0_some_fake_email@fake.com,,https://gtfs-0_some_fake_producer_url,0,,,https://url_prefix/gtfs-0/latest.zip,https://gtfs-0_some_fake_license_url,-9.0,9.0,-18.0,18.0,2025-01-12 00:00:00+00:00,active,Route Colors|Shapes,, gtfs-1,gtfs,,CA,Quebec,Montreal,gtfs-1 Some fake company,True,gtfs-1 Some fake name,gtfs-1 Some fake note,gtfs-1_some_fake_email@fake.com,,https://gtfs-1_some_fake_producer_url,0,,,https://url_prefix/gtfs-1/latest.zip,https://gtfs-1_some_fake_license_url,-9.0,9.0,-18.0,18.0,2025-01-12 00:00:00+00:00,active,Route Colors|Shapes,, gtfs-2,gtfs,,,,,gtfs-2 Some fake company,True,gtfs-2 Some fake name,gtfs-2 Some fake note,gtfs-2_some_fake_email@fake.com,,https://gtfs-2_some_fake_producer_url,0,,,,https://gtfs-2_some_fake_license_url,,,,,,inactive,,gtfs-0,Some redirect comment -gtfs-deprecated-0,gtfs,,,,,gtfs-deprecated-0 Some fake company,True,gtfs-deprecated-0 Some fake name,gtfs-deprecated-0 Some fake note,gtfs-deprecated-0_some_fake_email@fake.com,,https://gtfs-deprecated-0_some_fake_producer_url,0,,,,https://gtfs-0_some_fake_license_url,,,,,,deprecated,,, -gtfs-deprecated-1,gtfs,,,,,gtfs-deprecated-1 Some fake company,True,gtfs-deprecated-1 Some fake name,gtfs-deprecated-1 Some fake note,gtfs-deprecated-1_some_fake_email@fake.com,,https://gtfs-deprecated-1_some_fake_producer_url,1,,,,https://gtfs-1_some_fake_license_url,,,,,,deprecated,,, -gtfs-rt-0,gtfs_rt,tu|vp,,,,gtfs-rt-0 Some fake company,True,gtfs-rt-0 Some fake name,gtfs-rt-0 Some fake note,gtfs-rt-0_some_fake_email@fake.com,gtfs-0,https://gtfs-rt-0_some_fake_producer_url,0,https://gtfs-rt-0_some_fake_authentication_info_url,gtfs-rt-0_fake_api_key_parameter_name,,https://gtfs-rt-0_some_fake_license_url,-9.0,9.0,-18.0,18.0,2025-01-12 00:00:00+00:00,active,,, +gtfs-rt-0,gtfs_rt,tu|vp,,,,gtfs-rt-0 Some fake company,True,gtfs-rt-0 Some fake name,gtfs-rt-0 Some fake 
note,gtfs-rt-0_some_fake_email@fake.com,gtfs-0|deprecated-gtfs-0,https://gtfs-rt-0_some_fake_producer_url,0,https://gtfs-rt-0_some_fake_authentication_info_url,gtfs-rt-0_fake_api_key_parameter_name,,https://gtfs-rt-0_some_fake_license_url,-9.0,9.0,-18.0,18.0,2025-01-12 00:00:00+00:00,active,,, gtfs-rt-1,gtfs_rt,vp,,,,gtfs-rt-1 Some fake company,True,gtfs-rt-1 Some fake name,gtfs-rt-1 Some fake note,gtfs-rt-1_some_fake_email@fake.com,,https://gtfs-rt-1_some_fake_producer_url,1,https://gtfs-rt-1_some_fake_authentication_info_url,gtfs-rt-1_fake_api_key_parameter_name,,https://gtfs-rt-1_some_fake_license_url,,,,,,inactive,,gtfs-rt-0|gtfs-rt-2,comment 1|comment 2 gtfs-rt-2,gtfs_rt,vp,,,,gtfs-rt-2 Some fake company,True,gtfs-rt-2 Some fake name,gtfs-rt-2 Some fake note,gtfs-rt-2_some_fake_email@fake.com,,https://gtfs-rt-2_some_fake_producer_url,2,https://gtfs-rt-2_some_fake_authentication_info_url,gtfs-rt-2_fake_api_key_parameter_name,,https://gtfs-rt-2_some_fake_license_url,,,,,,active,,, """ # noqa From c39136dc74de8f6c7ae81fa97083afb2d5407262 Mon Sep 17 00:00:00 2001 From: jcpitre Date: Tue, 18 Nov 2025 09:35:19 -0500 Subject: [PATCH 2/5] Added missing python package --- functions-python/export_csv/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions-python/export_csv/requirements.txt b/functions-python/export_csv/requirements.txt index 5d435f0d5..f5565e36e 100644 --- a/functions-python/export_csv/requirements.txt +++ b/functions-python/export_csv/requirements.txt @@ -27,4 +27,4 @@ python-dotenv==1.0.0 # Other dependencies natsort - +psutil From fe821db5a827bec9fc9f0d711beafa9eed84a239 Mon Sep 17 00:00:00 2001 From: jcpitre Date: Tue, 18 Nov 2025 09:52:51 -0500 Subject: [PATCH 3/5] Increased timeout to 1 hour for export_csv --- functions-python/export_csv/function_config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions-python/export_csv/function_config.json b/functions-python/export_csv/function_config.json index ab009c933..8ded57b31 100644 --- a/functions-python/export_csv/function_config.json +++ b/functions-python/export_csv/function_config.json @@ -2,7 +2,7 @@ "name": "export-csv", "description": "Export the DB feed data as a csv file", "entry_point": "export_and_upload_csv", - "timeout": 600, + "timeout": 3600, "memory": "2Gi", "trigger_http": true, "include_folders": ["helpers", "dataset_service"], From c9c30e2e54c4b4ecd1ff8ac60892c9134928cb27 Mon Sep 17 00:00:00 2001 From: jcpitre Date: Tue, 18 Nov 2025 21:06:42 -0500 Subject: [PATCH 4/5] Limited the changes to code used by export_csv. 
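
This keeps the original joinedload-based eager loading for the API endpoints and confines selectinload to the batch path used by export_csv, via a new get_selectinload_options helper alongside the existing get_joinedload_options. The practical difference is the SQL shape: joinedload produces one wide JOINed statement whose rows multiply with every one-to-many collection, while selectinload issues an extra SELECT ... WHERE <fk> IN (...) per relationship and keeps row counts flat. A rough sketch of the two option lists, using a couple of the Feed relationships touched here (session is assumed to be an open Session; this is illustrative, not the full option set):

    from sqlalchemy.orm import joinedload, selectinload

    # API path: eager-load via a single JOINed query (rows multiply across collections).
    api_options = [joinedload(Feed.locations), joinedload(Feed.externalids)]

    # export_csv batch path: one additional IN-query per collection, flat row counts.
    export_options = [selectinload(Feed.locations), selectinload(Feed.externalids)]

    feeds = session.query(Feed).options(*export_options).all()
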
--- api/src/shared/common/db_utils.py | 76 +++++++++++++++---------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/api/src/shared/common/db_utils.py b/api/src/shared/common/db_utils.py index ade72ebee..ff0f68881 100644 --- a/api/src/shared/common/db_utils.py +++ b/api/src/shared/common/db_utils.py @@ -5,7 +5,8 @@ from geoalchemy2 import WKTElement from sqlalchemy import or_ from sqlalchemy import select -from sqlalchemy.orm import Session, contains_eager, load_only, selectinload, with_loader_criteria, Query +from sqlalchemy.orm import joinedload, Session, contains_eager, load_only, selectinload +from sqlalchemy.orm.query import Query from sqlalchemy.orm.strategy_options import _AbstractLoad from sqlalchemy import func from sqlalchemy.sql import and_ @@ -61,9 +62,7 @@ def get_gtfs_feeds_query( subquery = apply_bounding_filtering( subquery, dataset_latitudes, dataset_longitudes, bounding_filter_method ).subquery() - # Type checkers prefer an explicit scalar select from the subquery; this is equivalent - # to "id IN (SELECT id FROM (subquery))" and avoids complaints about passing a Subquery. - feed_query = db_session.query(Gtfsfeed).filter(Gtfsfeed.id.in_(select(subquery.c.id))) + feed_query = db_session.query(Gtfsfeed).filter(Gtfsfeed.id.in_(subquery)) if country_code or subdivision_name or municipality: location_filter = LocationFilter( @@ -80,24 +79,11 @@ def get_gtfs_feeds_query( feed_query = add_official_filter(feed_query, is_official) if include_options_for_joinedload: - # IMPORTANT: We use selectinload (batched IN-queries) for collections instead of - # joinedload (single big JOIN). This avoids huge "row multiplication" when a feed has - # many related rows (e.g., datasets -> reports -> features). It keeps memory usage and - # query size reasonable, and works well with streaming/yielding results. - # - # The chained calls below mean: for all feeds, load their latest_dataset; for these - # datasets, load their validation_reports; for these reports, load their features. - # All of that happens in a few compact, batched queries. feed_query = feed_query.options( - selectinload(Gtfsfeed.latest_dataset) - .selectinload(Gtfsdataset.validation_reports) - .selectinload(Validationreport.features), - # visualization_dataset is a single related row; selectinload is still fine here - # and keeps the pattern consistent. - selectinload(Gtfsfeed.visualization_dataset), - # Ensure the bounding_box_dataset is present to avoid lazy-loads after the session - # is cleared during batch processing. 
- selectinload(Gtfsfeed.bounding_box_dataset), + joinedload(Gtfsfeed.latest_dataset) + .joinedload(Gtfsdataset.validation_reports) + .joinedload(Validationreport.features), + joinedload(Gtfsfeed.visualization_dataset), *get_joinedload_options(), ).order_by(Gtfsfeed.provider, Gtfsfeed.stable_id) @@ -208,7 +194,7 @@ def get_all_gtfs_feeds( .selectinload(Gtfsdataset.validation_reports) .selectinload(Validationreport.features), selectinload(Gtfsfeed.bounding_box_dataset), - *get_joinedload_options(include_extracted_location_entities=True), + *get_selectinload_options(include_extracted_location_entities=True), ) else: inner_q = ( @@ -220,7 +206,7 @@ def get_all_gtfs_feeds( .selectinload(Gtfsdataset.validation_reports) .selectinload(Validationreport.features), selectinload(Gtfsfeed.bounding_box_dataset), - *get_joinedload_options(include_extracted_location_entities=False), + *get_selectinload_options(include_extracted_location_entities=False), ) ) @@ -287,11 +273,8 @@ def get_gtfs_rt_feeds_query( feed_query = feed_query.filter(Gtfsrealtimefeed.operational_status == "published") feed_query = feed_query.options( - selectinload(Gtfsrealtimefeed.entitytypes), - selectinload(Gtfsrealtimefeed.gtfs_feeds), - # Only include GTFS feeds that are "active" in the realtime feed's gtfs_feeds list. - # This filters the related collection at load time (no extra filtering needed later). - with_loader_criteria(Gtfsfeed, Gtfsfeed.status == "active", include_aliases=True), + joinedload(Gtfsrealtimefeed.entitytypes), + joinedload(Gtfsrealtimefeed.gtfs_feeds), *get_joinedload_options(), ) feed_query = add_official_filter(feed_query, is_official) @@ -357,7 +340,7 @@ def get_all_gtfs_rt_feeds( .options( selectinload(Gtfsrealtimefeed.entitytypes), selectinload(Gtfsrealtimefeed.gtfs_feeds), - *get_joinedload_options(include_extracted_location_entities=False), + *get_selectinload_options(include_extracted_location_entities=False), ) ) @@ -420,7 +403,26 @@ def apply_bounding_filtering( raise_internal_http_validation_error(invalid_bounding_method.format(bounding_filter_method)) -def get_joinedload_options(include_extracted_location_entities: bool = False) -> List[_AbstractLoad]: +def get_joinedload_options(include_extracted_location_entities: bool = False) -> [_AbstractLoad]: + """ + Returns common joinedload options for feeds queries. + :param include_extracted_location_entities: Whether to include extracted location entities. + + :return: A list of joinedload options. + """ + joinedload_options = [] + if include_extracted_location_entities: + joinedload_options = [contains_eager(Feed.feedosmlocationgroups).joinedload(Feedosmlocationgroup.group)] + return joinedload_options + [ + joinedload(Feed.locations), + joinedload(Feed.externalids), + joinedload(Feed.feedrelatedlinks), + joinedload(Feed.redirectingids).joinedload(Redirectingid.target), + joinedload(Feed.officialstatushistories), + ] + + +def get_selectinload_options(include_extracted_location_entities: bool = False) -> List[_AbstractLoad]: """ Returns common joinedload options for feeds queries. :param include_extracted_location_entities: Whether to include extracted location entities. 
@@ -476,7 +478,7 @@ def get_gbfs_feeds_query( if version else None, ) - # We compute the latest validation report per GBFS version (so we only join one report per version) + # Subquery: latest report per version latest_report_subq = ( db_session.query( Gbfsvalidationreport.gbfs_version_id.label("gbfs_version_id"), @@ -486,10 +488,7 @@ def get_gbfs_feeds_query( .subquery() ) - # We explicitly JOIN versions and their latest validation report. Because we already joined - # these tables, we tell SQLAlchemy to "trust" the join for those paths using contains_eager. - # We avoid adding another loader on the same path (like selectinload/joinedload) to prevent - # strategy conflicts and unnecessary row multiplication. + # Join validation reports filtered by latest `validated_at` query = gbfs_feed_filter.filter( db_session.query(Gbfsfeed) .outerjoin(Location, Gbfsfeed.locations) @@ -504,9 +503,10 @@ def get_gbfs_feeds_query( ) .options( contains_eager(Gbfsfeed.gbfsversions).contains_eager(Gbfsversion.gbfsvalidationreports), - selectinload(Gbfsfeed.locations), - selectinload(Gbfsfeed.externalids), - selectinload(Feed.redirectingids).selectinload(Redirectingid.target), + contains_eager(Gbfsfeed.gbfsversions).joinedload(Gbfsversion.gbfsendpoints), + joinedload(Feed.locations), + joinedload(Feed.externalids), + joinedload(Feed.redirectingids).joinedload(Redirectingid.target), ) ) return query From 49a3c8ce2f69d9749b41fb8c64b3149709dfdcbf Mon Sep 17 00:00:00 2001 From: jcpitre Date: Wed, 19 Nov 2025 10:59:14 -0500 Subject: [PATCH 5/5] Added query by batch. --- api/src/shared/common/db_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/src/shared/common/db_utils.py b/api/src/shared/common/db_utils.py index ff0f68881..a2f646759 100644 --- a/api/src/shared/common/db_utils.py +++ b/api/src/shared/common/db_utils.py @@ -315,6 +315,7 @@ def get_all_gtfs_rt_feeds( batched_query = ( db_session.query(Gtfsrealtimefeed.stable_id) .order_by(Gtfsrealtimefeed.stable_id) + .yield_per(batch_size) .execution_options(stream_results=True) ) if published_only: