Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
01a50d3
Create processor.py
JulianHamo Dec 18, 2024
78be5ac
CTAO blazar comparison file (parquet)
JulianHamo Dec 18, 2024
ccac0a9
First writing draft of processor.py
JulianHamo Dec 18, 2024
ff00fb8
Create utils.py
JulianHamo Dec 18, 2024
19e79b4
Update processor.py
JulianHamo Dec 18, 2024
6bbe9e4
Update utils.py
JulianHamo Dec 18, 2024
4ff425b
Create utils.py
JulianHamo Dec 18, 2024
69cdb1f
Add files via upload
JulianHamo Dec 18, 2024
6e5f09e
Create processor.py
JulianHamo Dec 18, 2024
1e7281b
Update utils.py
JulianHamo Dec 18, 2024
c3243f2
Create __init__.py
JulianHamo Dec 18, 2024
1a29e0a
Create __init__.py
JulianHamo Dec 18, 2024
48993a0
Create README.md
JulianHamo Dec 18, 2024
75b6d5a
Update README.md
JulianHamo Dec 18, 2024
044015a
Create README.md
JulianHamo Dec 18, 2024
536fba6
Update processor.py
JulianHamo Dec 18, 2024
57b2d59
Update utils.py
JulianHamo Dec 18, 2024
20151bf
Update processor.py
JulianHamo Dec 18, 2024
1f8da55
Update processor.py
JulianHamo Dec 18, 2024
28c15af
Update of CI script to run only on single modules
Dec 19, 2024
9b7c51e
Update of CI script to run only on single modules
JulianHamo Dec 19, 2024
2aa9cae
Update of CI script to run only on single modules
JulianHamo Dec 19, 2024
cdb8388
Update of standardized_flux/utils.py
JulianHamo Dec 19, 2024
993f024
Change from 'low' to 'quiescent'
JulianHamo Dec 19, 2024
0a5c000
Documentation update
JulianHamo Dec 19, 2024
d6a5365
PEP8 check test
JulianHamo Dec 19, 2024
9699a0b
PEP8 fix
JulianHamo Dec 19, 2024
8bdd36a
__init__.py fix
JulianHamo Dec 19, 2024
0b2fe70
PEP8 fix test
JulianHamo Dec 19, 2024
8017541
PEP8 fix test
JulianHamo Dec 19, 2024
aa81b31
PEP8 fix test
JulianHamo Dec 19, 2024
fdb574c
PEP8 fix test
JulianHamo Dec 19, 2024
25c249a
PEP8 fix test
JulianHamo Dec 19, 2024
631e618
Datatest creation
JulianHamo Dec 20, 2024
03869f8
Shrinking of datatest size
JulianHamo Dec 20, 2024
27f3928
PEP8 fix
JulianHamo Dec 20, 2024
85b1448
Datatest path change
JulianHamo Dec 20, 2024
f610755
Example fix
JulianHamo Dec 20, 2024
393c4ff
Example fix
JulianHamo Dec 20, 2024
85cfb55
Example fix
JulianHamo Dec 20, 2024
54c5295
Modification of tester.py
JulianHamo Dec 20, 2024
e51819e
Modification of CTAO_blazars path
JulianHamo Dec 20, 2024
e37ba8e
Modification of CTAO_blazars path
JulianHamo Dec 20, 2024
d7d740a
Modification of CTAO_blazars path
JulianHamo Dec 20, 2024
a59dc02
Modification of CTAO_blazars path
JulianHamo Dec 20, 2024
a5b8ab4
Modification of CTAO_blazars path
JulianHamo Dec 20, 2024
594bb8e
Example fix attempt
JulianHamo Dec 20, 2024
1cf6fe5
Example fix attempt
JulianHamo Dec 20, 2024
c7ca3a1
Example fix attempt
JulianHamo Dec 20, 2024
464833a
PATH change
JulianHamo Jan 10, 2025
b7db3d8
Test addition
JulianHamo Jan 10, 2025
3de4fe7
Test addition
JulianHamo Jan 10, 2025
4df8a42
Test addition
JulianHamo Jan 10, 2025
3636765
Test addition
JulianHamo Jan 10, 2025
0621ff5
Minor corrections
JulianHamo Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
12 changes: 12 additions & 0 deletions fink_science/blazar_low_state/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Quiescent state of blazar detection

