16 changes: 8 additions & 8 deletions .github/workflows/db-update.yml
@@ -87,7 +87,7 @@ jobs:

- name: Google Cloud Setup
uses: google-github-actions/setup-gcloud@v2

- name: Load secrets from 1Password
uses: 1password/load-secrets-action@v2
with:
@@ -97,12 +97,12 @@ jobs:
GCP_FEED_SSH_USER: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_SSH_USER/username"
GCP_FEED_BASTION_NAME: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_NAME/username"
GCP_FEED_BASTION_SSH_KEY: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_SSH_KEY/private key"

- name: Tunnel
run: |
mkdir -p ~/.ssh
echo "${{ env.GCP_FEED_BASTION_SSH_KEY }}" > ~/.ssh/id_rsa
- chmod 600 ~/.ssh/id_rsa
+ chmod 600 ~/.ssh/id_rsa
./scripts/tunnel-create.sh -project_id ${{ inputs.PROJECT_ID }} -zone ${{ inputs.REGION }}-a -instance ${{ env.GCP_FEED_BASTION_NAME }}-${{ inputs.DB_ENVIRONMENT}} -target_account ${{ env.GCP_FEED_SSH_USER }} -db_instance ${{ secrets.POSTGRE_SQL_INSTANCE_NAME }}
sleep 10 # Wait for the tunnel to establish

@@ -112,21 +112,21 @@ jobs:
PGPASSWORD=${{ secrets.DB_USER_PASSWORD }} psql -h localhost -p 5432 -U ${{ secrets.DB_USER_NAME }} -d ${{ inputs.DB_NAME }} -c "SELECT version();"
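The `sleep 10` in the Tunnel step above gives the SSH tunnel time to come up before this `psql` check runs. A more deterministic alternative is to poll the forwarded port until it accepts connections; a minimal Python sketch of that idea (hypothetical helper, not part of this PR):

```python
import socket
import time

def wait_for_tunnel(host: str = "localhost", port: int = 5432, timeout: float = 30.0) -> None:
    """Poll the forwarded port until the SSH tunnel accepts connections."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                return  # Port is open; the tunnel is up.
        except OSError:
            time.sleep(1)  # Not ready yet; retry until the deadline.
    raise TimeoutError(f"Tunnel on {host}:{port} not ready after {timeout}s")
```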

- name: Run Liquibase
- run: |
+ run: |
wget -O- https://repo.liquibase.com/liquibase.asc | gpg --dearmor > liquibase-keyring.gpg && \
cat liquibase-keyring.gpg | sudo tee /usr/share/keyrings/liquibase-keyring.gpg > /dev/null && \
echo 'deb [trusted=yes arch=amd64 signed-by=/usr/share/keyrings/liquibase-keyring.gpg] https://repo.liquibase.com stable main' | sudo tee /etc/apt/sources.list.d/liquibase.list

sudo apt-get update
sudo apt-get install liquibase=4.25.1

export LIQUIBASE_CLASSPATH="liquibase"
export LIQUIBASE_COMMAND_CHANGELOG_FILE="changelog.xml"
export LIQUIBASE_COMMAND_URL=jdbc:postgresql://localhost:5432/${{ inputs.DB_NAME }}
export LIQUIBASE_COMMAND_USERNAME=${{ secrets.DB_USER_NAME }}
export LIQUIBASE_COMMAND_PASSWORD=${{ secrets.DB_USER_PASSWORD }}
export LIQUIBASE_LOG_LEVEL=FINE

liquibase update
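Liquibase 4.4+ reads `LIQUIBASE_COMMAND_*` and `LIQUIBASE_*` environment variables as defaults for the matching CLI arguments, which is why the bare `liquibase update` above needs no flags. A rough Python equivalent of this step, assuming the `liquibase` CLI is installed and on PATH (connection values are placeholders, not the real secrets):

```python
import os
import subprocess

env = os.environ.copy()
env.update({
    "LIQUIBASE_CLASSPATH": "liquibase",
    "LIQUIBASE_COMMAND_CHANGELOG_FILE": "changelog.xml",
    "LIQUIBASE_COMMAND_URL": "jdbc:postgresql://localhost:5432/mydb",  # placeholder DB name
    "LIQUIBASE_COMMAND_USERNAME": "db_user",       # placeholder
    "LIQUIBASE_COMMAND_PASSWORD": "db_password",   # placeholder
    "LIQUIBASE_LOG_LEVEL": "FINE",
})
# Equivalent to the workflow's bare `liquibase update` invocation.
subprocess.run(["liquibase", "update"], env=env, check=True)
```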

db-content-update:
@@ -224,7 +224,7 @@ jobs:
if: ${{ github.event_name == 'repository_dispatch' || github.event_name == 'workflow_dispatch' }}
runs-on: ubuntu-latest
steps:
- - name: Authenticate to Google Cloud QA/PROD
+ - name: Authenticate to Google Cloud
uses: google-github-actions/auth@v2
with:
credentials_json: ${{ secrets.GCP_MOBILITY_FEEDS_SA_KEY }}
38 changes: 25 additions & 13 deletions api/src/scripts/load_dataset_on_create.py
@@ -1,35 +1,37 @@
import json
import logging
import os
import threading
import uuid
from typing import List
from concurrent import futures

from database_gen.sqlacodegen_models import Feed
from google.auth import default
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.futures import Future

env = os.getenv("ENV", "dev")
pubsub_topic_name = f"datasets-batch-topic-{env}"
project_id = f"mobility-feeds-{env}"
from database_gen.sqlacodegen_models import Feed
from utils.logger import Logger

# Lazy create so we won't try to connect to google cloud when the file is imported.
pubsub_client = None

lock = threading.Lock()
logger = Logger("load_dataset_on_create").get_logger()


def get_pubsub_client():
with lock:
global pubsub_client
if pubsub_client is None:
pubsub_client = pubsub_v1.PublisherClient()

return pubsub_client


def get_topic_path():
if pubsub_topic_name is None or project_id is None:
raise ValueError("PUBSUB_TOPIC_NAME and PROJECT_ID must be set in the environment")

env = os.getenv("ENV", "dev")
pubsub_topic_name = f"datasets-batch-topic-{env}"
project_id = f"mobility-feeds-{env}" # Cannot use GOOGLE_CLOUD_PROJECT because it points to QA for DEV
return get_pubsub_client().topic_path(project_id, pubsub_topic_name)
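The lock around the lazy `PublisherClient` creation matters here: importing the module must stay free of Google Cloud network calls, yet concurrent first callers must still end up sharing a single client. A self-contained sketch of the same pattern, with a dummy factory standing in for `pubsub_v1.PublisherClient`:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_client = None
_lock = threading.Lock()
_creations = 0

def _expensive_factory() -> object:
    # Stand-in for pubsub_v1.PublisherClient(), which opens network channels.
    global _creations
    _creations += 1
    return object()

def get_client() -> object:
    global _client
    with _lock:  # Serialize first-time creation across threads.
        if _client is None:
            _client = _expensive_factory()
    return _client

with ThreadPoolExecutor(max_workers=8) as pool:
    clients = list(pool.map(lambda _: get_client(), range(32)))

assert _creations == 1                   # Factory ran exactly once.
assert len(set(map(id, clients))) == 1   # All callers share one instance.
```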


@@ -42,16 +44,17 @@ def publish_callback(future: Future, stable_id: str, topic_path: str):
@param topic_path: The path to the Pub/Sub topic
"""
if future.exception():
logging.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
logger.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
else:
logging.info(f"Published stable_id = {stable_id}.")
logger.info(f"Published stable_id = {stable_id}.")


- def publish(feed: Feed, topic_path: str):
+ def publish(feed: Feed, topic_path: str) -> Future:
"""
Publishes a feed to the Pub/Sub topic.
:param feed: The feed to publish
:param topic_path: The path to the Pub/Sub topic
+ :return: The Future object representing the result of the publishing operation
"""
payload = {
"execution_id": f"batch-uuid-{uuid.uuid4()}",
@@ -67,6 +70,7 @@ def publish(feed: Feed, topic_path: str):
data_bytes = json.dumps(payload).encode("utf-8")
future = get_pubsub_client().publish(topic_path, data=data_bytes)
future.add_done_callback(lambda _: publish_callback(future, feed.stable_id, topic_path))
+ return future


def publish_all(feeds: List[Feed]):
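`publish_callback` is attached via `add_done_callback`, so it runs only once the publish settles, and `future.exception()` distinguishes success from failure. Pub/Sub publish futures follow the `concurrent.futures.Future` contract, so the flow can be sketched with a plain Future (hypothetical IDs and topic, for illustration only):

```python
from concurrent.futures import Future

def publish_callback(future: Future, stable_id: str, topic_path: str) -> None:
    if future.exception():
        print(f"Error publishing feed {stable_id} to {topic_path}: {future.exception()}")
    else:
        print(f"Published stable_id = {stable_id}.")

ok, bad = Future(), Future()
ok.add_done_callback(lambda f: publish_callback(f, "feed-1", "topic"))
bad.add_done_callback(lambda f: publish_callback(f, "feed-2", "topic"))
ok.set_result("message-id-123")          # Callback reports success.
bad.set_exception(RuntimeError("boom"))  # Callback reports the failure.
```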
@@ -75,6 +79,14 @@ def publish_all(feeds: List[Feed]):
:param feeds: The list of feeds to publish
"""
topic_path = get_topic_path()
+ logger.info(f"Publishing {len(feeds)} feeds to Pub/Sub topic {topic_path}...")
+ credentials, project = default()
+ logger.info(f"Authenticated project: {project}")
+ logger.info(f"Service Account Email: {credentials.service_account_email}")
+ publish_futures = []
for feed in feeds:
- publish(feed, topic_path)
- logging.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
+ logger.info(f"Publishing feed {feed.stable_id} to Pub/Sub topic {topic_path}...")
+ future = publish(feed, topic_path)
+ publish_futures.append(future)
+ futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
+ logger.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
2 changes: 1 addition & 1 deletion api/src/scripts/populate_db_gtfs.py
@@ -228,7 +228,7 @@ def trigger_downstream_tasks(self):
Trigger downstream tasks after populating the database
"""
self.logger.info("Triggering downstream tasks")
- self.logger.debug(
+ self.logger.info(
f"New feeds added to the database: "
f"{','.join([feed.stable_id for feed in self.added_gtfs_feeds] if self.added_gtfs_feeds else [])}"
)
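Switching this message from `debug` to `info` matters because loggers configured at the common INFO threshold drop `debug` records, so the list of newly added feed IDs was previously missing from the logs. A quick illustration with the standard `logging` module:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("populate_db_gtfs")

log.debug("New feeds added to the database: feed-a,feed-b")  # Filtered out at INFO level.
log.info("New feeds added to the database: feed-a,feed-b")   # Emitted.
```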