Commit 3092915

Merge pull request #62 from mwvgroup/v/0.5.0/tjr (V/0.5.0/tjr)
2 parents: d9e7566 + 2d65e83

92 files changed: +4671 −1475 lines


README.md
Lines changed: 7 additions & 24 deletions

@@ -1,30 +1,13 @@
+[![python](https://img.shields.io/badge/python-3.7-g.svg)]()
+[![Build Status](https://travis-ci.com/mwvgroup/Pitt-Google-Broker.svg?branch=master)](https://travis-ci.com/mwvgroup/Pitt-Google-Broker)
+[![Documentation Status](https://readthedocs.org/projects/pitt-broker/badge/?version=latest)](https://pitt-broker.readthedocs.io/en/latest/?badge=latest)
+
 # Pitt-Google Alert Broker
 
 The Pitt-Google broker is an astronomical alert broker that is being developed for large scale surveys of the night sky, particularly the upcoming [Vera Rubin Observatory's Legacy Survey of Space and Time](https://www.lsst.org/) (LSST).
 We currently process and serve the [Zwicky Transient Facility](https://www.ztf.caltech.edu/)'s (ZTF) nightly alert stream.
-The broker runs on the Google Cloud Platform ([GCP](https://cloud.google.com)).
-
----
-
-## Access the Data
-
-See [Pitt-Google-Tutorial-Code-Samples.ipynb](https://github.com/mwvgroup/Pitt-Google-Broker/blob/master/pgb_utils/tutorials/Pitt-Google-Tutorial-Code-Samples.ipynb) for a tutorial.
-Data can be accessed using Google's [Cloud SDK](https://cloud.google.com/sdk) (Python, command-line, etc.).
-In addition, we offer the Python package `pgb_utils` which contains wrappers of Cloud SDK methods and other helper functions to facilitate common use cases.
-See the tutorials for details.
-
-If you run into issues or need assistance, please open an Issue on GitHub or contact troy.raen@pitt.edu.
+The broker runs on the [Google Cloud Platform](https://cloud.google.com) (GCP).
 
----
+Documentation is at [pitt-broker.readthedocs.io](https://pitt-broker.readthedocs.io/).
 
-## Run the Alert Broker
-
-See [broker/README.md](broker/README.md) for information about the alert broker software and instructions on running it. The broker will connect to a survey alert stream (e.g., ZTF) and process \& redistribute the data. Those looking to __access__ the data do not need to run the broker; instead see [Access the Data](#access-the-data)
-
-<!-- Full online documentation is available online via [Read the Docs](https://pitt-broker.readthedocs.io/en/latest/index.html). -->
-
----
-
-[![python](https://img.shields.io/badge/python-3.7-g.svg)]()
-[![Build Status](https://travis-ci.com/mwvgroup/Pitt-Google-Broker.svg?branch=master)](https://travis-ci.com/mwvgroup/Pitt-Google-Broker)
-[![Documentation Status](https://readthedocs.org/projects/pitt-broker/badge/?version=latest)](https://pitt-broker.readthedocs.io/en/latest/?badge=latest)
+If you run into issues or need assistance, please [open an Issue](https://github.com/mwvgroup/Pitt-Google-Broker/issues).

broker/broker_utils/MANIFEST.in
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+include requirements.txt

broker/broker_utils/broker_utils/beam_transforms.py
Lines changed: 3 additions & 3 deletions

@@ -52,7 +52,7 @@ def process(self, alertDict):
 
 
 class ExtractDIASource(DoFn):
-    """Extract the DIA source` fields and information needed for provinance
+    """Extract the DIA source` fields and information needed for provenance
     from the alertDict.
    """
    def __init__(self, schema_map):
@@ -74,7 +74,7 @@ def process_decat(self, alertDict):
        sourcename = lambda x: x if x not in dup_cols else f'source_{x}'
        src = {sourcename(k):v for k,v in alertDict['triggersource'].items()}
 
-        # get info for provinance
+        # get info for provenance
        notmetakeys = ['triggersource', 'sources']
        metadict = {k:v for k,v in alertDict.items() if k not in notmetakeys}
 
@@ -94,7 +94,7 @@ def process_ztf(self, alertDict):
        dup_cols = ['candid'] # candid is repeated, drop the one nested here
        cand = {k:v for k,v in alertDict['candidate'].items() if k not in dup_cols}
 
-        # get info for provinance
+        # get info for provenance
        metakeys = ['schemavsn', 'publisher', 'objectId', 'candid']
        metadict = {k:v for k,v in alertDict.items() if k in metakeys}
 
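The `process_ztf` hunk above splits an alert dict into candidate fields and provenance metadata with two dict comprehensions. A minimal standalone sketch of that pattern, using a trimmed, hypothetical ZTF-like alert (the field values here are made up for illustration):

```python
# Hypothetical alert, trimmed to the fields the comprehensions touch.
alert = {
    "schemavsn": "3.3",
    "publisher": "ZTF",
    "objectId": "ZTF21abcdefg",
    "candid": 1234567890,
    "candidate": {"candid": 1234567890, "magpsf": 18.2},
}

# Same pattern as process_ztf: drop the candid duplicated inside the
# nested candidate dict, then keep only the provenance metadata keys.
dup_cols = ["candid"]
cand = {k: v for k, v in alert["candidate"].items() if k not in dup_cols}

metakeys = ["schemavsn", "publisher", "objectId", "candid"]
metadict = {k: v for k, v in alert.items() if k in metakeys}

print(cand)      # {'magpsf': 18.2}
print(metadict)  # provenance metadata only
```

The top-level `candid` survives in `metadict`, so no information is lost by dropping its nested duplicate.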

broker/broker_utils/broker_utils/consumer_sim.py
Lines changed: 12 additions & 8 deletions

@@ -21,7 +21,8 @@ def publish_stream(
     publish_batch_every: Tuple[int,str] = (5,'sec'),
     sub_id: Optional[str] = None,
     topic_id: Optional[str] = None,
-    nack: bool = False
+    nack: bool = False,
+    auto_confirm: bool = False
 ):
     """Pulls messages from from a Pub/Sub subscription determined by either
     `instance` or `sub_id`, and publishes them to a topic determined by either
@@ -54,6 +55,8 @@ def publish_stream(
            messages are published to the topic, but they are not
            dropped from the subscription and so will be delivered again
            at an arbitrary time in the future.
+
+        auto_confirm: Whether to automatically answer "Y" to the confirmation prompt.
    """
 
    pbeN, pbeU = publish_batch_every # shorthand
@@ -69,10 +72,10 @@
    print(f"\nPublishing:\n\t{Nbatches} batches\n\teach with {alerts_per_batch} alerts\n\tat a rate of 1 batch per {pbeN} {pbeU} (plus processing time)\n\tfor a total of {Nbatches*alerts_per_batch} alerts")
 
    # publish the stream
-    _do_publish_stream(instance, alerts_per_batch, Nbatches, publish_batch_every, sub_id, topic_id, nack)
+    _do_publish_stream(instance, alerts_per_batch, Nbatches, publish_batch_every, sub_id, topic_id, nack, auto_confirm)
 
def _do_publish_stream(
-        instance, alerts_per_batch, Nbatches, publish_batch_every, sub_id=None, topic_id=None, nack=False):
+        instance, alerts_per_batch, Nbatches, publish_batch_every, sub_id=None, topic_id=None, nack=False, auto_confirm=False):
 
    # check units
    if publish_batch_every[1] != 'sec':
@@ -85,7 +88,7 @@ def _do_publish_stream(
    print(f"and\n\tPublish to topic: {topic_path}\n")
 
    # make the user confirm
-    _user_confirm()
+    _user_confirm(auto_confirm)
    print(f"\nPublishing...")
 
    b = 0
@@ -105,10 +108,11 @@
        b = b+1
        time.sleep(publish_batch_every[0])
 
-def _user_confirm():
-    cont = input('Continue? [Y/n]: ') or 'Y'
-    if cont not in ['Y', 'y']:
-        sys.exit('Exiting consumer simulator.')
+def _user_confirm(auto_confirm=False):
+    if not auto_confirm:
+        cont = input('Continue? [Y/n]: ') or 'Y'
+        if cont not in ['Y', 'y']:
+            sys.exit('Exiting consumer simulator.')
 
def _setup_subscribe(alerts_per_batch, instance=None, sub_id=None):
    if (instance is None) and (sub_id is None):
Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+# Schema Maps
+
+The files in this directory contain mappings between the schema of an individual survey and a PGB-standardized schema that is used within the broker code.
+
+Note: This directory is __not__ packaged with the `broker_utils` module.
+In order to allow broker instances to use unique schema maps, independent of other instances, each instance uploads this directory to its [`broker_files`] Cloud Storage bucket upon setup.
+The broker code loads the schema maps from the bucket of the appropriate instance at runtime.
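To make the mapping idea concrete, here is an illustrative sketch of how broker code might apply a loaded schema map. The `standardize_filter` helper is hypothetical, and the map is inlined as a dict rather than parsed from the YAML file in the bucket:

```python
# A schema map as the broker might hold it after loading; the values
# mirror the FILTER_MAP entries in ztf.yaml below.
ztf_schema_map = {
    "SURVEY": "ztf",
    "FILTER_MAP": {1: "g", 2: "r"},
}

def standardize_filter(schema_map, raw_filter):
    """Translate a survey-specific filter code to the PGB-standard name."""
    return schema_map["FILTER_MAP"][raw_filter]

print(standardize_filter(ztf_schema_map, 1))  # g
```

Because each instance uploads its own copy of this directory, two instances can translate the same raw filter code differently simply by editing their own YAML files.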

broker/broker_utils/schema_maps/decat.yaml
Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 SURVEY: decat
+TOPIC_SYNTAX: decat_yyyymmdd_2021A-0113 # replace yyyymmdd with the date
 FILTER_MAP:
   g DECam SDSS c0001 4720.0 1520.0: g
   r DECam SDSS c0002 6415.0 1480.0: r

broker/broker_utils/schema_maps/ztf.yaml
Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 SURVEY: ztf
+TOPIC_SYNTAX: ztf_yyyymmdd_programid1 # replace yyyymmdd with the date
 FILTER_MAP:
   1: g
   2: r
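Both schema maps gain a `TOPIC_SYNTAX` template whose `yyyymmdd` placeholder is replaced with a concrete date. A small sketch of that substitution; the `topic_name` helper is hypothetical, with the template strings taken from the two YAML diffs above:

```python
from datetime import datetime, timezone

# TOPIC_SYNTAX values from ztf.yaml and decat.yaml above.
topic_syntax = {
    "ztf": "ztf_yyyymmdd_programid1",
    "decat": "decat_yyyymmdd_2021A-0113",
}

def topic_name(survey: str, date: datetime) -> str:
    """Replace the yyyymmdd placeholder with a concrete date."""
    return topic_syntax[survey].replace("yyyymmdd", date.strftime("%Y%m%d"))

print(topic_name("ztf", datetime(2021, 6, 1, tzinfo=timezone.utc)))
# ztf_20210601_programid1
```

Keeping the template in the schema map (rather than hard-coding it) lets each survey's topic-naming convention live alongside its other schema differences.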

broker/broker_utils/setup.py
Lines changed: 1 addition & 1 deletion

@@ -31,7 +31,7 @@
 
 setup(
     name='pgb_broker_utils', # Required
-    version='0.2.7', # Required
+    version='0.2.8', # Required
     description='Tools used by the Pitt-Google astronomical alert broker.',
     long_description=long_description,
     long_description_content_type='text/markdown',
Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+# Check cue response
+
+This Cloud Function checks whether the broker responded appropriately to the auto-scheduler's cue.
+It does this by first pausing to allow time for the response, and then checking each broker component, such as VMs and Dataflow jobs.
+If a component is found to be in an unexpected state, a "Critical" error is raised which triggers a GCP alerting policy.
+
+This Cloud Function is triggered by the auto-scheduler's Pub/Sub topic. For reference, the auto-scheduling process looks like this (see [Auto-scheduler](auto-scheduler.md)):
+
+Cloud Scheduler cron job -> Pub/Sub -> Cloud Function -> Night Conductor VM startup
+
+## Alerting policy
+
+An alerting policy was created manually to notify Troy Raen of anything written to the log named `check-cue-response-cloudfnc` that has severity `CRITICAL`.
+Every broker instance has a unique `check_cue_response` Cloud Function, but they all write to the same log.
+Therefore, a new policy does not need to be created with each new broker instance.
+(Also, recall that the auto-scheduler is typically only active in Production instances.)
+
+To update the existing policy, or create a new one, see:
+- [Managing log-based alerts](https://cloud.google.com/logging/docs/alerting/log-based-alerts)
+- [Managing alerting policies by API](https://cloud.google.com/monitoring/alerts/using-alerting-api)
+- [Managing notification channels](https://cloud.google.com/monitoring/support/notification-options)
+
+## Links
+
+- [`googleapiclient.discovery.build` Library reference documentation by API](https://github.com/googleapis/google-api-python-client/blob/master/docs/dyn/index.md)
