Skip to content

Commit d5c46c2

Browse files
authored
fix: trigger downstream tasks when a new feed is added (#815)
1 parent ca3e5cb commit d5c46c2

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

.github/workflows/db-update.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ jobs:
8787

8888
- name: Google Cloud Setup
8989
uses: google-github-actions/setup-gcloud@v2
90-
90+
9191
- name: Load secrets from 1Password
9292
uses: 1password/load-secrets-action@v2
9393
with:
@@ -97,12 +97,12 @@ jobs:
9797
GCP_FEED_SSH_USER: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_SSH_USER/username"
9898
GCP_FEED_BASTION_NAME: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_NAME/username"
9999
GCP_FEED_BASTION_SSH_KEY: "op://rbiv7rvkkrsdlpcrz3bmv7nmcu/GCP_FEED_BASTION_SSH_KEY/private key"
100-
100+
101101
- name: Tunnel
102102
run: |
103103
mkdir -p ~/.ssh
104104
echo "${{ env.GCP_FEED_BASTION_SSH_KEY }}" > ~/.ssh/id_rsa
105-
chmod 600 ~/.ssh/id_rsa
105+
chmod 600 ~/.ssh/id_rsa
106106
./scripts/tunnel-create.sh -project_id ${{ inputs.PROJECT_ID }} -zone ${{ inputs.REGION }}-a -instance ${{ env.GCP_FEED_BASTION_NAME }}-${{ inputs.DB_ENVIRONMENT}} -target_account ${{ env.GCP_FEED_SSH_USER }} -db_instance ${{ secrets.POSTGRE_SQL_INSTANCE_NAME }}
107107
sleep 10 # Wait for the tunnel to establish
108108
@@ -112,21 +112,21 @@ jobs:
112112
PGPASSWORD=${{ secrets.DB_USER_PASSWORD }} psql -h localhost -p 5432 -U ${{ secrets.DB_USER_NAME }} -d ${{ inputs.DB_NAME }} -c "SELECT version();"
113113
114114
- name: Run Liquibase
115-
run: |
115+
run: |
116116
wget -O- https://repo.liquibase.com/liquibase.asc | gpg --dearmor > liquibase-keyring.gpg && \
117117
cat liquibase-keyring.gpg | sudo tee /usr/share/keyrings/liquibase-keyring.gpg > /dev/null && \
118118
echo 'deb [trusted=yes arch=amd64 signed-by=/usr/share/keyrings/liquibase-keyring.gpg] https://repo.liquibase.com stable main' | sudo tee /etc/apt/sources.list.d/liquibase.list
119-
119+
120120
sudo apt-get update
121121
sudo apt-get install liquibase=4.25.1
122-
122+
123123
export LIQUIBASE_CLASSPATH="liquibase"
124124
export LIQUIBASE_COMMAND_CHANGELOG_FILE="changelog.xml"
125125
export LIQUIBASE_COMMAND_URL=jdbc:postgresql://localhost:5432/${{ inputs.DB_NAME }}
126126
export LIQUIBASE_COMMAND_USERNAME=${{ secrets.DB_USER_NAME }}
127127
export LIQUIBASE_COMMAND_PASSWORD=${{ secrets.DB_USER_PASSWORD }}
128128
export LIQUIBASE_LOG_LEVEL=FINE
129-
129+
130130
liquibase update
131131
132132
db-content-update:
@@ -224,7 +224,7 @@ jobs:
224224
if: ${{ github.event_name == 'repository_dispatch' || github.event_name == 'workflow_dispatch' }}
225225
runs-on: ubuntu-latest
226226
steps:
227-
- name: Authenticate to Google Cloud QA/PROD
227+
- name: Authenticate to Google Cloud
228228
uses: google-github-actions/auth@v2
229229
with:
230230
credentials_json: ${{ secrets.GCP_MOBILITY_FEEDS_SA_KEY }}

api/src/scripts/load_dataset_on_create.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,37 @@
11
import json
2-
import logging
32
import os
43
import threading
54
import uuid
65
from typing import List
6+
from concurrent import futures
77

8-
from database_gen.sqlacodegen_models import Feed
8+
from google.auth import default
99
from google.cloud import pubsub_v1
1010
from google.cloud.pubsub_v1.futures import Future
1111

12-
env = os.getenv("ENV", "dev")
13-
pubsub_topic_name = f"datasets-batch-topic-{env}"
14-
project_id = f"mobility-feeds-{env}"
12+
from database_gen.sqlacodegen_models import Feed
13+
from utils.logger import Logger
14+
1515
# Lazy create so we won't try to connect to google cloud when the file is imported.
1616
pubsub_client = None
1717

1818
lock = threading.Lock()
19+
logger = Logger("load_dataset_on_create").get_logger()
1920

2021

2122
def get_pubsub_client():
    """Return the process-wide Pub/Sub publisher client, creating it lazily.

    The client is built on first use (rather than at import time) so that
    importing this module never attempts a Google Cloud connection.
    Initialization is serialized with ``lock`` so concurrent callers
    construct the client exactly once.
    """
    global pubsub_client
    with lock:
        if pubsub_client is None:
            pubsub_client = pubsub_v1.PublisherClient()
    return pubsub_client
2729

2830

2931
def get_topic_path():
    """Build the fully-qualified Pub/Sub topic path for the current environment.

    The environment name is read from the ``ENV`` variable, defaulting to
    ``"dev"``; both the topic name and the project id are derived from it.
    """
    environment = os.getenv("ENV", "dev")
    topic_name = f"datasets-batch-topic-{environment}"
    # Cannot use GOOGLE_CLOUD_PROJECT because it points to QA for DEV
    project = f"mobility-feeds-{environment}"
    return get_pubsub_client().topic_path(project, topic_name)
3436

3537

@@ -42,16 +44,17 @@ def publish_callback(future: Future, stable_id: str, topic_path: str):
4244
@param topic_path: The path to the Pub/Sub topic
4345
"""
4446
if future.exception():
45-
logging.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
47+
logger.info(f"Error publishing feed {stable_id} to Pub/Sub topic {topic_path}: {future.exception()}")
4648
else:
47-
logging.info(f"Published stable_id = {stable_id}.")
49+
logger.info(f"Published stable_id = {stable_id}.")
4850

4951

50-
def publish(feed: Feed, topic_path: str):
52+
def publish(feed: Feed, topic_path: str) -> Future:
5153
"""
5254
Publishes a feed to the Pub/Sub topic.
5355
:param feed: The feed to publish
5456
:param topic_path: The path to the Pub/Sub topic
57+
:return: The Future object representing the result of the publishing operation
5558
"""
5659
payload = {
5760
"execution_id": f"batch-uuid-{uuid.uuid4()}",
@@ -67,6 +70,7 @@ def publish(feed: Feed, topic_path: str):
6770
data_bytes = json.dumps(payload).encode("utf-8")
6871
future = get_pubsub_client().publish(topic_path, data=data_bytes)
6972
future.add_done_callback(lambda _: publish_callback(future, feed.stable_id, topic_path))
73+
return future
7074

7175

7276
def publish_all(feeds: List[Feed]):
@@ -75,6 +79,14 @@ def publish_all(feeds: List[Feed]):
7579
:param feeds: The list of feeds to publish
7680
"""
7781
topic_path = get_topic_path()
82+
logger.info(f"Publishing {len(feeds)} feeds to Pub/Sub topic {topic_path}...")
83+
credentials, project = default()
84+
logger.info(f"Authenticated project: {project}")
85+
logger.info(f"Service Account Email: {credentials.service_account_email}")
86+
publish_futures = []
7887
for feed in feeds:
79-
publish(feed, topic_path)
80-
logging.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")
88+
logger.info(f"Publishing feed {feed.stable_id} to Pub/Sub topic {topic_path}...")
89+
future = publish(feed, topic_path)
90+
publish_futures.append(future)
91+
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
92+
logger.info(f"Published {len(feeds)} feeds to Pub/Sub topic {topic_path}.")

api/src/scripts/populate_db_gtfs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def trigger_downstream_tasks(self):
228228
Trigger downstream tasks after populating the database
229229
"""
230230
self.logger.info("Triggering downstream tasks")
231-
self.logger.debug(
231+
self.logger.info(
232232
f"New feeds added to the database: "
233233
f"{','.join([feed.stable_id for feed in self.added_gtfs_feeds] if self.added_gtfs_feeds else [])}"
234234
)

0 commit comments

Comments (0)