Skip to content

Commit 90b2a60

Browse files
authored
Merge pull request #34 from mwvgroup/tjraen/gcs2bq
tjraen/gcs2bq
2 parents 9b2d306 + a1a7fd2 commit 90b2a60

File tree

11 files changed

+538
-38
lines changed

11 files changed

+538
-38
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: UTF-8 -*-
3+
4+
"""This module is intended to be deployed as a Google Cloud Function so that it
5+
listens to a Google Cloud Storage (GCS) bucket. When a new file is detected in
6+
the bucket (Avro file format expected), it will automatically load it into a
7+
BigQuery (BQ) table and publish a message to PubSub (PS). This code borrows
8+
heavily from
9+
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro.
10+
11+
Usage Example
12+
-------------
13+
14+
First, check that the buckets (GCS), datasets (BQ), tables (BQ), and topics (PS)
15+
referenced in the ``bucket_resources`` dictionary (below) point to the
16+
appropriate Google Cloud Platform (GCP) resources. (These should have been
17+
initialized during the GCP setup, see
18+
https://pitt-broker.readthedocs.io/en/latest/installation.html#setting-up-gcp.)
19+
Buckets and datasets must exist (with appropriate permissions) prior to
20+
invoking this module. Tables are created automatically and on-the-fly if they
21+
don't already exist.
22+
23+
Deploy the ``stream_GCS_to_BQ`` function by running the following command in
24+
the directory where this module is located. Be sure to replace
25+
``<YOUR_TRIGGER_BUCKET_NAME>`` with the name of the GCS bucket that this
26+
function should listen to. For more information, see
27+
https://cloud.google.com/functions/docs/calling/storage.
28+
29+
.. code-block:: bash
30+
:linenos:
31+
32+
gcloud functions deploy stream_GCS_to_BQ --runtime python37 --set-env-vars
33+
GOOGLE_CLOUD_PROJECT=${GOOGLE_CLOUD_PROJECT} --trigger-resource
34+
<YOUR_TRIGGER_BUCKET_NAME> --trigger-event google.storage.object.finalize
35+
36+
The script ``broker/deploy_cloudfnc.sh`` automates the deployment.
37+
38+
Module Documentation
39+
--------------------
40+
"""
41+
42+
import logging
import os
from typing import Optional

from google.cloud import bigquery
from google.cloud import pubsub
from google.cloud.pubsub_v1.publisher.futures import Future
48+
# Module-level logger and client are created once at cold start and then
# reused across Cloud Function invocations.
log = logging.getLogger(__name__)
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
BQ = bigquery.Client()

# The bucket_resources dictionary determines which BQ table the alert data
# will be uploaded to, based on which GCS bucket the alert Avro file is
# stored in, plus the PubSub topic notified after a successful upload.
ztf_bucket = '_'.join([PROJECT_ID, 'ztf_alert_avro_bucket'])
testing_bucket = '_'.join([PROJECT_ID, 'testing_bucket'])
bucket_resources = {
    ztf_bucket: {
        'BQ_DATASET': 'ztf_alerts',
        'BQ_TABLE': 'alerts',
        'PS_TOPIC': 'ztf_alerts_in_BQ'},
    testing_bucket: {
        'BQ_DATASET': 'testing_dataset',
        'BQ_TABLE': 'test_GCS_to_BQ',
        'PS_TOPIC': 'test_alerts_in_BQ'},
}
67+
68+
def stream_GCS_to_BQ(data: dict, context: dict) -> Optional[str]:
    """Load a newly finalized GCS Avro file into BigQuery.

    Executed whenever a file is added to Cloud Storage (deployed with the
    ``google.storage.object.finalize`` trigger). Most of this function is
    taken from
    https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro

    Args:
        data: Event payload; must contain the ``bucket`` and ``name`` keys
            identifying the finalized file.
        context: Event metadata (unused).

    Returns:
        None on success (a PubSub message is also published), otherwise a
        string describing the error. (The original annotation ``-> str`` was
        wrong: the success path returns ``load_job.error_result``, which is
        None when the load succeeds.)
    """

    bucket_name = data['bucket']
    file_name = data['name']

    # Configure an append-only Avro load job
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.source_format = bigquery.SourceFormat.AVRO
    uri = f'gs://{bucket_name}/{file_name}'

    try:
        BQ_TABLE_ID = get_BQ_TABLE_ID(bucket_name)
    except KeyError as e:
        # The triggering bucket has no entry in ``bucket_resources``
        msg = (f'GCS bucket {e} does not have an associated BigQuery dataset '
               f'configured for the `stream_GCS_to_BQ` Cloud Function. '
               f'Data in {file_name} cannot be uploaded to BigQuery.')
        log.error(msg)
        return f'GCS bucket {e} not configured'  # used in testing

    # API request
    load_job = BQ.load_table_from_uri(uri, BQ_TABLE_ID, job_config=job_config)
    msg = (f'Starting stream_GCS_to_BQ job {load_job.job_id} | '
           f'file name: {file_name} | '
           f'GCS Bucket: {bucket_name} | '
           f'BQ Table ID: {BQ_TABLE_ID}'
           )
    log.info(msg)

    # Run the job: start it, wait for it to complete, get the result
    load_job.result()
    error_result = load_job.error_result

    # Publish PubSub message if BQ upload was successful
    if error_result is None:
        topic = bucket_resources[bucket_name]['PS_TOPIC']
        publish_pubsub(topic, file_name)

    return error_result