This module adds a new column containing quantities that indicate if the last observation of a given blazar can classify it as a **quiescent state**.
The available blazar set comes from the list of blazars that are planned to be monitored by the CTAO collaboration from the start of the South and North sites.

These quantities are:
* The sliding mean of standardized flux of the last but new alert over a specific threshold computed for the source beforehand
* The sliding mean of standardized flux of the new alert over the same threshold computed for the source beforehand
* The standardized flux of the last alert over the same threshold

If the last two ratio are below one, the source is considered to be in a quiescent state.
If the source is in a quiescent state but the first ratio is above one, this quiescent state is considered to have just started.
Empty file.
173 changes: 173 additions & 0 deletions fink_science/blazar_low_state/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright 2025 AstroLab Software
# Author: Julian Hamo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from line_profiler import profile

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType
from fink_science.blazar_low_state.utils import quiescent_state_

from fink_science.tester import spark_unit_tests
from fink_science import __file__
import os

RELEASE = 22


@pandas_udf(ArrayType(DoubleType()))
@profile
def quiescent_state(
candid: pd.Series,
objectId: pd.Series,
cstd_flux: pd.Series,
cjd: pd.Series) -> pd.Series:
"""Returns an array containing:
The mean over threshold ratio of the last but one alert
The mean over threshold ratio of the last alert
The standardized flux over threshold ratio of the last alert

Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold
Returns
-------
out: pd.Series of np.ndarray of np.float64
Array of ratios for:
Mean over threshold of the last but one alert
Mean over threshold of the last alert
Measurement over threshold of the last alert
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to specify default values of -1 in case of problem.

All the default values are set to -1 as it is an unphysical value

Examples
--------
>>> import os
>>> import numpy as np
>>> import pandas as pd
>>> from fink_utils.spark.utils import concat_col
>>> import pyspark.sql.functions as F
>>> from fink_science.standardized_flux.processor import standardized_flux

>>> parDF = spark.read.parquet(ztf_alert_sample)

# Required alert columns
>>> what = [
... 'distnr',
... 'magpsf',
... 'sigmapsf',
... 'magnr',
... 'sigmagnr',
... 'isdiffpos',
... 'fid',
... 'jd'
... ]

# Concatenation
>>> prefix = 'c'
>>> for key in what:
... parDF = concat_col(parDF, colname=key, prefix=prefix)

# Preliminary module run
>>> args = [
... 'candid',
... 'objectId',
... 'cdistnr',
... 'cmagpsf',
... 'csigmapsf',
... 'cmagnr',
... 'csigmagnr',
... 'cisdiffpos',
... 'cfid',
... 'cjd'
... ]
>>> parDF = parDF.withColumn(
... 'container',
... standardized_flux(*args)
... )
>>> parDF = parDF.withColumn(
... 'cstd_flux',
... parDF['container'].getItem('flux')
... )
>>> parDF = parDF.withColumn(
... 'csigma_std_flux',
... parDF['container'].getItem('sigma')
... )

# Drop temporary columns
>>> what_prefix = [prefix + key for key in what]
>>> parDF = parDF.drop('container')

# Test the module
>>> args = ['candid', 'objectId', 'cstd_flux', 'cjd']
>>> parDF = parDF.withColumn('blazar_stats', quiescent_state(*args))

# Test
>>> stats = parDF.select('blazar_stats').toPandas()['blazar_stats']
>>> stats = np.array(stats.to_list())
>>> stats[pd.isnull(stats)] = np.nan
>>> (stats.sum(axis=-1) == -3).sum()
320
"""

path = os.path.dirname(os.path.abspath(__file__))
CTAO_PATH = os.path.join(path, 'data/catalogs')
CTAO_filename = 'CTAO_blazars_ztf_dr{}.parquet'.format(RELEASE)
CTAO_blazar = pd.read_parquet(os.path.join(CTAO_PATH, CTAO_filename))

