diff --git a/broker/cloud_run/swift/ps_to_storage/Dockerfile b/broker/cloud_run/swift/ps_to_storage/Dockerfile new file mode 100644 index 000000000..4d1b32e05 --- /dev/null +++ b/broker/cloud_run/swift/ps_to_storage/Dockerfile @@ -0,0 +1,21 @@ +# Use the official lightweight Python image. +# https://hub.docker.com/_/python +FROM python:3.12-slim + +# Allow statements and log messages to immediately appear in the Knative logs +ENV PYTHONUNBUFFERED True + +# Copy local code to the container image. +ENV APP_HOME /app +WORKDIR $APP_HOME +COPY . ./ + +# Install production dependencies. +RUN pip install --no-cache-dir -r requirements.txt + +# Run the web service on container startup. Here we use the gunicorn +# webserver, with one worker process and 8 threads. +# For environments with multiple CPU cores, increase the number of workers +# to be equal to the cores available. +# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling. +CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app diff --git a/broker/cloud_run/swift/ps_to_storage/cloudbuild.yaml b/broker/cloud_run/swift/ps_to_storage/cloudbuild.yaml new file mode 100644 index 000000000..bf1fa3e66 --- /dev/null +++ b/broker/cloud_run/swift/ps_to_storage/cloudbuild.yaml @@ -0,0 +1,29 @@ +# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run +# containerize the module and deploy it to Cloud Run +steps: +# Build the image +- name: 'gcr.io/cloud-builders/docker' + args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.'] +# Push the image to Artifact Registry +- name: 'gcr.io/cloud-builders/docker' + args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'] +# Deploy image to Cloud Run +- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' + entrypoint: gcloud + args: ['run', 'deploy', '${_MODULE_NAME}', '--image', 
'${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}'] +images: +- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}' +substitutions: + _SURVEY: 'swift' + _TESTID: 'testid' + _MODULE_NAME: '${_SURVEY}-alerts-to-storage-${_TESTID}' + _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}' + _REPOSITORY: 'cloud-run-services' + # cloud functions automatically sets the projectid env var using the name "GCP_PROJECT" + # use the same name here for consistency + # [TODO] PROJECT_ID is set in setup.sh. this is confusing and we should revisit the decision. + # i (Raen) think i didn't make it a substitution because i didn't want to set a default for it. + _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID},VERSIONTAG=${_VERSIONTAG}' + _REGION: 'us-central1' +options: + dynamic_substitutions: true diff --git a/broker/cloud_run/swift/ps_to_storage/deploy.sh b/broker/cloud_run/swift/ps_to_storage/deploy.sh new file mode 100755 index 000000000..ba178abb4 --- /dev/null +++ b/broker/cloud_run/swift/ps_to_storage/deploy.sh @@ -0,0 +1,105 @@ +#! 
/bin/bash +# Deploys or deletes broker Cloud Run service +# This script will not delete Cloud Run services that are in production + +# "False" uses production resources +# any other string will be appended to the names of all resources +testid="${1:-test}" +# "True" tearsdown/deletes resources, else setup +teardown="${2:-False}" +# name of the survey this broker instance will ingest +survey="${3:-swift}" +region="${4:-us-central1}" +versiontag="${5:-v4_5_0}" +# get the environment variable +PROJECT_ID=$GOOGLE_CLOUD_PROJECT + +MODULE_NAME="alerts-to-storage" # lower case required by cloud run +ROUTE_RUN="/" # url route that will trigger main.run() + +define_GCP_resources() { + local base_name="$1" + local separator="${2:--}" + local testid_suffix="" + + if [ "$testid" != "False" ] && [ -n "$testid" ]; then + testid_suffix="${separator}${testid}" + fi + echo "${base_name}${testid_suffix}" +} + +#--- GCP resources used in this script +artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services") +cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run +gcs_alerts_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts") +ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter") +ps_input_subscrip=$(define_GCP_resources "${survey}-alerts_raw") # pub/sub subscription used to trigger cloud run module +ps_topic_alerts_in_bucket=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alerts_in_bucket") +ps_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw") +runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com" + +if [ "${teardown}" = "True" ]; then + # ensure that we do not teardown production resources + if [ "${testid}" != "False" ]; then + echo + echo "Deleting resources for ${MODULE_NAME} module..." 
+ gsutil rm -r "gs://${gcs_alerts_bucket}" + gcloud pubsub topics delete "${ps_topic_alerts_in_bucket}" + gcloud pubsub subscriptions delete "${ps_input_subscrip}" + gcloud run services delete "${cr_module_name}" --region "${region}" + else + echo 'ERROR: No testid supplied.' + echo 'To avoid accidents, this script will not delete production resources.' + echo 'If that is your intention, you must delete them manually.' + echo 'Otherwise, please supply a testid.' + exit 1 + fi +else + echo + echo "Creating gcs_alerts_bucket and setting permissions..." + if ! gsutil ls -b "gs://${gcs_alerts_bucket}" >/dev/null 2>&1; then + #--- Create the bucket that will store the alerts + gsutil mb -l "${region}" "gs://${gcs_alerts_bucket}" + gsutil uniformbucketlevelaccess set on "gs://${gcs_alerts_bucket}" + gsutil requesterpays set on "gs://${gcs_alerts_bucket}" + # set IAM policies on public GCP resources + if [ "$testid" = "False" ]; then + gcloud storage buckets add-iam-policy-binding "gs://${gcs_alerts_bucket}" \ + --member="allUsers" \ + --role="roles/storage.objectViewer" + fi + else + echo "${gcs_alerts_bucket} already exists." + fi + + #--- Setup the Pub/Sub notifications on the JSON storage bucket + echo + echo "Configuring Pub/Sub notifications on GCS bucket..." + trigger_event=OBJECT_FINALIZE + format=json # json or none; if json, file metadata sent in message body + gsutil notification create \ + -t "$ps_topic_alerts_in_bucket" \ + -e "$trigger_event" \ + -f "$format" \ + "gs://${gcs_alerts_bucket}" + + #--- Deploy the Cloud Run service + echo + echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..." + moduledir="." 
# assumes deploying what's in our current directory + config="${moduledir}/cloudbuild.yaml" + # deploy the service and capture the endpoint's URL + url=$(gcloud builds submit --config="${config}" \ + --substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo},_VERSIONTAG=${versiontag}" \ + "${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p') + echo + echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..." + gcloud pubsub subscriptions create "${ps_input_subscrip}" \ + --topic "${ps_trigger_topic}" \ + --topic-project "${PROJECT_ID}" \ + --ack-deadline=600 \ + --push-endpoint="${url}${ROUTE_RUN}" \ + --push-auth-service-account="${runinvoker_svcact}" \ + --dead-letter-topic="${ps_deadletter_topic}" \ + --max-delivery-attempts=5 +fi diff --git a/broker/cloud_run/swift/ps_to_storage/main.py b/broker/cloud_run/swift/ps_to_storage/main.py new file mode 100644 index 000000000..ce0d8bdcd --- /dev/null +++ b/broker/cloud_run/swift/ps_to_storage/main.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +"""This module stores Swift/BAT-GUANO alert data as a JSON file in Cloud Storage.""" + +import os +import flask +import pittgoogle +from google.cloud import logging, storage +from google.cloud.exceptions import PreconditionFailed + +# [FIXME] Make this helpful or else delete it. +# Connect the python logger to the google cloud logger. +# By default, this captures INFO level and above. +# pittgoogle uses the python logger. +# We don't currently use the python logger directly in this script, but we could. +logging.Client().setup_logging() + +PROJECT_ID = os.getenv("GCP_PROJECT") +TESTID = os.getenv("TESTID") +SURVEY = os.getenv("SURVEY") +VERSIONTAG = os.getenv("VERSIONTAG") + +# Variables for incoming data +# A url route is used in setup.sh when the trigger subscription is created. 
+# It is possible to define multiple routes in a single module and trigger them using different subscriptions. +ROUTE_RUN = "/" # HTTP route that will trigger run(). Must match deploy.sh + +# Variables for outgoing data +HTTP_204 = 204 # HTTP code: Success +HTTP_400 = 400 # HTTP code: Bad Request + +# GCP resources used in this module +TOPIC_ALERTS_JSON = pittgoogle.Topic.from_cloud( + "alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID +) +bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts" +if TESTID != "False": + bucket_name = f"{bucket_name}-{TESTID}" + +client = storage.Client() +bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID)) + +app = flask.Flask(__name__) + + +@app.route(ROUTE_RUN, methods=["POST"]) +def run() -> tuple[str, int]: + """Uploads alert data to a GCS bucket. Publishes a de-duplicated JSON-serialized "alerts" stream + (${survey}-alerts) containing the original alert bytes. A BigQuery subscription is used to write alert data to + the appropriate BigQuery table. + + This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint + triggered by Pub/Sub messages. This function will be called once for every message sent to this route. + It should accept the incoming HTTP request and return a response. + + Returns + ------- + response : tuple(str, int) + Tuple containing the response body (string) and HTTP status code (int). Flask will convert the + tuple into a proper HTTP response. Note that the response is a status message for the web server. 
+ """ + # extract the envelope from the request that triggered the endpoint + # this contains a single Pub/Sub message with the alert to be processed + envelope = flask.request.get_json() + try: + alert = pittgoogle.Alert.from_cloud_run(envelope, schema_name="default") + except pittgoogle.exceptions.BadRequest as exc: + return str(exc), HTTP_400 + + blob = bucket.blob(_name_in_bucket(alert)) + blob.metadata = _create_file_metadata(alert, event_id=envelope["message"]["messageId"]) + + # raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0" + try: + blob.upload_from_string(alert.msg.data, if_generation_match=0) + except PreconditionFailed: + # this alert is a duplicate. drop it. + return "", HTTP_204 + + # publish the same alert as JSON + TOPIC_ALERTS_JSON.publish(alert) + + return "", HTTP_204 + + +def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict: + """Return key/value pairs to be attached to the file as metadata.""" + # https://github.com/nasa-gcn/gcn-schema/blob/main/gcn/notices/swift/bat/Guano.example.json + metadata = {"file_origin_message_id": event_id} + metadata["_".join("alert_datetime")] = alert.dict["alert_datetime"] + metadata["_".join("alert_type")] = alert.dict["alert_type"] + metadata["_".join("classification")] = alert.dict["classification"] + metadata["_".join("id")] = alert.dict["id"] + + return metadata + + +def _name_in_bucket(alert: pittgoogle.Alert) -> str: + """Return the name of the file in the bucket.""" + # not easily able to extract schema version, see: + # https://github.com/nasa-gcn/gcn-schema/blob/main/gcn/notices/swift/bat/Guano.example.json + _date = alert.dict["alert_datetime"][0:10] + _alert_type = alert.dict["alert_type"] + _id = alert.dict["id"][0] + + return f"{VERSIONTAG}/{_date}/{_alert_type}/{_id}.json" diff --git a/broker/cloud_run/swift/ps_to_storage/requirements.txt b/broker/cloud_run/swift/ps_to_storage/requirements.txt new file mode 100644 index 
000000000..b59e4aca9 --- /dev/null +++ b/broker/cloud_run/swift/ps_to_storage/requirements.txt @@ -0,0 +1,14 @@ +# As explained here +# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python +# dependencies for a Cloud Function must be specified in a `requirements.txt` +# file (or packaged with the function) in the same directory as `main.py` + +google-cloud-logging +google-cloud-storage +pittgoogle-client>=0.3.15 + +# for Cloud Run +# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service +Flask +gunicorn +Werkzeug diff --git a/broker/consumer/swift/README.md b/broker/consumer/swift/README.md new file mode 100644 index 000000000..84accf034 --- /dev/null +++ b/broker/consumer/swift/README.md @@ -0,0 +1,35 @@ +# Start the Swift/BAT-GUANO consumer VM + +See `broker/setup_broker/swift/README.md` for setup instructions. + +To start the consumer VM: + +```bash +survey="swift" +testid="mytest" +consumerVM="${survey}-consumer-${testid}" +zone="us-central1-a" + +# Set the VM metadata +KAFKA_TOPIC="enter Kafka topic" +PS_TOPIC="${survey}-alerts-${testid}" +gcloud compute instances add-metadata ${consumerVM} --zone=${zone} \ + --metadata KAFKA_TOPIC=${KAFKA_TOPIC},PS_TOPIC=${PS_TOPIC} + +# Start the VM +gcloud compute instances start ${consumerVM} --zone ${zone} +# this launches the startup script which configures and starts the +# Kafka -> Pub/Sub connector +``` + +To stop the consumer VM: + +```bash +survey="swift" +testid="mytest" +consumerVM="${survey}-consumer-${testid}" +zone="us-central1-a" + +# Stop the VM +gcloud compute instances stop ${consumerVM} --zone ${zone} +``` diff --git a/broker/consumer/swift/admin.properties b/broker/consumer/swift/admin.properties new file mode 100644 index 000000000..ff87231e7 --- /dev/null +++ b/broker/consumer/swift/admin.properties @@ -0,0 +1,13 @@ +# Kafka Admin client configs +# This file is part of a workflow that creates an authenticated connection to the Kafka broker. 
+# In cases where we can connect without authentication (e.g., ZTF), this file is not used. +# For config options, see https://kafka.apache.org/documentation/#adminclientconfigs +# For Swift-specific options, see https://gcn.nasa.gov/docs/client#java + +security.protocol=SASL_SSL +sasl.mechanism=OAUTHBEARER +sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler +sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token +sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + clientId="CLIENT_ID" \ + clientSecret="CLIENT_SECRET"; diff --git a/broker/consumer/swift/ps-connector.properties b/broker/consumer/swift/ps-connector.properties new file mode 100644 index 000000000..280b14e58 --- /dev/null +++ b/broker/consumer/swift/ps-connector.properties @@ -0,0 +1,41 @@ +# Kafka Connect sink connector configs +# For config options, see https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html +# For additional Pub/Sub-specific options, see https://github.com/googleapis/java-pubsub-group-kafka-connector?tab=readme-ov-file#sink-connector +# +# -------------------------------------------------------------------------- +# This file is adapted from: +# https://github.com/googleapis/java-pubsub-group-kafka-connector/blob/main/config/cps-sink-connector.properties +# The original copyright and license are reproduced below. +# +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# -------------------------------------------------------------------------- + +# Unique name for the Pub/Sub sink connector. +name=CPSSinkConnector +# The Java class for the Pub/Sub sink connector. +connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector +# The maximum number of tasks that should be created for this connector. +tasks.max=1 +# Set the key converter for the Pub/Sub sink connector. +key.converter=org.apache.kafka.connect.converters.ByteArrayConverter +# Set the value converter for the Pub/Sub sink connector. +value.converter=org.apache.kafka.connect.converters.ByteArrayConverter +# Set the Kafka topic +topics=KAFKA_TOPIC +# Set the Pub/Sub configs +cps.project=PROJECT_ID +cps.topic=PS_TOPIC +# include Kafka topic, partition, offset, timestamp as msg attributes +metadata.publish=true diff --git a/broker/consumer/swift/psconnect-worker-authenticated.properties b/broker/consumer/swift/psconnect-worker-authenticated.properties new file mode 100644 index 000000000..3c89e429d --- /dev/null +++ b/broker/consumer/swift/psconnect-worker-authenticated.properties @@ -0,0 +1,35 @@ +# Kafka Connect worker configuration +# This file is part of a workflow that creates an authenticated connection to the Kafka broker. +# For config options, see https://docs.confluent.io/platform/current/connect/references/allconfigs.html#worker-configuration-properties +# See also: https://kafka.apache.org/documentation/#adminclientconfigs + +bootstrap.servers=kafka.gcn.nasa.gov:9092 +plugin.path=/usr/local/share/kafka/plugins +offset.storage.file.filename=/tmp/connect.offsets +connections.max.idle.ms=5400000 + +# ByteArrayConverter provides a “pass-through” option that does no conversion. 
+key.converter=org.apache.kafka.connect.converters.ByteArrayConverter +value.converter=org.apache.kafka.connect.converters.ByteArrayConverter + +# workers need to use SASL +sasl.mechanism=OAUTHBEARER +sasl.kerberos.service.name=kafka +security.protocol=SASL_SSL +sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler +sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token +sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + clientId="CLIENT_ID" \ + clientSecret="CLIENT_SECRET"; + +# settings with `consumer.` prefixes are passed through to the Kafka consumer +consumer.group.id=GROUP_ID +consumer.auto.offset.reset=earliest +consumer.sasl.mechanism=OAUTHBEARER +consumer.sasl.kerberos.service.name=kafka +consumer.security.protocol=SASL_SSL +consumer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler +consumer.sasl.oauthbearer.token.endpoint.url=https://auth.gcn.nasa.gov/oauth2/token +consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + clientId="CLIENT_ID" \ + clientSecret="CLIENT_SECRET"; diff --git a/broker/consumer/swift/vm_install.sh b/broker/consumer/swift/vm_install.sh new file mode 100755 index 000000000..9db1d65bd --- /dev/null +++ b/broker/consumer/swift/vm_install.sh @@ -0,0 +1,77 @@ +#! /bin/bash +# Installs the software required to run the Kafka Consumer. +# Assumes a Debian 12 OS. 
+ +#--- Get metadata attributes +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +PROJECT_ID=$(curl "${baseurl}/project/project-id" -H "${H}") +consumerVM=$(curl "${baseurl}/instance/name" -H "${H}") +zone=$(curl "${baseurl}/instance/zone" -H "${H}") + +# parse the survey name and testid from the VM name +survey=$(echo "$consumerVM" | awk -F "-" '{print $1}') +if [ "$consumerVM" = "${survey}-consumer" ]; then + testid="False" +else + testid=$(echo "$consumerVM" | awk -F "-" '{print $NF}') +fi + +#--- GCP resources used in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +# use test resources, if requested +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" +fi + +#--- Install general utils +apt-get update +apt-get install -y wget screen software-properties-common snapd +# software-properties-common installs add-apt-repository +# install yq (requires snap) +snap install core +snap install yq + +#--- Install Java and the dev kit +# see https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-on-debian-11 +apt update +echo "Installing Java..." +apt install -y default-jre +apt install -y default-jdk +echo 'JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/bin/java"' >> /etc/environment +# shellcheck source=/dev/null +source /etc/environment +echo "$JAVA_HOME" +echo "Done installing Java." +apt update + +#--- Install Confluent Platform (includes Kafka) +# see https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html +echo "Installing Confluent Platform..." +# install the key used to sign packages +wget -qO - https://packages.confluent.io/deb/7.4/archive.key | sudo apt-key add - +# add the repository +add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.4 stable main" +# install +apt-get update && sudo apt-get install -y confluent-platform +echo "Done installing Confluent Platform." 
+ +#--- Install Kafka -> Pub/Sub connector +# see https://github.com/googleapis/java-pubsub-group-kafka-connector/tree/main +echo "Installing the Kafka -> Pub/Sub connector" +( + plugindir=/usr/local/share/kafka/plugins + CONNECTOR_RELEASE="1.3.2" + mkdir -p ${plugindir} + #- install the connector + cd ${plugindir} + wget https://repo1.maven.org/maven2/com/google/cloud/pubsub-group-kafka-connector/${CONNECTOR_RELEASE}/pubsub-group-kafka-connector-${CONNECTOR_RELEASE}.jar + echo "Done installing the Kafka -> Pub/Sub connector" +) || exit + +#--- Set the startup script and shutdown +startupscript="gs://${broker_bucket}/${survey}/vm_startup.sh" +gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \ + --metadata startup-script-url="$startupscript" +echo "vm_install.sh is complete. Shutting down." +shutdown -h now diff --git a/broker/consumer/swift/vm_shutdown.sh b/broker/consumer/swift/vm_shutdown.sh new file mode 100755 index 000000000..f0aabf913 --- /dev/null +++ b/broker/consumer/swift/vm_shutdown.sh @@ -0,0 +1,11 @@ +#! /bin/bash + +# Get VM name +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +vm_name="$(curl "${baseurl}/instance/name" -H "${H}")" +zone=$(curl "${baseurl}/instance/zone" -H "${H}") + +# Unset FORCE topics in metadata so there's no unexpected behavior on next startup +topics="KAFKA_TOPIC_FORCE=,PS_TOPIC_FORCE=" +gcloud compute instances add-metadata "${vm_name}" --zone="${zone}" --metadata="${topics}" diff --git a/broker/consumer/swift/vm_startup.sh b/broker/consumer/swift/vm_startup.sh new file mode 100755 index 000000000..ee7a2c471 --- /dev/null +++ b/broker/consumer/swift/vm_startup.sh @@ -0,0 +1,111 @@ +#! 
/bin/bash +# Configure and Start the Kafka -> Pub/Sub connector + +brokerdir=/home/broker + +#--- Get project and instance metadata +# for info on working with metadata, see here +# https://cloud.google.com/compute/docs/storing-retrieving-metadata +baseurl="http://metadata.google.internal/computeMetadata/v1" +H="Metadata-Flavor: Google" +PROJECT_ID=$(curl "${baseurl}/project/project-id" -H "${H}") +zone=$(curl "${baseurl}/instance/zone" -H "${H}") +PS_TOPIC_FORCE=$(curl "${baseurl}/instance/attributes/PS_TOPIC_FORCE" -H "${H}") +KAFKA_TOPIC_FORCE=$(curl "${baseurl}/instance/attributes/KAFKA_TOPIC_FORCE" -H "${H}") +# parse the survey name and testid from the VM name +consumerVM=$(curl "${baseurl}/instance/name" -H "${H}") +survey=$(echo "$consumerVM" | awk -F "-" '{print $1}') +if [ "$consumerVM" = "${survey}-consumer" ]; then + testid="False" +else + testid=$(echo "$consumerVM" | awk -F "-" '{print $NF}') +fi + +#--- GCP resources used in this script +broker_bucket="${PROJECT_ID}-${survey}-broker_files" +PS_TOPIC_DEFAULT="${survey}-alerts_raw" +# use test resources, if requested +if [ "$testid" != "False" ]; then + broker_bucket="${broker_bucket}-${testid}" + PS_TOPIC_DEFAULT="${PS_TOPIC_DEFAULT}-${testid}" +fi + +#--- Download config files from GCS +( + # remove all files + rm -r "${brokerdir}" + # download fresh files + mkdir "${brokerdir}" + cd ${brokerdir} + + gsutil -m cp -r "gs://${broker_bucket}/${survey}" . + # wait. otherwise the script may continue before all files are downloaded, with adverse behavior. 
+ sleep 30s +) || exit + +#--- Set the topic names to the "FORCE" metadata attributes if exist, else defaults +KAFKA_TOPIC_DEFAULT="gcn.notices.swift.bat.guano" +KAFKA_TOPIC="${KAFKA_TOPIC_FORCE:-${KAFKA_TOPIC_DEFAULT}}" +PS_TOPIC="${PS_TOPIC_FORCE:-${PS_TOPIC_DEFAULT}}" +# set VM metadata, just for clarity and easy viewing +gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \ + --metadata="PS_TOPIC=${PS_TOPIC},KAFKA_TOPIC=${KAFKA_TOPIC}" + +#--- Files this script will write +workingdir="${brokerdir}/${survey}" +fout_run="${workingdir}/run-connector.out" +fout_topics="${workingdir}/list.topics" + +#--- Set the connector's configs (client ID, client secret, project, and topics) +( + cd "${workingdir}" + + # define Swift-related parameters + client_id="${survey}-${PROJECT_ID}-client-id" + client_secret="${survey}-${PROJECT_ID}-client-secret" + CLIENT_ID=$(gcloud secrets versions access latest --secret="${client_id}") + CLIENT_SECRET=$(gcloud secrets versions access latest --secret="${client_secret}") + group_id="pittgooglebroker" + # use test resources, if requested + if [ "$testid" != "False" ]; then + group_id="${group_id}-${testid}" + fi + + fconfig=admin.properties + sed -i "s/CLIENT_ID/${CLIENT_ID}/g" ${fconfig} + sed -i "s/CLIENT_SECRET/${CLIENT_SECRET}/g" ${fconfig} + + fconfig=psconnect-worker-authenticated.properties + sed -i "s/CLIENT_ID/${CLIENT_ID}/g" ${fconfig} + sed -i "s/CLIENT_SECRET/${CLIENT_SECRET}/g" ${fconfig} && sed -i "s/GROUP_ID/${group_id}/g" ${fconfig} + + fconfig=ps-connector.properties + sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig} + sed -i "s/PS_TOPIC/${PS_TOPIC}/g" ${fconfig} && sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig} +) || exit + +#--- Check until alerts start streaming into the topic +alerts_flowing=false +while [ "${alerts_flowing}" = false ] +do + # get list of topics and dump to file + /bin/kafka-topics \ + --bootstrap-server kafka.gcn.nasa.gov:9092 \ + --list \ + --command-config 
"${workingdir}/admin.properties" \ + > "${fout_topics}" + + # check if our topic is in the list + if grep -Fq "${KAFKA_TOPIC}" "${fout_topics}" + then + alerts_flowing=true # start consuming + else + sleep 60s # sleep 1 min, then try again + fi +done + +#--- Start the Kafka -> Pub/Sub connector, save stdout and stderr to file +/bin/connect-standalone \ + "${workingdir}/psconnect-worker-authenticated.properties" \ + "${workingdir}/ps-connector.properties" \ + &>> "${fout_run}" diff --git a/broker/setup_broker/swift/README.md b/broker/setup_broker/swift/README.md new file mode 100644 index 000000000..4ebd7dff4 --- /dev/null +++ b/broker/setup_broker/swift/README.md @@ -0,0 +1,161 @@ +# Connect Pitt-Google to the Swift/BAT-GUANO Alert Stream + +Updated June 2025 - Author: Christopher Hernández + +- [Overview](#overview) +- [Setup](#setup) +- [Deploy broker instance](#deploy-broker-instance) +- [Ingest the Swift/BAT-GUANO alert stream](#ingest-the-swift-alert-stream) +- [Delete broker instance](#delete-broker-instance) + +## Overview + +Swift is a MidEx class mission operated by NASA in partnership with agencies in Italy and the United Kingdom. BAT +autonomously detects gamma-ray transients, and Swift autonomously begins a sequence of follow-up observations with XRT +and UVOT. All three instruments provide alerts to GCN autonomously upon the detection of transients. +Here are some links which were used as a reference to set this up: + +- [Start streaming GCN Notices quick start quide](https://gcn.nasa.gov/quickstart) +- [Kafka Client Setup using Java](https://gcn.nasa.gov/docs/client#java) + +Below is the code I used to set up the necessary resources in GCP to ingest the Swift/BAT-GUANO alert stream. + +## Setup + +The following assumes that you have: + +- Completed the [GCN Notices quick start guide](https://gcn.nasa.gov/quickstart) and identified your client +credentials. 
This includes a client ID and client secret +- Set the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS` to appropriate values for +your GCP project and service account credentials +- Authenticated the service account to make `gcloud` calls through the project +- [Enabled the Secret Manager API](https://cloud.google.com/secret-manager/docs/configuring-secret-manager#enable_api) +in your Google Cloud Project + +You may want to +[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use) +or +[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment). + +### Use GCP's Secret Manager + +[Secret Manager](https://cloud.google.com/secret-manager/docs/overview) is a service that allows users to manage and +store sensitive data. Use the following code snippet to +[create secrets](https://cloud.google.com/secret-manager/docs/creating-and-accessing-secrets#create) for your client ID +and client secret. This information will be used to deploy a broker instance with your client credentials. + +```bash +# define parameters +survey="swift" +PROJECT_ID=$GOOGLE_CLOUD_PROJECT + +# define secret names +client_id="${survey}-${PROJECT_ID}-client-id" +client_secret="${survey}-${PROJECT_ID}-client-secret" + +# create secret(s) +gcloud secrets create "${client_id}" \ + --replication-policy="automatic" +gcloud secrets create "${client_secret}" \ + --replication-policy="automatic" +``` + +Select one of the following options to add a secret version. Note that adding a version directly on the command line is +discouraged by Google Cloud, see +[add a secret version documentation](https://cloud.google.com/secret-manager/docs/add-secret-version#add-secret-version) +for details. 
+ +```bash +# add a secret version from the contents of a file on disk +gcloud secrets versions add "${client_id}" --data-file="/path/to/file.txt" +gcloud secrets versions add "${client_secret}" --data-file="/path/to/file.txt" + +# add a secret version directly on the command line +echo -n "enter the client id provided by GCN" | \ + gcloud secrets versions add "${client_id}" --data-file=- +echo -n "enter the client secret provided by GCN" | \ + gcloud secrets versions add "${client_secret}" --data-file=- +``` + +Access the [IAM & Admin page](https://console.cloud.google.com/iam-admin) and grant the default compute service account +the role of `Secret Manager Secret Accessor`. + +```bash +user="[enter compute service account prefix]@developer.gserviceaccount.com" +roleid="roles/secretmanager.secretAccessor" +gcloud secrets add-iam-policy-binding "${client_id}" --member="serviceAccount:${user}" --role="${roleid}" +gcloud secrets add-iam-policy-binding "${client_secret}" --member="serviceAccount:${user}" --role="${roleid}" +``` + +### BigQuery subscriptions + +Our broker uses [BigQuery subscriptions](https://cloud.google.com/pubsub/docs/bigquery) to write alert data directly to +a BigQuery table. Messages received by BigQuery subscriptions that fail to write data to BigQuery are negatively +acknowledged and re-sent. If the messages fail enough times (default number of attempts = 5), then the message is +subsequently moved to a [dead letter topic](https://cloud.google.com/pubsub/docs/handling-failures#dead_letter_topic). +A subscription to this dead letter topic is automatically made, allowing the user to identify which messages failed to +write data to BigQuery and why the write operation failed. 
+ +## Deploy broker instance + +Clone the repo and cd into the directory: + +```bash +git clone https://github.com/mwvgroup/Pitt-Google-Broker.git +cd Pitt-Google-Broker/broker/setup_broker/swift +``` + +Initialize parameters and call the deployment script: + +```bash +testid="mytest" +teardown="False" +survey="swift" +schema_version="4.5.0" +region="us-central1" + +./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${schema_version}" "${region}" +``` + +This will create all of the necessary GCP resources. Allow the consumer VM to finish its installation process. Once +complete, the VM will shut down automatically. You can check the status of the VM in the +[Google Cloud Console](https://console.cloud.google.com/compute). +This entire process should take less than 10 minutes. + +## Start the Consumer VM to ingest the Swift/BAT-GUANO alert stream + +```bash +zone="${region}-a" +consumerVM="${survey}-consumer-${testid}" + +# Set the VM metadata +KAFKA_TOPIC="gcn.notices.swift.bat.guano" +PS_TOPIC="${survey}-alerts-${testid}" +gcloud compute instances add-metadata "${consumerVM}" --zone "$zone" \ + --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}" + +# Start the VM +gcloud compute instances start "${consumerVM}" --zone ${zone} +# this launches the startup script which configures and starts the +# Kafka -> Pub/Sub connector +``` + +You can also stop the VM by executing: + +```bash +gcloud compute instances stop "${consumerVM}" --zone="${zone}" +``` + +## Delete broker instance + +Similar to [deploy broker instance](#deploy-broker-instance). 
Initialize parameters and call the deployment script:
+
+```bash
+testid="mytest"
+teardown="True"
+survey="swift"
+schema_version="4.5.0"
+region="us-central1"
+
+./setup_broker.sh "${testid}" "${teardown}" "${survey}" "${schema_version}" "${region}"
+```
diff --git a/broker/setup_broker/swift/create_vm.sh b/broker/setup_broker/swift/create_vm.sh
new file mode 100755
index 000000000..3cdc7499f
--- /dev/null
+++ b/broker/setup_broker/swift/create_vm.sh
@@ -0,0 +1,47 @@
+#! /bin/bash
+# Creates or deletes the GCP VM instances needed by the broker.
+# This script will not delete VMs that are in production
+
+# name of GCS bucket where broker files are staged
+gcs_broker_bucket=$1
+# "False" uses production resources
+# any other string will be appended to the names of all resources
+testid="${2:-test}"
+# "True" tearsdown/deletes resources, else setup
+teardown="${3:-False}"
+# name of the survey this broker instance will ingest
+survey="${4:-swift}"
+zone="${5:-us-central1-a}"
+
+#--- GCP resources used in this script
+consumerVM="${survey}-consumer"
+# use test resources, if requested
+if [ "$testid" != "False" ]; then
+    consumerVM="${consumerVM}-${testid}"
+fi
+
+#--- Teardown resources
+if [ "$teardown" = "True" ]; then
+    # ensure that we do not teardown production resources
+    if [ "$testid" != "False" ]; then
+        gcloud compute instances delete "$consumerVM" --zone="$zone"
+    fi
+#--- Setup resources if they do not exist
+else
+    if !
gcloud compute instances describe "${consumerVM}" --zone="${zone}" --project="${PROJECT_ID}" >/dev/null 2>&1; then + machinetype=e2-custom-1-5632 + # metadata + googlelogging="google-logging-enabled=true" + startupscript="startup-script-url=gs://${gcs_broker_bucket}/${survey}/vm_install.sh" + shutdownscript="shutdown-script-url=gs://${gcs_broker_bucket}/${survey}/vm_shutdown.sh" + #--- Create VM + gcloud compute instances create "$consumerVM" \ + --zone="$zone" \ + --machine-type="$machinetype" \ + --scopes=cloud-platform \ + --metadata="${googlelogging},${startupscript},${shutdownscript}" + else + echo + echo "VM instance ${consumerVM} already exists in zone ${zone}." + fi +fi diff --git a/broker/setup_broker/swift/setup_broker.sh b/broker/setup_broker/swift/setup_broker.sh new file mode 100755 index 000000000..879a40556 --- /dev/null +++ b/broker/setup_broker/swift/setup_broker.sh @@ -0,0 +1,201 @@ +#! /bin/bash +# Create and configure GCP resources needed to run the broker. + +# "False" uses production resources +# any other string will be appended to the names of all resources +testid="${1:-test}" +# "True" tearsdown/deletes resources, else setup +teardown="${2:-False}" +# name of the survey this broker instance will ingest +survey="${3:-swift}" +schema_version="${4:-4.5.0}" +versiontag=v$(echo "${schema_version}" | tr . _) # 1.0.0 -> v1_0_0 +region="${5:-us-central1}" +zone="${region}-a" # just use zone "a" instead of adding another script arg +# get the environment variables +PROJECT_ID=$GOOGLE_CLOUD_PROJECT +PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)") + +#--- Make the user confirm the settings +echo +echo "setup_broker.sh will run with the following configs: " +echo +echo "GOOGLE_CLOUD_PROJECT = ${PROJECT_ID}" +echo "survey = ${survey}" +echo "testid = ${testid}" +echo "schema_version = ${schema_version}" +echo "teardown = ${teardown}" +echo +echo "Continue? 
[y/(n)]: " + +read -r continue_with_setup +continue_with_setup="${continue_with_setup:-n}" +if [ "$continue_with_setup" != "y" ]; then + echo "Exiting setup." + echo + exit +fi + +define_GCP_resources() { + local base_name="$1" + local separator="${2:--}" + local testid_suffix="" + + if [ "$testid" != "False" ] && [ -n "$testid" ]; then + testid_suffix="${separator}${testid}" + fi + echo "${base_name}${testid_suffix}" +} + +#--- GCP resources used directly in this script +artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services") +bq_dataset=$(define_GCP_resources "${survey}" "_") +bq_table_alerts="alerts_${versiontag}" +gcs_alerts_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts") +gcs_broker_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}-broker_files") +ps_topic_alerts_raw=$(define_GCP_resources "${survey}-alerts_raw") +ps_topic_alerts=$(define_GCP_resources "${survey}-alerts") +ps_subscription_reservoir=$(define_GCP_resources "${survey}-alerts-reservoir") +# topics and subscriptions involved in writing alert data to BigQuery +ps_bigquery_subscription=$(define_GCP_resources "${survey}-bigquery-import-${versiontag}") +ps_deadletter_subscription=$(define_GCP_resources "${survey}-deadletter") +ps_deadletter_topic="${ps_deadletter_subscription}" + +# function used to create (or delete) GCP resources +manage_resources() { + local mode="$1" # setup or teardown + local environment_type="production" + + if [ "$testid" != "False" ]; then + environment_type="testing" + fi + + if [ "$mode" = "setup" ]; then + #--- Create BigQuery dataset and table + echo + echo "Creating BigQuery dataset and table..." + if ! 
bq ls "${PROJECT_ID}:${bq_dataset}" >/dev/null 2>&1; then + bq --location="${region}" mk --dataset "${bq_dataset}" + # grant public access to the dataset; for more information, see: + # https://cloud.google.com/bigquery/docs/control-access-to-resources-iam#grant_access_to_a_dataset + (cd templates && bq update --source "bq_${survey}_policy.json" "${PROJECT_ID}:${bq_dataset}") || exit 5 + else + echo "${bq_dataset} already exists." + fi + (cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}" "bq_${survey}_${bq_table_alerts}_schema.json") || exit 5 + bq update --description "Swift/BAT-GUANO alerts with schema version v${schema_version}. This table is an archive of the swift-alerts Pub/Sub stream. It has the same schema as the original alert bytes, including repeated fields. Original alerts can be retrieved from the Cloud Storage bucket ${gcs_alerts_bucket}." "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}" + + #--- Create GCS bucket + echo + echo "Creating broker_bucket and uploading files..." + if ! gsutil ls -b "gs://${gcs_broker_bucket}" >/dev/null 2>&1; then + gsutil mb -b on -l "${region}" "gs://${gcs_broker_bucket}" + else + echo "${gcs_broker_bucket} already exists." + fi + ./upload_broker_bucket.sh "${gcs_broker_bucket}" + + #--- Assign IAM roles to the Pub/Sub service account + echo + echo "Assigning IAM roles to the Pub/Sub service account..." + roleid="roles/bigquery.dataEditor" + service_account="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com" + gcloud projects add-iam-policy-binding "${PROJECT_ID}" \ + --member="serviceAccount:${service_account}" \ + --role="${roleid}" + + #--- Create Pub/Sub topics and subscriptions + echo + echo "Configuring Pub/Sub resources..." 
+ gcloud pubsub topics create "${ps_topic_alerts}" + gcloud pubsub topics create "${ps_topic_alerts_raw}" + gcloud pubsub topics create "${ps_deadletter_topic}" + gcloud pubsub subscriptions create "${ps_deadletter_subscription}" \ + --topic="${ps_deadletter_topic}" + gcloud pubsub subscriptions create "${ps_subscription_reservoir}" \ + --topic="${ps_topic_alerts}" + # create subscription to load alerts to BigQuery + gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \ + --topic="${ps_topic_alerts}" \ + --bigquery-table="${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}" \ + --use-table-schema \ + --drop-unknown-fields \ + --dead-letter-topic="${ps_deadletter_topic}" \ + --max-delivery-attempts=5 \ + --dead-letter-topic-project="${PROJECT_ID}" \ + --message-transforms-file=templates/smt_add_top_level_fields.yaml + + # set IAM policies on public Pub/Sub resources + if [ "$testid" = "False" ]; then + user="allUsers" + roleid="roles/pubsub.subscriber" + gcloud pubsub topics add-iam-policy-binding "${ps_topic_alerts}" --member="${user}" --role="${roleid}" + gcloud pubsub topics add-iam-policy-binding "${ps_deadletter_topic}" \ + --member="serviceAccount:${service_account}" \ + --role="roles/pubsub.publisher" + gcloud pubsub subscriptions add-iam-policy-binding "${ps_bigquery_subscription}" \ + --member="serviceAccount:${service_account}" \ + --role="roles/pubsub.subscriber" + fi + + #--- Create Artifact Registry Repository + echo + echo "Configuring Artifact Registry..." + gcloud artifacts repositories create "${artifact_registry_repo}" --repository-format=docker \ + --location="${region}" --description="Docker repository for Cloud Run services" \ + --project="${PROJECT_ID}" + else + if [ "$environment_type" = "testing" ]; then + # delete testing resources + # Note: create_vm.sh will delete the VM instance + echo + echo "Deleting broker_bucket..." 
+ o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs + gsutil -m -o "${o}" rm -r "gs://${gcs_broker_bucket}" + #--- Delete BigQuery dataset and Pub/Sub topics/subscriptions + echo + echo "Configuring BigQuery and Pub/Sub resources..." + bq rm -r -f "${PROJECT_ID}:${bq_dataset}" + gcloud pubsub topics delete "${ps_topic_alerts}" + gcloud pubsub topics delete "${ps_topic_alerts_raw}" + gcloud pubsub topics delete "${ps_deadletter_topic}" + gcloud pubsub subscriptions delete "${ps_subscription_reservoir}" + gcloud pubsub subscriptions delete "${ps_deadletter_subscription}" + gcloud pubsub subscriptions delete "${ps_bigquery_subscription}" + echo + gcloud artifacts repositories delete "${artifact_registry_repo}" --location="${region}" + else + echo 'ERROR: No testid supplied.' + echo 'To avoid accidents, this script will not delete production resources.' + echo 'If that is your intention, you must delete them manually.' + echo 'Otherwise, please supply a testid.' + exit 1 + fi + fi +} + +#--- Create (or delete) BigQuery, GCS, Pub/Sub resources +echo +echo "Configuring BigQuery, GCS, Pub/Sub resources..." +if [ "$teardown" = "True" ]; then + manage_resources "teardown" +else + manage_resources "setup" +fi + +#--- Create (or delete) VM instance +echo +echo "Configuring VM..." +./create_vm.sh "${gcs_broker_bucket}" "${testid}" "${teardown}" "${survey}" "${zone}" + +#--- Create (or delete) Cloud Run services +echo +echo "Configuring Cloud Run services..." +( + # navigate to the Cloud Run directory for Swift/BAT-GUANO + cd .. && cd .. 
&& cd cloud_run && cd swift + + #--- alerts-to-storage Cloud Run service + cd ps_to_storage + ./deploy.sh "${testid}" "${teardown}" "${survey}" "${region}" "${versiontag}" +) || exit diff --git a/broker/setup_broker/swift/templates/bq_swift_alerts_v4_5_0_schema.json b/broker/setup_broker/swift/templates/bq_swift_alerts_v4_5_0_schema.json new file mode 100644 index 000000000..1a481ce6e --- /dev/null +++ b/broker/setup_broker/swift/templates/bq_swift_alerts_v4_5_0_schema.json @@ -0,0 +1,278 @@ +[ + { + "description": "The schema version of the alert.", + "mode": "REQUIRED", + "name": "schema_version", + "type": "STRING" + }, + { + "name": "follow_up_event", + "type": "STRING", + "mode": "NULLABLE", + "description": "Name or trigger time of the external trigger that launched the search." + }, + { + "name": "follow_up_type", + "type": "STRING", + "mode": "NULLABLE", + "description": "Type of external trigger that launched the search, eg GW, neutrino, etc." + }, + { + "name": "alert_datetime", + "type": "STRING", + "mode": "NULLABLE", + "description": "Date and time of notice creation [UTC, ISO 8601], ex YYYY-MM-DDTHH:MM:SS.ssssssZ." + }, + { + "name": "alert_tense", + "type": "STRING", + "mode": "NULLABLE", + "description": "Indication of whether this alert refers to a recent observation (current), re-analysis of archival data (archival), a planned observation in the future (planned), a signal injection (injection), commanded trigger (commanded) or a test trigger (test)." + }, + { + "name": "alert_type", + "type": "STRING", + "mode": "NULLABLE", + "description": "Indication of alert sequence if multiple of the same type are sent. Alert sequence: initial refers to the detection, followed by subsequent alerts for recurrent detections of the same source. Updates and retractions come from further analysis." 
+ }, + { + "name": "event_name", + "type": "STRING", + "mode": "REPEATED", + "description": "Name of the event (ex: GRB 170817A) or multiple names of the same event (ex: GRB 170817A, GW170817, AT2017gfo, SSS 17a)." + }, + { + "name": "id", + "type": "STRING", + "mode": "REPEATED", + "description": "Instrument-specific trigger ID (ex: bn230313485 (Fermi), 1159327 (Swift)) or alternate ID" + }, + { + "name": "data_archive_page", + "type": "STRING", + "mode": "NULLABLE", + "description": "URL of archived data files." + }, + { + "name": "trigger_time", + "type": "STRING", + "mode": "NULLABLE", + "description": "Time of the trigger [ISO 8601], ex YYYY-MM-DDTHH:MM:SS.ssssssZ." + }, + { + "name": "trigger_time_error", + "type": "FLOAT", + "mode": "REPEATED", + "description": "Trigger time uncertainty [s, 1-sigma], with symmetric uncertainty." + }, + { + "name": "observation_start", + "type": "STRING", + "mode": "NULLABLE", + "description": "Start time of the observation [ISO 8601]." + }, + { + "name": "observation_stop", + "type": "STRING", + "mode": "NULLABLE", + "description": "Stop time of the observation [ISO 8601]." + }, + { + "name": "observation_livetime", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "Livetime of the observation [s]." + }, + { + "name": "ra", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "ICRS right ascension [deg], utilizes the J2000 epoch and an equatorial coordinate system." + }, + { + "name": "dec", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "ICRS declination [deg], utilizes the J2000 epoch and an equatorial coordinate system." 
+ }, + { + "name": "ra_dec_error", + "type": "FLOAT", + "mode": "REPEATED", + "description": "An array of up to three values that describe the localization region as an ellipse: length of the semi-major axis, length of the semi-minor axis (default is the same as the semi-major axis), and astronomical position angle of the semi-major axis (measured from North through East, default is zero)." + }, + { + "name": "containment_probability", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "Containment probability [dimensionless, 0-1]; if absent, default is 0.9." + }, + { + "name": "systematic_included", + "type": "BOOLEAN", + "mode": "NULLABLE", + "description": "Contains true when the systematic error is included and false when the systematic error is not included." + }, + { + "name": "instrument_phi", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "Instrument phi [deg]." + }, + { + "name": "instrument_theta", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "Instrument theta [deg]." + }, + { + "name": "instrument_semimajor_angle", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "Position angle of semi-major axis in instrument coordinates [deg]." + }, + { + "name": "healpix_url", + "type": "STRING", + "mode": "NULLABLE", + "description": "URL of HEALPix localization probability file." + }, + { + "name": "healpix_file", + "type": "STRING", + "mode": "NULLABLE", + "description": "Base 64 encoded content of a FITS file." + }, + { + "name": "far", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "False alarm rate: the rate of occurrence of non-astrophysical events that are of the same intensity or significance as the current event [Hz]." + }, + { + "name": "trigger_type", + "type": "STRING", + "mode": "NULLABLE", + "description": "Type of trigger algorithm used to identify the transient event." 
+  },
+  {
+    "name": "net_count_rate",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Net count rate of the transient above the background [counts/s] over rate_duration and rate_energy_range. Do specify rate_duration and rate_energy_range with net_count_rate property."
+  },
+  {
+    "name": "background_count_rate",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Count rate of the background during the transient [counts/s] over same rate_duration and rate_energy_range used for net_count_rate."
+  },
+  {
+    "name": "rate_snr",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Rate signal to noise ratio [dimensionless]."
+  },
+  {
+    "name": "rate_duration",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Interval over rate signal to noise ratio calculation [s]."
+  },
+  {
+    "name": "rate_energy_range",
+    "type": "FLOAT",
+    "mode": "REPEATED",
+    "description": "Low and High energy bounds used in rate signal to noise ratio calculation, if not parsed in Reporter.schema.json, default unit is keV."
+  },
+  {
+    "name": "image_snr",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Image signal to noise ratio [dimensionless]."
+  },
+  {
+    "name": "image_duration",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Interval over image signal to noise ratio calculation [s]."
+  },
+  {
+    "name": "image_energy_range",
+    "type": "FLOAT",
+    "mode": "REPEATED",
+    "description": "Low and High energy bounds used in image signal to noise ratio calculation, if not parsed in Reporter.schema.json, default unit is keV."
+  },
+  {
+    "name": "p_astro",
+    "type": "FLOAT",
+    "mode": "NULLABLE",
+    "description": "Probability [dimensionless, 0-1] that source is of astrophysical origin."
+  },
+  {
+    "name": "classification",
+    "type": "STRING",
+    "mode": "NULLABLE",
+    "description": "Dictionary mapping mutually exclusive source classes to probabilities between 0 and 1, the sum of all values must be 1. e.g.
({'BNS', 0.9}, {'NSBH', 0.05}, {'BBH', 0.05})." + }, + { + "name": "properties", + "type": "STRING", + "mode": "NULLABLE", + "description": "Dictionary of binary classifiers, each entry is between 0 and 1. e.g. ({'NS', 0.95}, {'REMNANT', 0.3})." + }, + { + "name": "mission", + "type": "STRING", + "mode": "NULLABLE", + "description": "Name of Mission or Telescope reporting the event." + }, + { + "name": "instrument", + "type": "STRING", + "mode": "NULLABLE", + "description": "Name of the Instrument reporting the event." + }, + { + "name": "record_number", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "Incremental number for messages from the instrument during a given trigger (ex: 1, 2, 3)." + }, + { + "name": "messenger", + "type": "STRING", + "mode": "NULLABLE", + "description": "Messenger of report; EM, GW or Neutrino." + }, + { + "name": "spectral_band", + "type": "FLOAT", + "mode": "REPEATED", + "description": "Observed spectral band, expressed in the specified 'units' field." + }, + { + "name": "units", + "type": "STRING", + "mode": "NULLABLE", + "description": "Units for the spectral data; default unit is keV." + }, + { + "name": "filter", + "type": "STRING", + "mode": "REPEATED", + "description": "Optional filter name, as used in optical observations." + }, + { + "name": "additional_info", + "type": "STRING", + "mode": "NULLABLE", + "description": "Additional information about the event." 
+ }, + { + "description": "Kafka timestamp from originating Swift alert.", + "mode": "REQUIRED", + "name": "kafkaPublishTimestamp", + "type": "TIMESTAMP" + } +] diff --git a/broker/setup_broker/swift/templates/bq_swift_policy.json b/broker/setup_broker/swift/templates/bq_swift_policy.json new file mode 100644 index 000000000..10732959b --- /dev/null +++ b/broker/setup_broker/swift/templates/bq_swift_policy.json @@ -0,0 +1,20 @@ +{ + "access": [ + { + "role": "WRITER", + "specialGroup": "projectWriters" + }, + { + "role": "OWNER", + "specialGroup": "projectOwners" + }, + { + "iamMember": "allUsers", + "role": "READER" + }, + { + "role": "READER", + "specialGroup": "projectReaders" + } + ] +} diff --git a/broker/setup_broker/swift/templates/smt_add_top_level_fields.yaml b/broker/setup_broker/swift/templates/smt_add_top_level_fields.yaml new file mode 100644 index 000000000..5e82aede7 --- /dev/null +++ b/broker/setup_broker/swift/templates/smt_add_top_level_fields.yaml @@ -0,0 +1,36 @@ +# https://cloud.google.com/pubsub/docs/smts/create-topic-smt#create +- javascriptUdf: + code: > + function addTopLevelFields(message, metadata) { + const attrs = message.attributes || {}; + const dataStr = message.data.toString(); + + // Create an empty object to hold the new fields we want to inject into the JSON payload + const newFields = {}; + + // Extract the following attributes and add them to newFields + // We avoid casting fields as JavaScript numbers to prevent precision loss + if (attrs["kafka.timestamp"]) { + newFields.kafkaPublishTimestamp = attrs["kafka.timestamp"] * 1000; + } + + // Define the data as a set of key-value pairs to be added to the JSON payload + const newPairs = Object.entries(newFields) + .map(([k, v]) => `"${k}":${v}`); + + if (newPairs.length === 0) { + // No new fields; return the original message + return message; + } + + // Inject the new fields into the JSON payload + const newData = dataStr.endsWith("}") + ? 
dataStr.slice(0, -1) + "," + newPairs.join(",") + "}" + : dataStr; + + return { + data: newData, + attributes: attrs + }; + } + functionName: addTopLevelFields diff --git a/broker/setup_broker/swift/upload_broker_bucket.sh b/broker/setup_broker/swift/upload_broker_bucket.sh new file mode 100755 index 000000000..8377a4556 --- /dev/null +++ b/broker/setup_broker/swift/upload_broker_bucket.sh @@ -0,0 +1,8 @@ +#! /bin/bash + +gcs_broker_bucket=$1 # name of GCS bucket where broker files should be staged + +echo +echo "Uploading broker files to GCS..." +o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs +gsutil -m -o "${o}" cp -r ../../consumer/swift "gs://${gcs_broker_bucket}"