109+
110+
111+
def get_BQ_TABLE_ID(bucket_name: str) -> str:
    """Return the fully-qualified BigQuery table ID for a GCS bucket.

    The ID has the form ``<project>.<dataset>.<table>`` as configured in the
    module-level ``bucket_resources`` mapping.

    Raises:
        KeyError: If ``bucket_name`` has no entry in ``bucket_resources``.
    """

    resources = bucket_resources[bucket_name]
    return '.'.join(
        [PROJECT_ID, resources['BQ_DATASET'], resources['BQ_TABLE']])
120+
121+
122+
def publish_pubsub(topic: str, message: str) -> str:
    """Publish a PubSub alert and block until it is acknowledged.

    Args:
        topic: Name of the PubSub topic to publish to
        message: The message to publish

    Returns:
        The Id of the published message
    """

    # Configure PubSub topic
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, topic)

    # Publish
    log.debug(f'Publishing message: {message}')
    message_data = message.encode('UTF-8')
    future = publisher.publish(topic_path, data=message_data)

    # ``future.result()`` blocks until the server acknowledges the message
    # and returns the server-assigned message ID string — NOT the Future
    # itself, so the original ``-> Future`` annotation was incorrect.
    return future.result()
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# As explained here
2+
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
3+
# dependencies for a Cloud Function must be specified in a `requirements.txt`
4+
# file (or packaged with the function) in the same directory as `main.py`
5+
# which contains the `stream_GCS_to_BQ()` function.
6+
7+
google-cloud-bigquery
8+
google-cloud-pubsub

broker/alert_ingestion/consume.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
# Create a GCS consumer object
2525
c = consume.GCSKafkaConsumer(
2626
kafka_config=config,
27-
bucket_name='my-gcs-bucket-name',
27+
bucket_name='<PROJECT_ID>_ztf_alert_avro_bucket',
2828
kafka_topic='my_kafka_topic_name',
29-
pubsub_topic='my-gcs-pubsub-name',
29+
pubsub_topic='ztf_alert_avro_in_bucket',
3030
debug=True # Use debug to run without updating your kafka offset
3131
)
3232
@@ -331,7 +331,6 @@ def guess_schema_version(alert_bytes: bytes) -> str:
331331

332332
return version_match.group(2).decode()
333333

334-
335334
def guess_schema_survey(alert_bytes: bytes) -> str:
336335
"""Retrieve the ZTF schema version
337336

broker/alert_ingestion/gen_valid_schema.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
from pathlib import Path
3636
import logging
37-
from typing import Tuple, BinaryIO, Union
37+
from typing import Tuple, List, BinaryIO, Union
3838
import pickle
3939
import json
4040
import fastavro
@@ -144,7 +144,7 @@ def _reverse_types(field: dict) -> dict:
144144
return field
145145

146146

147-
def _load_Avro(fin: Union[Path, BinaryIO]) -> Tuple[dict, dict]:
147+
def _load_Avro(fin: Union[Path, BinaryIO]) -> Tuple[dict, List[dict]]:
148148
"""
149149
Args:
150150
fin (str or file-like) : Path to, or file-like object representing,

broker/deploy_cloudfnc.sh

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/bin/sh

###
# This script deploys the ``stream_GCS_to_BQ`` function in
# ``broker/alert_ingestion/GCS_to_BQ/main.py`` as a Google Cloud Function
# so that it listens for new Avro files added to a Google Cloud Storage bucket
# and uploads them to a BigQuery table.
#
# Note that the `GOOGLE_CLOUD_PROJECT` environment variable must be set
# explicitly within the `gcloud` command.
###

# Resolve the function directory relative to this script's own location so
# the deployment works regardless of the caller's working directory.
# (The original `cd broker/alert_ingestion/GCS_to_BQ` silently failed unless
# invoked from the repository root.)
cd "$(dirname "$0")/alert_ingestion/GCS_to_BQ" || exit 1