pdf = pd.DataFrame(
{
"candid": candid,
"objectId": objectId,
"cstd_flux": cstd_flux,
"cjd": cjd
}
)
out = []
for candid_ in pdf["candid"]:
tmp = pdf[pdf["candid"] == candid_]
if len(tmp["cstd_flux"].to_numpy()[0]) == 0:
out.append([-1., -1., -1.])
continue
sub = pd.DataFrame(
{
"candid": tmp["candid"].to_numpy()[0],
"objectId": tmp["objectId"].to_numpy()[0],
"cstd_flux": tmp["cstd_flux"].to_numpy()[0],
"cjd": tmp["cjd"].to_numpy()[0],
}
)
out.append(quiescent_state_(sub, CTAO_blazar))

return pd.Series(out)


if __name__ == "__main__":
"""Execute the test suite"""

globs = globals()
path = os.path.join(os.path.dirname(__file__), 'data/alerts/datatest')
filename = 'CTAO_blazar_datatest_v20-12-24.parquet'
ztf_alert_sample = "file://{}/{}".format(path, filename)
globs["ztf_alert_sample"] = ztf_alert_sample

# Run the test suite
spark_unit_tests(globs)
142 changes: 142 additions & 0 deletions fink_science/blazar_low_state/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import numpy as np
import pandas as pd


def instantness_criterion(
pdf: pd.DataFrame,
CTAO_blazar: pd.DataFrame) -> np.float64:
"""Returns the standardized flux of the last measurement
over the precomputed threshold ratio

Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold

Returns
-------
out: np.float64
Ratio of the standardized flux coming from the last measurement alert
over precomputed threshold
"""

name = pdf['objectId'].values[0]

try:
threshold = np.array(
CTAO_blazar.loc[
CTAO_blazar['ZTF Name'] == name,
'Final Threshold'
].values[0]
)
except IndexError:
threshold = np.nan

try:
return pdf['cstd_flux'].iloc[-1] / threshold
except KeyError:
return np.nan


def robustness_criterion(
pdf: pd.DataFrame,
CTAO_blazar: pd.DataFrame) -> np.float64:
"""Returns the sliding mean over 30 days of the standardized flux
over the precomputed threshold ratio

Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold

Returns
-------
out: np.float64
Ratio of the sliding mean over 30 days of the standardized flux over
the precomputed threshold
"""

integration_period = 30
name = pdf['objectId'].values[0]

try:
threshold = np.array(
CTAO_blazar.loc[
CTAO_blazar['ZTF Name'] == name,
'Final Threshold'
].values[0]
)
except IndexError:
threshold = np.nan

try:
full_time = pdf['cjd']
maskTime = full_time >= full_time.iloc[-1] - integration_period
time = pdf.loc[maskTime, 'cjd']
flux = pdf.loc[maskTime, 'cstd_flux']
except KeyError:
return np.nan

maskNan = ~pd.isnull(flux)
mtime = time[maskNan]
if maskNan.sum() > 1:
mtimestart = mtime.iloc[0]
mtimestop = mtime.iloc[-1]
integral = np.trapz(flux[maskNan], x=mtime)
return integral / (mtimestop - mtimestart) / threshold
else:
return np.nan


def quiescent_state_(
pdf: pd.DataFrame,
CTAO_blazar: pd.DataFrame) -> np.ndarray:
"""Returns an array containing:
The mean over threshold ratio of the last but one alert
The mean over threshold ratio of the last alert
The standardized flux over threshold ratio of the last alert

Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold
Returns
-------
out: np.ndarray of np.float64
Array of ratios for:
Mean over threshold of the last but one alert
Mean over threshold of the last alert
Measurement over threshold of the last alert
"""

name = pdf['objectId'].values[0]

if not CTAO_blazar.loc[CTAO_blazar['ZTF Name'] == name].empty:
return np.array(
[
robustness_criterion(pdf[:-1], CTAO_blazar),
robustness_criterion(pdf, CTAO_blazar),
instantness_criterion(pdf, CTAO_blazar)
]
)

else:
return np.full(3, np.nan)
Binary file not shown.
Binary file not shown.
5 changes: 5 additions & 0 deletions fink_science/standardized_flux/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Flux standardization module

This module adds two new columns for a set of sources defined beforehand.
These sources are blazars that the CTAO collaboration wants to monitor starting the launch of the South and North sites.
The new columns are the **standardized flux** (which is the flux over the median flux computed beforhand using archival light curves of ZTF) and the **uncertainties** over this standardized flux.
Empty file.
Loading