Skip to content

Commit f588510

Browse files
[Science module] Blazar low states (#486)
* Low state blazar (#483) * Create processor.py Creation of the low state detection module file * CTAO blazar comparison file (parquet) Computation from ZTF data release 22 Description: - 3FGL Name: Name of the source inthe 3FGL catalog - ZTF Name: ZTF tag associated beforehand - Array of Medians: Medians used to standardize the flux (np.nan if no median could be computed) - Computed Threshold: 10th percentile of the standardized flux from the data release - Observed Threshold: Measured flux during a previous redshift detrmination attempt - Redshift: Redshift found during the previous spectroscopic observation - Final Threshold: Minimum between the Computed Threshold and the Observed Threshold * First writing draft of processor.py * Create utils.py * Update processor.py Refont after utils.py creation * Update utils.py Edit documentation of low_state_ * Create utils.py * Add files via upload CTAO blazar comparison file (parquet) Computation from ZTF data release 22 Description: - 3FGL Name: Name of the source inthe 3FGL catalog - ZTF Name: ZTF tag associated beforehand - Array of Medians: Medians used to standardize the flux (np.nan if no median could be computed) - Computed Threshold: 10th percentile of the standardized flux from the data release - Observed Threshold: Measured flux during a previous redshift detrmination attempt - Redshift: Redshift found during the previous spectroscopic observation - Final Threshold: Minimum between the Computed Threshold and the Observed Threshold * Create processor.py * Update utils.py * Create __init__.py * Create __init__.py * Create README.md * Update README.md Page setting * Create README.md * Update processor.py Update documentation * Update utils.py Fix name in standardized_flux_ * Update processor.py Fix CTAO_PATH * Update processor.py Fix CTAO_PATH * Update of CI script to run only on single modules * Update of CI script to run only on single modules * Update of CI script to run only on single modules * Update of standardized_flux/utils.py * Change from 'low' to 'quiescent' * Documentation update * PEP8 check test * PEP8 fix * __init__.py fix * PEP8 fix test * PEP8 fix test * PEP8 fix test * PEP8 fix test * PEP8 fix test * Datatest creation * Shrinking of datatest size * PEP8 fix * Datatest path change * Example fix * Example fix * Example fix * Modification of tester.py * Modification of CTAO_blazars path * Modification of CTAO_blazars path * Modification of CTAO_blazars path * Modification of CTAO_blazars path * Modification of CTAO_blazars path * Example fix attempt * Example fix attempt * Example fix attempt * PATH change * Test addition * Test addition * Test addition * Test addition * Minor corrections --------- Co-authored-by: Julian <julian@> * Move Blazar file * Fix Illegal Parquet type: INT64 (TIMESTAMP_MICROS) error by recreating the file --------- Co-authored-by: JulianHamo <hamo@ijclab.in2p3.fr>
1 parent a8966dd commit f588510

File tree

12 files changed

+580
-1
lines changed

12 files changed

+580
-1
lines changed
19.1 KB
Binary file not shown.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Quiescent state of blazar detection
2+
3+
This module adds a new column containing quantities that indicate if the last observation of a given blazar can classify it as a **quiescent state**.
4+
The available blazar set comes from the list of blazars that are planned to be monitored by the CTAO collaboration from the start of the South and North sites.
5+
6+
These quantities are:
7+
* The sliding mean of standardized flux of the last but new alert over a specific threshold computed for the source beforehand
8+
* The sliding mean of standardized flux of the new alert over the same threshold computed for the source beforehand
9+
* The standardized flux of the last alert over the same threshold
10+
11+
If the last two ratio are below one, the source is considered to be in a quiescent state.
12+
If the source is in a quiescent state but the first ratio is above one, this quiescent state is considered to have just started.

fink_science/blazar_low_state/__init__.py

Whitespace-only changes.
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
# Copyright 2025 AstroLab Software
2+
# Author: Julian Hamo
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
from line_profiler import profile
16+
17+
import pandas as pd
18+
19+
from pyspark.sql.functions import pandas_udf
20+
from pyspark.sql.types import ArrayType, DoubleType
21+
from fink_science.blazar_low_state.utils import quiescent_state_
22+
23+
from fink_science.tester import spark_unit_tests
24+
from fink_science import __file__
25+
import os
26+
27+
RELEASE = 22
28+
29+
30+
@pandas_udf(ArrayType(DoubleType()))
31+
@profile
32+
def quiescent_state(
33+
candid: pd.Series,
34+
objectId: pd.Series,
35+
cstd_flux: pd.Series,
36+
cjd: pd.Series) -> pd.Series:
37+
"""Returns an array containing:
38+
The mean over threshold ratio of the last but one alert
39+
The mean over threshold ratio of the last alert
40+
The standardized flux over threshold ratio of the last alert
41+
42+
Parameters
43+
----------
44+
pdf: pd.core.frame.DataFrame
45+
Pandas DataFrame of the alert history containing:
46+
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
47+
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
48+
CTAO_blazar: pd.core.frame.DataFrame
49+
Pandas DataFrame of the monitored sources containing:
50+
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
51+
Observed Threshold, Redshift, Final Threshold
52+
Returns
53+
-------
54+
out: pd.Series of np.ndarray of np.float64
55+
Array of ratios for:
56+
Mean over threshold of the last but one alert
57+
Mean over threshold of the last alert
58+
Measurement over threshold of the last alert
59+
All the default values are set to -1 as it is an unphysical value
60+
61+
Examples
62+
--------
63+
>>> import os
64+
>>> import numpy as np
65+
>>> import pandas as pd
66+
>>> from fink_utils.spark.utils import concat_col
67+
>>> import pyspark.sql.functions as F
68+
>>> from fink_science.standardized_flux.processor import standardized_flux
69+
70+
>>> parDF = spark.read.parquet(ztf_alert_sample)
71+
72+
# Required alert columns
73+
>>> what = [
74+
... 'distnr',
75+
... 'magpsf',
76+
... 'sigmapsf',
77+
... 'magnr',
78+
... 'sigmagnr',
79+
... 'isdiffpos',
80+
... 'fid',
81+
... 'jd'
82+
... ]
83+
84+
# Concatenation
85+
>>> prefix = 'c'
86+
>>> for key in what:
87+
... parDF = concat_col(parDF, colname=key, prefix=prefix)
88+
89+
# Preliminary module run
90+
>>> args = [
91+
... 'candid',
92+
... 'objectId',
93+
... 'cdistnr',
94+
... 'cmagpsf',
95+
... 'csigmapsf',
96+
... 'cmagnr',
97+
... 'csigmagnr',
98+
... 'cisdiffpos',
99+
... 'cfid',
100+
... 'cjd'
101+
... ]
102+
>>> parDF = parDF.withColumn(
103+
... 'container',
104+
... standardized_flux(*args)
105+
... )
106+
>>> parDF = parDF.withColumn(
107+
... 'cstd_flux',
108+
... parDF['container'].getItem('flux')
109+
... )
110+
>>> parDF = parDF.withColumn(
111+
... 'csigma_std_flux',
112+
... parDF['container'].getItem('sigma')
113+
... )
114+
115+
# Drop temporary columns
116+
>>> what_prefix = [prefix + key for key in what]
117+
>>> parDF = parDF.drop('container')
118+
119+
# Test the module
120+
>>> args = ['candid', 'objectId', 'cstd_flux', 'cjd']
121+
>>> parDF = parDF.withColumn('blazar_stats', quiescent_state(*args))
122+
123+
# Test
124+
>>> stats = parDF.select('blazar_stats').toPandas()['blazar_stats']
125+
>>> stats = np.array(stats.to_list())
126+
>>> stats[pd.isnull(stats)] = np.nan
127+
>>> (stats.sum(axis=-1) == -3).sum()
128+
320
129+
"""
130+
131+
path = os.path.dirname(os.path.abspath(__file__))
132+
CTAO_PATH = os.path.join(path, 'data/catalogs')
133+
CTAO_filename = 'CTAO_blazars_ztf_dr{}.parquet'.format(RELEASE)
134+
CTAO_blazar = pd.read_parquet(os.path.join(CTAO_PATH, CTAO_filename))
135+
136+
pdf = pd.DataFrame(
137+
{
138+
"candid": candid,
139+
"objectId": objectId,
140+
"cstd_flux": cstd_flux,
141+
"cjd": cjd
142+
}
143+
)
144+
out = []
145+
for candid_ in pdf["candid"]:
146+
tmp = pdf[pdf["candid"] == candid_]
147+
if len(tmp["cstd_flux"].to_numpy()[0]) == 0:
148+
out.append([-1., -1., -1.])
149+
continue
150+
sub = pd.DataFrame(
151+
{
152+
"candid": tmp["candid"].to_numpy()[0],
153+
"objectId": tmp["objectId"].to_numpy()[0],
154+
"cstd_flux": tmp["cstd_flux"].to_numpy()[0],
155+
"cjd": tmp["cjd"].to_numpy()[0],
156+
}
157+
)
158+
out.append(quiescent_state_(sub, CTAO_blazar))
159+
160+
return pd.Series(out)
161+
162+
163+
if __name__ == "__main__":
164+
"""Execute the test suite"""
165+
166+
globs = globals()
167+
path = os.path.join(os.path.dirname(__file__), 'data/alerts')
168+
filename = 'CTAO_blazar_datatest_v20-12-24.parquet'
169+
ztf_alert_sample = "file://{}/{}".format(path, filename)
170+
globs["ztf_alert_sample"] = ztf_alert_sample
171+
172+
# Run the test suite
173+
spark_unit_tests(globs)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import numpy as np
2+
import pandas as pd
3+
4+
5+
def instantness_criterion(
6+
pdf: pd.DataFrame,
7+
CTAO_blazar: pd.DataFrame) -> np.float64:
8+
"""Returns the standardized flux of the last measurement
9+
over the precomputed threshold ratio
10+
11+
Parameters
12+
----------
13+
pdf: pd.core.frame.DataFrame
14+
Pandas DataFrame of the alert history containing:
15+
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
16+
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
17+
CTAO_blazar: pd.core.frame.DataFrame
18+
Pandas DataFrame of the monitored sources containing:
19+
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
20+
Observed Threshold, Redshift, Final Threshold
21+
22+
Returns
23+
-------
24+
out: np.float64
25+
Ratio of the standardized flux coming from the last measurement alert
26+
over precomputed threshold
27+
"""
28+
29+
name = pdf['objectId'].values[0]
30+
31+
try:
32+
threshold = np.array(
33+
CTAO_blazar.loc[
34+
CTAO_blazar['ZTF Name'] == name,
35+
'Final Threshold'
36+
].values[0]
37+
)
38+
except IndexError:
39+
threshold = np.nan
40+
41+
try:
42+
return pdf['cstd_flux'].iloc[-1] / threshold
43+
except KeyError:
44+
return np.nan
45+
46+
47+
def robustness_criterion(
48+
pdf: pd.DataFrame,
49+
CTAO_blazar: pd.DataFrame) -> np.float64:
50+
"""Returns the sliding mean over 30 days of the standardized flux
51+
over the precomputed threshold ratio
52+
53+
Parameters
54+
----------
55+
pdf: pd.core.frame.DataFrame
56+
Pandas DataFrame of the alert history containing:
57+
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
58+
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
59+
CTAO_blazar: pd.core.frame.DataFrame
60+
Pandas DataFrame of the monitored sources containing:
61+
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
62+
Observed Threshold, Redshift, Final Threshold
63+
64+
Returns
65+
-------
66+
out: np.float64
67+
Ratio of the sliding mean over 30 days of the standardized flux over
68+
the precomputed threshold
69+
"""
70+
71+
integration_period = 30
72+
name = pdf['objectId'].values[0]
73+
74+
try:
75+
threshold = np.array(
76+
CTAO_blazar.loc[
77+
CTAO_blazar['ZTF Name'] == name,
78+
'Final Threshold'
79+
].values[0]
80+
)
81+
except IndexError:
82+
threshold = np.nan
83+
84+
try:
85+
full_time = pdf['cjd']
86+
maskTime = full_time >= full_time.iloc[-1] - integration_period
87+
time = pdf.loc[maskTime, 'cjd']
88+
flux = pdf.loc[maskTime, 'cstd_flux']
89+
except KeyError:
90+
return np.nan
91+
92+
maskNan = ~pd.isnull(flux)
93+
mtime = time[maskNan]
94+
if maskNan.sum() > 1:
95+
mtimestart = mtime.iloc[0]
96+
mtimestop = mtime.iloc[-1]
97+
integral = np.trapz(flux[maskNan], x=mtime)
98+
return integral / (mtimestop - mtimestart) / threshold
99+
else:
100+
return np.nan
101+
102+
103+
def quiescent_state_(
104+
pdf: pd.DataFrame,
105+
CTAO_blazar: pd.DataFrame) -> np.ndarray:
106+
"""Returns an array containing:
107+
The mean over threshold ratio of the last but one alert
108+
The mean over threshold ratio of the last alert
109+
The standardized flux over threshold ratio of the last alert
110+
111+
Parameters
112+
----------
113+
pdf: pd.core.frame.DataFrame
114+
Pandas DataFrame of the alert history containing:
115+
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
116+
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
117+
CTAO_blazar: pd.core.frame.DataFrame
118+
Pandas DataFrame of the monitored sources containing:
119+
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
120+
Observed Threshold, Redshift, Final Threshold
121+
Returns
122+
-------
123+
out: np.ndarray of np.float64
124+
Array of ratios for:
125+
Mean over threshold of the last but one alert
126+
Mean over threshold of the last alert
127+
Measurement over threshold of the last alert
128+
"""
129+
130+
name = pdf['objectId'].values[0]
131+
132+
if not CTAO_blazar.loc[CTAO_blazar['ZTF Name'] == name].empty:
133+
return np.array(
134+
[
135+
robustness_criterion(pdf[:-1], CTAO_blazar),
136+
robustness_criterion(pdf, CTAO_blazar),
137+
instantness_criterion(pdf, CTAO_blazar)
138+
]
139+
)
140+
141+
else:
142+
return np.full(3, np.nan)
17.4 MB
Binary file not shown.
19.1 KB
Binary file not shown.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Flux standardization module
2+
3+
This module adds two new columns for a set of sources defined beforehand.
4+
These sources are blazars that the CTAO collaboration wants to monitor starting the launch of the South and North sites.
5+
The new columns are the **standardized flux** (which is the flux over the median flux computed beforhand using archival light curves of ZTF) and the **uncertainties** over this standardized flux.

fink_science/standardized_flux/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)