# deploy stream_GCS_to_BQ() to listen to the ztf_alert_avro_bucket
bucket="${GOOGLE_CLOUD_PROJECT}_ztf_alert_avro_bucket"
gcloud functions deploy stream_GCS_to_BQ --runtime python37 --set-env-vars GOOGLE_CLOUD_PROJECT=${GOOGLE_CLOUD_PROJECT} --trigger-resource ${bucket} --trigger-event google.storage.object.finalize

broker/gcp_setup.py

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,46 +28,100 @@
2828
"""
2929

3030
import os
31+
from pathlib import Path
3132

3233
if not os.getenv('GPB_OFFLINE', False):
3334
from google.api_core.exceptions import NotFound
34-
from google.cloud import bigquery, logging, storage
35+
from google.cloud import bigquery, pubsub, logging, storage
3536

36-
_tables = ('alert', 'candidate')
37+
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
38+
39+
_tables = ('alerts', 'test_GCS_to_BQ')
3740

3841

3942
def setup_big_query() -> None:
    """Create the necessary Big Query datasets if they do not already exist

    New datasets include:
        ``ztf_alerts``
        ``testing_dataset``
    """

    client = bigquery.Client()
    # ``exists_ok=True`` makes the call idempotent across repeated setups
    for dataset_name in ('ztf_alerts', 'testing_dataset'):
        client.create_dataset(dataset_name, exists_ok=True)

4954

5055
def setup_buckets() -> None:
    """Create new storage buckets and upload testing files.
    Files are expected to reside in the ``tests/test_alerts`` directory.

    New buckets [files] include:
        ``<PROJECT_ID>_ztf_alert_avro_bucket``
        ``<PROJECT_ID>_testing_bucket`` [``ztf_3.3_validschema_1154446891615015011.avro``]
    """

    buckets = {  # '<bucket name>': ['file name',]
        f'{PROJECT_ID}_ztf_alert_avro_bucket': [],
        f'{PROJECT_ID}_testing_bucket':
            ['ztf_3.3_validschema_1154446891615015011.avro']
    }

    storage_client = storage.Client()

    for bucket_name, files in buckets.items():
        # Create buckets if they do not exist. Keep the bucket handle so it
        # is not re-fetched from the API once per file below (the original
        # called ``get_bucket`` inside the upload loop).
        try:
            bucket = storage_client.get_bucket(bucket_name)
        except NotFound:
            bucket = storage_client.create_bucket(bucket_name)

        # Upload any files
        for filename in files:
            blob = bucket.blob(filename)
            inpath = Path('tests/test_alerts') / filename
            with inpath.open('rb') as infile:
                blob.upload_from_file(infile)
86+
87+
88+
def setup_pubsub() -> None:
    """ Create new Pub/Sub topics and subscriptions

    New topics [subscriptions] include:
        ``ztf_alert_avro_in_bucket``
        ``ztf_alerts_in_BQ``
        ``test_alerts_in_BQ``
        ``test_alerts_PS_publish`` [``test_alerts_PS_subscribe``]
    """

    # '<topic_name>': ['<subscription_name>', ]
    topics = {
        'ztf_alert_avro_in_bucket': [],
        'ztf_alerts_in_BQ': [],
        'test_alerts_in_BQ': [],
        'test_alerts_PS_publish': ['test_alerts_PS_subscribe'],
    }

    publisher = pubsub.PublisherClient()
    subscriber = pubsub.SubscriberClient()

    for topic_name, sub_names in topics.items():
        topic_path = publisher.topic_path(PROJECT_ID, topic_name)

        # Create the topic only if it is missing
        try:
            publisher.get_topic(topic_path)
        except NotFound:
            publisher.create_topic(topic_path)

        # Create any missing subscriptions on this topic
        for sub_name in sub_names:
            sub_path = subscriber.subscription_path(PROJECT_ID, sub_name)
            try:
                subscriber.get_subscription(sub_path)
            except NotFound:
                subscriber.create_subscription(sub_path, topic_path)
124+
71125

72126
def setup_logging_sinks() -> None:
73127
"""Create sinks for exporting log entries to GCP
@@ -92,16 +146,22 @@ def auto_setup() -> None:
92146
"""Create and setup GCP products required by the ``broker`` package
93147
94148
New data sets include:
95-
``ztf_alerts``
149+
``ztf_alerts``
150+
``testing_dataset``
96151
97152
New buckets include:
98-
``<project_id>_logging_bucket``
99-
``<project_id>_ztf_images``
153+
``<PROJECT_ID>_ztf_alert_avro_bucket``
154+
``<PROJECT_ID>_testing_bucket``
155+
156+
New topics include:
157+
``ztf_alerts_in_BQ``
158+
``test_alerts_in_BQ``
100159
101160
New sinks include:
102-
``broker_logging_sink``
161+
``broker_logging_sink``
103162
"""
104163

105164
setup_big_query()
106165
setup_buckets()
166+
setup_pubsub()
107167
setup_logging_sinks()

0 commit comments

Comments
 (0)