|
| 1 | +import logging |
| 2 | +import os |
| 3 | +import socket |
| 4 | +import subprocess |
| 5 | +from typing import Dict |
| 6 | +import uuid |
| 7 | +from io import BytesIO |
| 8 | + |
| 9 | +import requests |
| 10 | + |
| 11 | +from shared.database.database import with_db_session |
| 12 | +from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gbfsfeed |
| 13 | +from shared.helpers.runtime_metrics import track_metrics |
| 14 | + |
| 15 | +from google.cloud import storage |
| 16 | +from sqlalchemy.orm import Session |
| 17 | + |
| 18 | + |
# Bucket used for every blob in this module; also passed as the storage
# emulator's default bucket and exported as the GTFS/GBFS datasets bucket name.
EMULATOR_STORAGE_BUCKET_NAME = "verifier"
# Host shared by the storage emulator and the datastore emulator probe.
EMULATOR_HOST = "localhost"
# Port the local Cloud Storage emulator is started on.
EMULATOR_STORAGE_PORT = 9023
| 22 | + |
| 23 | + |
@track_metrics(metrics=("time", "memory", "cpu"))
def download_to_local(
    feed_stable_id: str,
    url: str,
    filename: str,
    force_download: bool = False,
    timeout: float = 60.0,
):
    """
    Download a file from a URL and upload it to the Google Cloud Storage emulator.

    The file is stored as ``<feed_stable_id>/<filename>`` in the
    ``EMULATOR_STORAGE_BUCKET_NAME`` bucket. If the blob already exists it is
    not downloaded again unless ``force_download`` is set. Nothing is done when
    ``url`` is empty.

    Args:
        feed_stable_id (str): Stable id of the feed; used as the blob path prefix.
        url (str): The URL to download the file from.
        filename (str): The name of the file to save in the emulator.
        force_download (bool): Download and overwrite even if the blob exists.
        timeout (float): Seconds to wait for the server to connect/send data
            before the HTTP request is aborted.

    Raises:
        requests.HTTPError: If the server responds with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    if not url:
        return
    blob_path = f"{feed_stable_id}/{filename}"
    # NOTE(review): the client presumably targets the emulator through the
    # STORAGE_EMULATOR_HOST environment variable -- confirm it is set first.
    client = storage.Client()
    bucket = client.bucket(EMULATOR_STORAGE_BUCKET_NAME)
    blob = bucket.blob(blob_path)

    # Check the flag first so a forced download skips the existence round trip.
    if force_download or not blob.exists():
        logging.info(f"Downloading and uploading: {blob_path}")
        # Without a timeout, a stalled server would block this call forever.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            # NOTE(review): the content type is always JSON regardless of the
            # downloaded payload -- confirm this is intended for all callers.
            blob.content_type = "application/json"
            # The file is downloaded into memory before uploading to ensure it's seekable.
            # Be careful with large files.
            data = BytesIO(response.content)
            blob.upload_from_file(data, rewind=True)
    else:
        logging.info(
            f"Blob already exists: gs://{EMULATOR_STORAGE_BUCKET_NAME}/{blob_path}"
        )
| 56 | + |
| 57 | + |
@with_db_session
def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session = None):
    """
    Make sure a feed row with the given stable id exists in the database.

    A ``Gtfsfeed`` is used when ``feed_dict["data_type"]`` is ``"gtfs"``;
    any other value selects ``Gbfsfeed``. When no row with ``feed_stable_id``
    is found, a placeholder test feed is inserted. The session is committed
    in both cases.

    Args:
        feed_stable_id (str): Stable id to look up or create.
        feed_dict (Dict): Feed description; only ``data_type`` is read.
        db_session (Session): Database session (supplied by ``with_db_session``
            when omitted -- presumably; confirm against the decorator).
    """
    logging.info(f"Creating test data for {feed_stable_id} with data: {feed_dict}")
    data_type = feed_dict["data_type"]
    if data_type == "gtfs":
        feed_model = Gtfsfeed
    else:
        feed_model = Gbfsfeed

    lookup = db_session.query(feed_model).filter(
        feed_model.stable_id == feed_stable_id
    )
    existing_feed = lookup.one_or_none()
    if not existing_feed:
        # No feed with this stable id yet: insert a minimal placeholder row.
        placeholder = feed_model(
            id=uuid.uuid4(),
            stable_id=feed_stable_id,
            data_type=data_type,
            feed_name="Test Feed",
            note="This is a test feed created for reverse geolocation verification.",
            producer_url="https://files.mobilitydatabase.org/mdb-2014/mdb-2014-202508120303/mdb-2014-202508120303.zip",
            authentication_type="0",
            status="active",
        )
        db_session.add(placeholder)
    db_session.commit()
| 84 | + |
| 85 | + |
def setup_local_storage_emulator():
    """
    Configure the environment for local emulators and start the storage emulator.

    Exports the storage emulator endpoint, the GTFS/GBFS dataset bucket names
    and the datastore emulator host as environment variables, then creates
    and starts a storage emulator server with ``EMULATOR_STORAGE_BUCKET_NAME``
    as its default bucket.

    Returns:
        The started emulator server; pass it to
        ``shutdown_local_storage_emulator`` to stop it.
    """
    # Imported lazily so the emulator package is only needed when this runs.
    from gcp_storage_emulator.server import create_server

    storage_endpoint = f"http://{EMULATOR_HOST}:{EMULATOR_STORAGE_PORT}"
    os.environ.update(
        {
            "STORAGE_EMULATOR_HOST": storage_endpoint,
            "DATASETS_BUCKET_NAME_GBFS": EMULATOR_STORAGE_BUCKET_NAME,
            "DATASETS_BUCKET_NAME_GTFS": EMULATOR_STORAGE_BUCKET_NAME,
            "DATASTORE_EMULATOR_HOST": "localhost:8081",
        }
    )

    emulator = create_server(
        host=EMULATOR_HOST,
        port=EMULATOR_STORAGE_PORT,
        in_memory=False,
        default_bucket=EMULATOR_STORAGE_BUCKET_NAME,
    )
    emulator.start()
    return emulator
| 106 | + |
| 107 | + |
def shutdown_local_storage_emulator(server):
    """
    Shutdown the Google Cloud Storage emulator.

    Args:
        server: The server object returned by ``setup_local_storage_emulator``,
            or ``None`` when no emulator was started (in which case this is a
            no-op, so it is safe to call from a ``finally`` block).
    """
    if server is None:
        return
    server.stop()
| 111 | + |
| 112 | + |
def is_datastore_emulator_running(host=EMULATOR_HOST, port=8081):
    """
    Report whether something accepts TCP connections on ``host:port``.

    Args:
        host: Host name to probe.
        port: TCP port to probe.

    Returns:
        bool: ``True`` if a connection could be opened within 2 seconds,
        ``False`` if the attempt raised ``OSError``.
    """
    address = (host, port)
    try:
        # The probe connection is closed again as soon as it succeeds.
        with socket.create_connection(address, timeout=2):
            reachable = True
    except OSError:
        reachable = False
    return reachable
| 120 | + |
| 121 | + |
def start_datastore_emulator(project_id="test-project"):
    """
    Launch the Google Cloud Datastore emulator unless one is already reachable.

    Args:
        project_id: Project id passed to the emulator via ``--project``.

    Returns:
        The ``subprocess.Popen`` handle of the launched ``gcloud`` command, or
        ``None`` when the emulator was already running. The call returns right
        after spawning the process; it does not wait for the emulator to
        accept connections.
    """
    if is_datastore_emulator_running():
        return None  # Already running

    command = [
        "gcloud",
        "beta",
        "emulators",
        "datastore",
        "start",
        "--project={}".format(project_id),
        "--host-port=localhost:8081",
    ]
    return subprocess.Popen(command)
| 138 | + |
| 139 | + |
def shutdown_datastore_emulator(process, timeout=10.0):
    """
    Shutdown the Google Cloud Datastore emulator.

    Asks the process to terminate and waits for it to exit. If it has not
    exited after ``timeout`` seconds it is killed, so teardown cannot hang
    forever on an emulator that ignores the termination request.

    Args:
        process: The handle returned by ``start_datastore_emulator``; a falsy
            value (e.g. ``None`` when the emulator was already running) makes
            this a no-op.
        timeout: Seconds to wait for a graceful exit before killing.
    """
    if not process:
        return
    process.terminate()
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Graceful termination did not complete in time: force it and reap.
        process.kill()
        process.wait()
0 commit comments