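"""Cloud Function that backfills the gtfsdataset service date range columns
from each dataset's latest validation report."""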
import json
import logging
import os
from datetime import datetime
from typing import TYPE_CHECKING

import functions_framework
import requests
from google.cloud import storage
from sqlalchemy import Integer, cast, func, or_
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import joinedload

from shared.database_gen.sqlacodegen_models import Gtfsdataset, Validationreport
from shared.helpers.database import Database
from shared.helpers.logger import Logger

env = os.getenv("ENV", "dev").lower()
bucket_name = f"mobilitydata-datasets-{env}"

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

logging.basicConfig(level=logging.INFO)


def is_version_gte(target_version: str, version_field):
    """SQL expression that is true when version_field >= target_version.

    Versions are compared element-wise as integer arrays (so 10.0.0 sorts above
    6.0.0) rather than lexicographically as strings; this assumes purely numeric
    dot-separated version values.
    """
    target_parts = cast(func.string_to_array(target_version, "."), ARRAY(Integer))
    version_parts = cast(func.string_to_array(version_field, "."), ARRAY(Integer))
    return version_parts >= target_parts


# Backfills the NULL service date range columns in the gtfsdataset table;
# existing (non-NULL) values are never overwritten.
def backfill_datasets(session: "Session"):
    # Only care about datasets where service_date_range_start or service_date_range_end is NULL
    changes_count = 0
    total_changes_count = 0
    elements_per_commit = 100
    # Cloud Storage client used to read validation report blobs
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

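    # Only datasets with at least one report from validator 6.0.0 or newer are
    # considered; those reports are expected to include the feed service window
    # read further below.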
    datasets = (
        session.query(Gtfsdataset)
        .options(joinedload(Gtfsdataset.validation_reports))
        .filter(
            or_(
                Gtfsdataset.service_date_range_start.is_(None),
                Gtfsdataset.service_date_range_end.is_(None),
            )
        )
        .filter(
            Gtfsdataset.validation_reports.any(
                is_version_gte("6.0.0", Validationreport.validator_version)
            )
        )
    ).all()

    logging.info(f"Found {len(datasets)} datasets to process.")

    for dataset in datasets:
        logging.info(f"Processing gtfsdataset ID {dataset.stable_id}")
        gtfsdataset_id = dataset.stable_id
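        # The feed stable id is the first two dash-separated tokens of the dataset
        # stable id; it is used below to build the report blob path.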
        feed_stable_id = "-".join(gtfsdataset_id.split("-")[0:2])
        # Get the latest validation report for the dataset
        latest_validation_report = max(
            dataset.validation_reports,
            key=lambda report: report.validated_at,
            default=None,
        )

        if not latest_validation_report:
            logging.info(
                f"Skipping gtfsdataset ID {gtfsdataset_id}: no validation reports found."
            )
            continue

        json_report_url = latest_validation_report.json_report

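        # Prefer the report blob stored in the datasets bucket
        # ({feed_stable_id}/{dataset_id}/report_{validator_version}.json) and fall
        # back to downloading the report from its json_report URL when missing.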
        try:
            # Download the JSON report
            blob_url = f"{feed_stable_id}/{gtfsdataset_id}/report_{latest_validation_report.validator_version}.json"
            logging.info(f"Blob URL: {blob_url}")
            dataset_blob = bucket.blob(blob_url)
            if not dataset_blob.exists():
                logging.info("Blob not found, downloading from URL")
                response = requests.get(json_report_url)
                response.raise_for_status()
                json_data = response.json()
            else:
                logging.info("Blob found, downloading from blob")
                json_data = json.loads(dataset_blob.download_as_string())

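            # The report's summary.feedInfo block is expected to carry the computed
            # feed service window (feedServiceWindowStart / feedServiceWindowEnd).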
            extracted_service_start_date = (
                json_data.get("summary", {})
                .get("feedInfo", {})
                .get("feedServiceWindowStart")
            )
            extracted_service_end_date = (
                json_data.get("summary", {})
                .get("feedInfo", {})
                .get("feedServiceWindowEnd")
            )

            # Validate both dates before writing anything to the row.
            try:
                datetime.strptime(extracted_service_start_date, "%Y-%m-%d")
            except (TypeError, ValueError):
                logging.error(
                    f"Key 'summary.feedInfo.feedServiceWindowStart' not found or bad value "
                    f"in JSON for gtfsdataset ID {gtfsdataset_id}. "
                    f"value: {extracted_service_start_date}"
                )
                continue

            try:
                datetime.strptime(extracted_service_end_date, "%Y-%m-%d")
            except (TypeError, ValueError):
                logging.error(
                    f"Key 'summary.feedInfo.feedServiceWindowEnd' not found or bad value "
                    f"in JSON for gtfsdataset ID {gtfsdataset_id}. "
                    f"value: {extracted_service_end_date}"
                )
                continue

            dataset.service_date_range_start = extracted_service_start_date
            dataset.service_date_range_end = extracted_service_end_date

            formatted_dates = (
                f"{extracted_service_start_date} - {extracted_service_end_date}"
            )
            logging.info(
                f"Updated gtfsdataset ID {gtfsdataset_id} with value: {formatted_dates}"
            )
            total_changes_count += 1
            changes_count += 1
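            # Commit in batches of `elements_per_commit` so a failure late in the
            # run does not discard all earlier updates.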
            if changes_count >= elements_per_commit:
                try:
                    session.commit()
                    logging.info(f"{changes_count} elements committed.")
                    changes_count = 0
                except Exception as e:
                    logging.error(f"Error committing changes: {e}")
                    session.rollback()
                    session.close()
                    raise Exception(f"Error committing dataset changes: {e}")

        except requests.RequestException as e:
            logging.error(
                f"Error downloading JSON for gtfsdataset ID {gtfsdataset_id}: {e}"
            )
        except json.JSONDecodeError as e:
            logging.error(
                f"Error parsing JSON for gtfsdataset ID {gtfsdataset_id}: {e}"
            )
        except Exception as e:
            logging.error(f"Error processing gtfsdataset ID {gtfsdataset_id}: {e}")

    try:
        session.commit()
        logging.info("Database changes committed.")
        session.close()
        return total_changes_count
    except Exception as e:
        logging.error(f"Error committing changes: {e}")
        session.rollback()
        session.close()
        raise Exception(f"Error committing dataset changes: {e}")


@functions_framework.http
def backfill_dataset_service_date_range(_):
    """Fills the GTFS dataset service date range from the latest validation report."""
    Logger.init_logger()
    db = Database(database_url=os.getenv("FEEDS_DATABASE_URL"))
    change_count = 0
    try:
        with db.start_db_session() as session:
            logging.info("Database session started.")
            change_count = backfill_datasets(session)

    except Exception as error:
        logging.error(f"Error setting the datasets service date range values: {error}")
        return f"Error setting the datasets service date range values: {error}", 500

    return f"Script executed successfully. {change_count} datasets updated", 200
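

# Minimal local-run sketch (assumption: FEEDS_DATABASE_URL points at a reachable
# database and Google Cloud credentials are available); the deployed function is
# invoked over HTTP by functions_framework instead.
if __name__ == "__main__":
    print(backfill_dataset_service_date_range(None))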