
Commit 9066360

Support for LSST (#230)
* Add multi-survey support to the configuration file
* PEP8
* Rewrite log
* Update doc and test
* Propagate survey to the consumer. Display only relevant topics per service
* Propagate survey to bin
* Properly decode payload based on the survey
* Fix WARN message
* Fix Avro writing function
* Update setup
* Update tools for visualisation
* Doc
* Data transfer ready for LSST
* Documentation
* Doc
* Fix test
* Add utilities to read parquet files
* Add survey
* Fix test
* 10.0-rc0
* Fix name of argument
* Rename argument to be consistent with other tools. Make argument required.
1 parent 7614eca commit 9066360

File tree

14 files changed: +674 −282 lines changed


.ruff.toml

Lines changed: 0 additions & 1 deletion

```diff
@@ -43,7 +43,6 @@ ignore = [
     "D400", "D401", "D100", "D102", "D103", "D104", "D415", "D419",
     "E731",
     "N812", "N806", "N803",
-    "PD901",
     "PLR0913"
 ]
```

README.md

Lines changed: 71 additions & 44 deletions

````diff
@@ -39,10 +39,10 @@ In order to connect and poll alerts from Fink, you need to get your credentials:
 1. Subscribe to one or more Fink streams by filling this [form](https://forms.gle/2td4jysT4e9pkf889).
 2. After filling the form, we will send your credentials. Register them on your laptop by simply running:
 ```
-fink_client_register -username <USERNAME> -group_id <GROUP_ID> -servers <SERVERS> ...
+fink_client_register -survey SURVEY -username USERNAME -group_id GROUP_ID -servers SERVERS ...
 ```
 
-In case of doubt, run `fink_client_register -h`. You can also inspect the configuration file on disk:
+Note that `SURVEY` is among `ztf` or `lsst`. In case of doubt, run `fink_client_register -h`. You can also inspect the configuration file on disk:
 
 ```bash
 cat ~/.finkclient/credentials.yml
````
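The registration command writes your credentials to a YAML file under `~/.finkclient/`. A minimal sketch of writing and re-reading such a file, using the field names from the doctests in this commit (`survey`, `username`, `mytopics`, `servers`, `group_id`, `maxtimeout`); the exact on-disk layout is an assumption, not taken from the fink-client source:

```python
import os
import tempfile

import yaml  # PyYAML, already a dependency of fink_client.configuration

# Hypothetical credentials, mirroring the doctest in configuration.py
conf = {
    "survey": "lsst",
    "username": "test",
    "password": None,
    "mytopics": ["rrlyr"],
    "servers": "localhost:9093",
    "group_id": "test_group",
    "maxtimeout": 10,
}

# Write and re-read the YAML file, roughly as fink_client_register would
path = os.path.join(tempfile.gettempdir(), "lsst_credentials.yml")
with open(path, "w") as f:
    yaml.dump(conf, f)

with open(path) as f:
    loaded = yaml.safe_load(f)

assert loaded == conf
print(loaded["survey"])
```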
````diff
@@ -54,77 +54,104 @@ Once you have your credentials, you are ready to poll streams! You can easily ac
 
 ```bash
 fink_consumer -h
-usage: fink_consumer [-h] [--display] [--display_statistics] [-limit LIMIT]
-                     [--available_topics] [--save] [-outdir OUTDIR]
-                     [-schema SCHEMA] [--dump_schema] [-start_at START_AT]
+usage: fink_consumer [-h] -survey SURVEY [--display]
+                     [--display_statistics] [-limit LIMIT]
+                     [--available_topics] [--save]
+                     [-outdir OUTDIR] [-schema SCHEMA]
+                     [--dump_schema] [-start_at START_AT]
 
-Kafka consumer to listen and archive Fink streams from the Livestream service
+Kafka consumer to listen and archive Fink streams from the
+Livestream service
 
-optional arguments:
+options:
   -h, --help            show this help message and exit
-  --display             If specified, print on screen information about
-                        incoming alert.
-  --display_statistics  If specified, print on screen information about queues,
-                        and exit.
-  -limit LIMIT          If specified, download only `limit` alerts. Default is
-                        None.
-  --available_topics    If specified, print on screen information about
-                        available topics.
-  --save                If specified, save alert data on disk (Avro). See also
-                        -outdir.
-  -outdir OUTDIR        Folder to store incoming alerts if --save is set. It
-                        must exist.
-  -schema SCHEMA        Avro schema to decode the incoming alerts. Default is
-                        None (version taken from each alert)
-  --dump_schema         If specified, save the schema on disk (json file)
-  -start_at START_AT    If specified, reset offsets to 0 (`earliest`) or empty
-                        queue (`latest`).
+  -survey SURVEY        Survey name among ztf or lsst. Note that
+                        each survey will have its own configuration
+                        file.
+  --display             If specified, print on screen information
+                        about incoming alert.
+  --display_statistics  If specified, print on screen information
+                        about queues, and exit.
+  -limit LIMIT          If specified, download only `limit` alerts.
+                        Default is None.
+  --available_topics    If specified, print on screen information
+                        about available topics.
+  --save                If specified, save alert data on disk
+                        (Avro). See also -outdir.
+  -outdir OUTDIR        Folder to store incoming alerts if --save is
+                        set. It must exist.
+  -schema SCHEMA        Avro schema to decode the incoming alerts.
+                        Default is None (version taken from each
+                        alert)
+  --dump_schema         If specified, save the schema on disk (json
+                        file)
+  -start_at START_AT    If specified, reset offsets to 0
+                        (`earliest`) or empty queue (`latest`).
 ```
 
 You can also look at an alert on the disk:
 
 ```bash
 fink_alert_viewer -h
-usage: fink_alert_viewer [-h] [-filename FILENAME]
+usage: fink_alert_viewer [-h] [-f F] [-s S]
 
-Display cutouts and lightcurve from a ZTF alert
+Display cutouts and lightcurve from an alert
 
-optional arguments:
-  -h, --help          show this help message and exit
-  -filename FILENAME  Path to an alert data file (avro format)
+options:
+  -h, --help  show this help message and exit
+  -f F        Path to an alert data file (avro format)
+  -s S        Survey name among ztf or lsst.
 ```
 
-More information at [docs/livestream](https://fink-broker.readthedocs.io/en/latest/services/livestream).
+More information at [docs/livestream](https://doc.lsst.fink-broker.org/en/latest/services/livestream).
 
 ## Data Transfer usage
 
-If you requested data using the [Data Transfer service](https://fink-portal.org/download), you can easily poll your stream using:
+If you requested data using the Data Transfer service ([ZTF](https://ztf.fink-portal.org/download) or [LSST](https://lsst.fink-portal.org/download)), you can easily poll your stream using:
 
 ```bash
-usage: fink_datatransfer.py [-h] [-topic TOPIC] [-limit LIMIT] [-outdir OUTDIR] [-partitionby PARTITIONBY] [-batchsize BATCHSIZE] [-nconsumers NCONSUMERS]
-                            [-maxtimeout MAXTIMEOUT] [-number_partitions NUMBER_PARTITIONS] [--restart_from_beginning] [--verbose]
+usage: fink_datatransfer [-h] -survey SURVEY [-topic TOPIC] [-limit LIMIT]
+                         [-outdir OUTDIR] [-partitionby PARTITIONBY]
+                         [-batchsize BATCHSIZE] [-nconsumers NCONSUMERS]
+                         [-maxtimeout MAXTIMEOUT]
+                         [-number_partitions NUMBER_PARTITIONS]
+                         [--restart_from_beginning] [--dump_schema] [--verbose]
 
 Kafka consumer to listen and archive Fink streams from the data transfer service
 
-optional arguments:
+options:
   -h, --help            show this help message and exit
+  -survey SURVEY        Survey name among ztf or lsst. Note that each survey will have its
+                        own configuration file.
   -topic TOPIC          Topic name for the stream that contains the data.
-  -limit LIMIT          If specified, download only `limit` alerts from the stream. Default is None, that is download all alerts.
-  -outdir OUTDIR        Folder to store incoming alerts. It will be created if it does not exist.
+  -limit LIMIT          If specified, download only `limit` alerts from the stream.
+                        Default is None, that is download all alerts.
+  -outdir OUTDIR        Folder to store incoming alerts. It will be created if it does not
+                        exist.
   -partitionby PARTITIONBY
-                        Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). `classId` is
-                        also available for ELASTiCC data. Default is time.
-  -batchsize BATCHSIZE  Maximum number of alert within the `maxtimeout` (see conf). Default is 1000 alerts.
+                        If specified, partition data when writing alerts on disk.
+                        Available options: - `time`: year=YYYY/month=MM/day=DD (ztf and
+                        lsst) - `finkclass`: finkclass=CLASS (ztf only) - `tnsclass`:
+                        tnsclass=CLASS (ztf only) - `classId`: classId=CLASSID (ELASTiCC
+                        only) Default is None, that is no partitioning is applied (all
+                        parquet files in the `outdir` folder).
+  -batchsize BATCHSIZE  Maximum number of alert within the `maxtimeout` (see conf).
+                        Default is 1000 alerts.
   -nconsumers NCONSUMERS
-                        Number of parallel consumer to use. Default (-1) is the number of logical CPUs in the system.
+                        Number of parallel consumer to use. Default (-1) is the number of
+                        logical CPUs in the system.
   -maxtimeout MAXTIMEOUT
-                        Overwrite the default timeout (in seconds) from user configuration. Default is None.
+                        Overwrite the default timeout (in seconds) from user
+                        configuration. Default is None.
   -number_partitions NUMBER_PARTITIONS
-                        Number of partitions for the topic in the distant Kafka cluster. Do not touch unless you know what your are doing. Default is 10
+                        Number of partitions for the topic in the distant Kafka cluster.
+                        Do not touch unless you know what your are doing. Default is 10
                         (Fink Kafka cluster)
   --restart_from_beginning
-                        If specified, restart downloading from the 1st alert in the stream. Default is False.
+                        If specified, restart downloading from the 1st alert in the
+                        stream. Default is False.
+  --dump_schema         If specified, save the schema on disk (json file)
   --verbose             If specified, print on screen information about the consuming.
 ```
 
-More information at [docs/datatransfer](https://fink-broker.readthedocs.io/en/latest/services/data_transfer/).
+More information at [docs/datatransfer](https://doc.ztf.fink-broker.org/en/latest/services/data_transfer/).
````
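The new `-partitionby` help text encodes a per-survey constraint: only `time` partitioning applies to both ztf and lsst, while `finkclass`, `tnsclass`, and `classId` are survey-specific. A standalone sketch of that validation rule; the helper `valid_partitioning` is hypothetical and not part of the fink-client API:

```python
# Hypothetical helper mirroring the -partitionby help text;
# not part of fink-client itself.
VALID_PARTITIONING = {
    "time": {"ztf", "lsst"},  # year=YYYY/month=MM/day=DD
    "finkclass": {"ztf"},     # finkclass=CLASS
    "tnsclass": {"ztf"},      # tnsclass=CLASS
    "classId": {"elasticc"},  # classId=CLASSID
}


def valid_partitioning(partitionby, survey):
    """Return True if `partitionby` is supported for `survey`."""
    if partitionby is None:
        # Default: no partitioning is applied, always valid
        return True
    return survey in VALID_PARTITIONING.get(partitionby, set())


print(valid_partitioning("time", "lsst"))       # True
print(valid_partitioning("finkclass", "lsst"))  # False
```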

bin/fink_client_test.sh

Lines changed: 1 addition & 1 deletion

```diff
@@ -27,7 +27,7 @@ done
 TEST_DIR=${FINK_CLIENT_HOME}/tests
 
 # Fake credentials
-fink_client_register -username test -password None \
+fink_client_register -survey ztf -username test -password None \
     -servers 'localhost:9093, localhost:9094, localhost:9095' \
     -mytopics rrlyr -group_id test_group -maxtimeout 10 --tmp
```

fink_client/__init__.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -12,5 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-__version__ = "9.2"
+__version__ = "10.0-rc0"
 __schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
```

fink_client/avro_utils.py

Lines changed: 10 additions & 7 deletions

```diff
@@ -230,9 +230,9 @@ def write_alert(
     alert: dict,
     schema: str,
     path: str,
-    overwrite: bool = False,
-    id1: str = "",
+    id1: str,
     id2: str = "",
+    overwrite: bool = False,
 ):
     """Write avro alert on disk
 
@@ -245,12 +245,12 @@ def write_alert(
     path: str
         Folder that will contain the alert. The filename will always be
         <objectID>.avro
+    id1: str
+        Prefix for alert name
+    id2: str, optional
+        Second prefix for alert name if need be (id1_id2.avro)
     overwrite: bool, optional
         If True, overwrite existing alert. Default is False.
-    id1: str, optional
-        First prefix for alert name: {id1}_{id2}.avro
-    id2: str, optional
-        Second prefix for alert name: {id1}_{id2}.avro
 
     Examples
     --------
@@ -270,7 +270,10 @@ def write_alert(
     ...
     OSError: ./ZTF19acihgng_1060135832015015002.avro already exists!
     """
-    alert_filename = os.path.join(path, "{}_{}.avro".format(alert[id1], alert[id2]))
+    if id2 is not None:
+        alert_filename = os.path.join(path, "{}_{}.avro".format(alert[id1], alert[id2]))
+    else:
+        alert_filename = os.path.join(path, "{}.avro".format(alert[id1]))
 
     if isinstance(schema, str):
         schema = _get_alert_schema(schema)
```

fink_client/configuration.py

Lines changed: 44 additions & 23 deletions

```diff
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-# Copyright 2019-2020 AstroLab Software
+# Copyright 2019-2026 AstroLab Software
 # Author: Julien Peloton
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,13 +16,25 @@
 import yaml
 import os
 
+from fink_client.logger import get_fink_logger
 from fink_client.tester import regular_unit_tests
 
+_LOG = get_fink_logger()
 _ROOTDIR = os.path.join(os.environ["HOME"], ".finkclient")
-_CREDNAME = "credentials.yml"
+_CREDNAME = "{}_credentials.yml"
 
 
-def write_credentials(dict_file: dict, verbose: bool = False, tmp: bool = False):
+def check_survey_exists(survey):
+    """Check survey exists"""
+    if survey not in ["ztf", "lsst"]:
+        raise KeyError(
+            "-survey must be one of ['ztf', 'lsst']. {} is not recognized.".format(
+                survey
+            )
+        )
+
+
+def write_credentials(dict_file: dict, log_level: str = "WARN", tmp: bool = False):
     """Store user credentials on the computer.
 
     To get your credentials, contact Fink admins or fill the registration form:
@@ -32,56 +44,63 @@ def write_credentials(dict_file: dict, verbose: bool = False, tmp: bool = False)
     ----------
     dict_file: dict
         Dictionnary containing user credentials.
-    verbose: bool, optional
-        If True, print the credentials location. Default is False.
+    log_level: str, optional
+        Level of verbosity. Default is WARN.
     tmp: bool, optional
         If True, store the credentials under /tmp. Default is False.
 
     Examples
     --------
     >>> conf = {
+    ...     'survey': 'lsst',
     ...     'username': 'test',
     ...     'password': None,
     ...     'mytopics': ['rrlyr'],
     ...     'servers': 'localhost:9093',
     ...     'group_id': 'test_group',
     ...     'maxtimeout': 10
     ... }
-    >>> write_credentials(conf, verbose=False, tmp=True)
+    >>> write_credentials(conf, log_level="WARN", tmp=True)
     """
+    _LOG.setLevel(log_level)
     if tmp:
         ROOTDIR = "/tmp"
     else:
         ROOTDIR = _ROOTDIR
 
     # check there are no missing information
     mandatory_keys = [
+        "survey",
         "username",
-        "password",
         "group_id",
-        "mytopics",
         "servers",
-        "maxtimeout",
     ]
     for k in mandatory_keys:
         assert k in dict_file.keys(), "You need to specify {}".format(k)
 
+    check_survey_exists(dict_file["survey"])
+
     # Create the folder if it does not exist
     os.makedirs(ROOTDIR, exist_ok=True)
 
     # Store data into yml file
-    with open(os.path.join(ROOTDIR, _CREDNAME), "w") as f:
+    with open(os.path.join(ROOTDIR, _CREDNAME.format(dict_file["survey"])), "w") as f:
        yaml.dump(dict_file, f)
 
-    if verbose:
-        print("Credentials stored at {}/{}".format(ROOTDIR, _CREDNAME))
+    _LOG.info(
+        "Credentials stored at {}/{}".format(
+            ROOTDIR, _CREDNAME.format(dict_file["survey"])
+        )
+    )
 
 
-def load_credentials(tmp: bool = False) -> dict:
+def load_credentials(survey: str, tmp: bool = False) -> dict:
     """Load fink-client credentials.
 
     Parameters
     ----------
+    survey: str
+        Survey name, among ztf or lsst
     tmp: bool, optional
         If True, load the credentials from /tmp. Default is False.
 
@@ -93,45 +112,47 @@ def load_credentials(tmp: bool = False) -> dict:
     Examples
     --------
     >>> conf_in = {
+    ...     'survey': 'lsst',
     ...     'username': 'test',
     ...     'password': None,
     ...     'mytopics': ['rrlyr'],
     ...     'servers': 'localhost:9093',
     ...     'group_id': 'test_group',
     ...     'maxtimeout': 10
     ... }
-    >>> write_credentials(conf_in, verbose=False, tmp=True)
-    >>> conf_out = load_credentials(tmp=True)
+    >>> write_credentials(conf_in, tmp=True)
+    >>> conf_out = load_credentials(survey=conf_in["survey"], tmp=True)
 
     If, however the credentials do not exist yet
-    >>> os.remove('/tmp/credentials.yml')
-    >>> conf = load_credentials(tmp=True) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS
+    >>> os.remove('/tmp/lsst_credentials.yml')
+    >>> conf = load_credentials(survey="lsst", tmp=True) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS
     Traceback (most recent call last):
     ...
-    OSError: No credentials found, did you register?
+    OSError: No credentials found for survey lsst, did you register?
     To get your credentials, and use fink-client you need to:
     1. subscribe to one or more Fink streams at
     https://forms.gle/2td4jysT4e9pkf889
     2. run `fink_client_register` to register
-    See https://fink-broker.readthedocs.io/en/latest/services/data_transfer/
+    See https://doc.lsst.fink-broker.org/en/latest/services/data_transfer/
 
     """
+    check_survey_exists(survey)
     if tmp:
         ROOTDIR = "/tmp"
     else:
         ROOTDIR = _ROOTDIR
 
-    path = os.path.join(ROOTDIR, _CREDNAME)
+    path = os.path.join(ROOTDIR, _CREDNAME.format(survey))
 
     if not os.path.exists(path):
         msg = """
-        No credentials found, did you register?
+        No credentials found for survey {}, did you register?
         To get your credentials, and use fink-client you need to:
         1. subscribe to one or more Fink streams at
         https://forms.gle/2td4jysT4e9pkf889
         2. run `fink_client_register` to register
-        See https://fink-broker.readthedocs.io/en/latest/services/data_transfer/
-        """
+        See https://doc.{}.fink-broker.org/en/latest/services/data_transfer/
+        """.format(survey, survey)
         raise IOError(msg)
 
     with open(path) as f:
```
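The per-survey credentials logic above boils down to two pieces: validating the survey name (`check_survey_exists`) and formatting the `{}_credentials.yml` template. A standalone sketch of both, reimplementing the logic for illustration rather than importing fink_client (the `credentials_path` helper is hypothetical):

```python
import os

CREDNAME = "{}_credentials.yml"  # per-survey credentials file, as in this commit


def check_survey_exists(survey):
    """Raise KeyError for anything other than ztf or lsst, as the commit does."""
    if survey not in ["ztf", "lsst"]:
        raise KeyError(
            "-survey must be one of ['ztf', 'lsst']. {} is not recognized.".format(
                survey
            )
        )


def credentials_path(survey, rootdir="/tmp"):
    """Build the per-survey credentials path after validating the survey."""
    check_survey_exists(survey)
    return os.path.join(rootdir, CREDNAME.format(survey))


print(credentials_path("lsst"))  # /tmp/lsst_credentials.yml
try:
    credentials_path("elasticc")
except KeyError as e:
    print("rejected:", e)
```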
