
Commit d73671c

Merge branch 'develop' into u/ch/swift
2 parents: 037ba42 + 645edf4

11 files changed: +456 −50 lines changed
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
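
The image can be smoke-tested before deploying. This is only a sketch, assuming Docker and application-default credentials are available locally: the tag, project id, and testid are placeholders, and the container still needs credentials and existing test resources (bucket and topic) because main.py creates its clients at import time.

# Build from the module directory (the one containing this Dockerfile and main.py).
docker build -t lvk-alerts-to-storage:dev .
# Run locally; Cloud Run normally injects PORT, so set it by hand here.
docker run --rm -p 8080:8080 \
    -e PORT=8080 \
    -e GCP_PROJECT=my-project -e SURVEY=lvk -e TESTID=mytest -e VERSIONTAG=v1_0 \
    -e GOOGLE_APPLICATION_CREDENTIALS=/gcp/creds.json \
    -v "$HOME/.config/gcloud/application_default_credentials.json:/gcp/creds.json:ro" \
    lvk-alerts-to-storage:dev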
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
  # Build the image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
  # Push the image to Artifact Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
  # Deploy image to Cloud Run
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
  - '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
  _SURVEY: 'lvk'
  _TESTID: 'testid'
  _MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}'
  _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
  _REPOSITORY: 'cloud-run-services'
  # cloud functions automatically sets the project id env var using the name "GCP_PROJECT"
  # use the same name here for consistency
  # [TODO] PROJECT_ID is set in setup.sh. this is confusing and we should revisit the decision.
  # i (Raen) think i didn't make it a substitution because i didn't want to set a default for it.
  _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID},VERSIONTAG=${_VERSIONTAG}'
  _REGION: 'us-central1'
options:
  dynamic_substitutions: true
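
For a one-off build outside the deploy script, the same config can be submitted by hand. A sketch only, with placeholder substitution values; the script later in this commit derives them from the survey and testid and also captures the deployed service URL:

gcloud builds submit --config=cloudbuild.yaml \
    --substitutions="_SURVEY=lvk,_TESTID=mytest,_MODULE_NAME=lvk-alerts-to-storage-mytest,_REPOSITORY=lvk-cloud-run-services-mytest,_VERSIONTAG=v1_0" \
    .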
Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
#! /bin/bash
# Deploys or deletes broker Cloud Run service
# This script will not delete Cloud Run services that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tears down/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lvk}"
region="${4:-us-central1}"
versiontag="${5:-v1_0}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")

MODULE_NAME="alerts-to-storage"  # lower case required by cloud run
ROUTE_RUN="/"  # url route that will trigger main.run()

define_GCP_resources() {
    local base_name="$1"
    local separator="${2:--}"
    local testid_suffix=""

    if [ "$testid" != "False" ] && [ -n "$testid" ]; then
        testid_suffix="${separator}${testid}"
    fi
    echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}")  # lower case required by cloud run
gcs_alerts_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")
ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw")  # pub/sub subscription used to trigger cloud run module
ps_topic_alerts_in_bucket=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alerts_in_bucket")
ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
service_account="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

if [ "${teardown}" = "True" ]; then
    # ensure that we do not tear down production resources
    if [ "${testid}" != "False" ]; then
        echo
        echo "Deleting resources for ${MODULE_NAME} module..."
        gsutil rm -r "gs://${gcs_alerts_bucket}"
        gcloud pubsub subscriptions delete "${ps_input_subscrip}"
        gcloud pubsub topics delete "${ps_topic_alerts_in_bucket}"
        gcloud run services delete "${cr_module_name}" --region "${region}"
    else
        echo 'ERROR: No testid supplied.'
        echo 'To avoid accidents, this script will not delete production resources.'
        echo 'If that is your intention, you must delete them manually.'
        echo 'Otherwise, please supply a testid.'
        exit 1
    fi
else
    echo
    echo "Creating gcs_alerts_bucket, uploading files, and setting permissions..."
    if ! gsutil ls -b "gs://${gcs_alerts_bucket}" >/dev/null 2>&1; then
        #--- Create the bucket that will store the alerts
        gsutil mb -b on -l "${region}" "gs://${gcs_alerts_bucket}"
        gsutil uniformbucketlevelaccess set on "gs://${gcs_alerts_bucket}"
        gsutil requesterpays set on "gs://${gcs_alerts_bucket}"
        # set IAM policies on public GCP resources
        if [ "$testid" = "False" ]; then
            gcloud storage buckets add-iam-policy-binding "gs://${gcs_alerts_bucket}" \
                --member="allUsers" \
                --role="roles/storage.objectViewer"
        fi
    else
        echo "${gcs_alerts_bucket} already exists."
    fi

    echo
    echo "Configuring Pub/Sub notifications on GCS bucket..."
    trigger_event=OBJECT_FINALIZE
    format=json  # json or none; if json, file metadata sent in message body
    gsutil notification create \
        -t "$ps_topic_alerts_in_bucket" \
        -e "$trigger_event" \
        -f "$format" \
        "gs://${gcs_alerts_bucket}"

    #--- Deploy Cloud Run service
    echo
    echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
    moduledir="."  # assumes deploying what's in our current directory
    config="${moduledir}/cloudbuild.yaml"
    # deploy the service and capture the endpoint's URL
    url=$(gcloud builds submit --config="${config}" \
        --substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo},_VERSIONTAG=${versiontag}" \
        "${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
    echo
    echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
    gcloud pubsub subscriptions create "${ps_input_subscrip}" \
        --topic "${ps_trigger_topic}" \
        --topic-project "${PROJECT_ID}" \
        --ack-deadline=600 \
        --push-endpoint="${url}${ROUTE_RUN}" \
        --push-auth-service-account="${runinvoker_svcact}" \
        --dead-letter-topic="${ps_deadletter_topic}" \
        --max-delivery-attempts=5
    gcloud pubsub subscriptions add-iam-policy-binding "${ps_input_subscrip}" \
        --member="serviceAccount:${service_account}" \
        --role="roles/pubsub.subscriber"
fi
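
Typical invocations, as a sketch: the diff does not show this script's filename, so setup.sh below is an assumption, and GOOGLE_CLOUD_PROJECT is assumed to be exported. Run it from the module directory containing the Dockerfile, cloudbuild.yaml, and main.py.

# deploy a test instance whose resource names carry the "-mytest" suffix
./setup.sh mytest
# tear that test instance back down
./setup.sh mytest True
# deploy production resources, spelling out every positional argument
./setup.sh False False lvk us-central1 v1_0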
Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores LVK alert data as a JSON file in Cloud Storage."""

import os
import flask
import pittgoogle
import attrs
from google.cloud import logging, storage
from google.cloud.exceptions import PreconditionFailed

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")
VERSIONTAG = os.getenv("VERSIONTAG")

# Variables for incoming data
# A url route is used in setup.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/"  # HTTP route that will trigger run(). Must match deploy.sh

# Variables for outgoing data
HTTP_204 = 204  # HTTP code: Success
HTTP_400 = 400  # HTTP code: Bad Request

# GCP resources used in this module
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
    "alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
    bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run():
    """Uploads alert data to a GCS bucket. Publishes a de-duplicated JSON-serialized "alerts" stream
    (${survey}-alerts) containing the original alert bytes. A BigQuery subscription is used to write
    alert data to the appropriate BigQuery table.

    This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint
    triggered by Pub/Sub messages. This function will be called once for every message sent to this route.
    It should accept the incoming HTTP request and return a response.

    Returns
    -------
    response : tuple(str, int)
        Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
        tuple into a proper HTTP response. Note that the response is a status message for the web server.
    """
    # extract the envelope from the request that triggered the endpoint
    # this contains a single Pub/Sub message with the alert to be processed
    envelope = flask.request.get_json()
    try:
        alert = pittgoogle.Alert.from_cloud_run(envelope, "lvk")
    except pittgoogle.exceptions.BadRequest as exc:
        return str(exc), HTTP_400

    # We need to know the schema version for a few different things across the broker,
    # but it's not included in LVK alerts. Add it now so we never have to worry about it again.
    msg_data = f'{{"schema_version": "{VERSIONTAG}", '.encode() + alert.msg.data[1:]
    alert.msg = attrs.evolve(alert.msg, data=msg_data)

    # Force rebuild of alert.dict to include the schema_version field
    alert._dict = None
    _ = alert.dict

    blob = bucket.blob(alert.name_in_bucket)
    blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"])

    # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
    try:
        blob.upload_from_string(alert.msg.data, if_generation_match=0)
    except PreconditionFailed:
        # this alert is a duplicate. drop it.
        return "", HTTP_204

    # publish the same alert as JSON
    TOPIC_ALERTS.publish(alert)

    return "", HTTP_204


def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
    """Return key/value pairs to be attached to the file as metadata."""
    # https://git.ligo.org/emfollow/igwn-gwalert-schema/-/blob/main/igwn.alerts.v1_0.Alert.schema.json
    metadata = {"file_origin_message_id": event_id}
    metadata["time_created"] = alert.dict["time_created"]
    metadata["alert_type"] = alert.dict["alert_type"]
    metadata["superevent_id"] = alert.dict["superevent_id"]
    metadata["schema_version"] = alert.dict["schema_version"]
    metadata["kafka.timestamp"] = alert.attributes["kafka.timestamp"]

    return metadata
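
Once the service is running (locally in the container above, or deployed), run() can be exercised with a hand-built Pub/Sub push envelope. A sketch only: the alert payload, message id, and kafka.timestamp values are made up, and the base64-encoded "data" field mimics what Pub/Sub delivers to push endpoints.

# minimal fake LVK alert; run() prepends the schema_version field to these bytes
DATA=$(printf '{"superevent_id": "MS181101ab", "alert_type": "PRELIMINARY", "time_created": "2018-11-01T22:34:49Z"}' | base64 | tr -d '\n')
# POST a Pub/Sub-style push envelope to the route defined by ROUTE_RUN ("/")
curl -s -o /dev/null -w "%{http_code}\n" -X POST "http://localhost:8080/" \
    -H "Content-Type: application/json" \
    -d '{"message": {"data": "'"${DATA}"'", "messageId": "1234567890",
         "attributes": {"kafka.timestamp": "1700000000000"}},
         "subscription": "projects/my-project/subscriptions/lvk-alerts_raw-mytest"}'
# expect 204 on success, and 204 again on a resend (the duplicate is dropped via if_generation_match=0)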
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
google-cloud-storage
pittgoogle-client>=0.3.16

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Flask
gunicorn
Werkzeug

broker/consumer/lsst/ps-connector.properties

Lines changed: 3 additions & 0 deletions
@@ -37,3 +37,6 @@ cps.topic=PS_TOPIC
 cps.project=PROJECT_ID
 # include Kafka topic, partition, offset, timestamp as msg attributes
 metadata.publish=true
+# specify the maximum number of messages that can be received for a topic partition before publishing
+# them to Cloud Pub/Sub
+maxBufferSize=50

broker/consumer/lvk/vm_startup.sh

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ fi

 #--- GCP resources used in this script
 broker_bucket="${PROJECT_ID}-${survey}-broker_files"
-PS_TOPIC_DEFAULT="${survey}-alerts"
+PS_TOPIC_DEFAULT="${survey}-alerts_raw"
 # use test resources, if requested
 if [ "$testid" != "False" ]; then
     broker_bucket="${broker_bucket}-${testid}"
