Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
12 changes: 12 additions & 0 deletions fink_science/blazar_low_state/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Quiescent state of blazar detection

This module adds a new column containing quantities that indicate if the last observation of a given blazar can classify it as a **quiescent state**.
The available blazar set comes from the list of blazars that are planned to be monitored by the CTAO collaboration from the start of the South and North sites.

These quantities are:
* The sliding mean of standardized flux of the last but new alert over a specific threshold computed for the source beforehand
* The sliding mean of standardized flux of the new alert over the same threshold computed for the source beforehand
* The standardized flux of the last alert over the same threshold

If the last two ratio are below one, the source is considered to be in a quiescent state.
If the source is in a quiescent state but the first ratio is above one, this quiescent state is considered to have just started.
Empty file.
173 changes: 173 additions & 0 deletions fink_science/blazar_low_state/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright 2025 AstroLab Software
# Author: Julian Hamo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from line_profiler import profile

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType
from fink_science.blazar_low_state.utils import quiescent_state_

from fink_science.tester import spark_unit_tests
from fink_science import __file__
import os

RELEASE = 22


@pandas_udf(ArrayType(DoubleType()))
@profile
def quiescent_state(
candid: pd.Series,
objectId: pd.Series,
cstd_flux: pd.Series,
cjd: pd.Series) -> pd.Series:
"""Returns an array containing:
The mean over threshold ratio of the last but one alert
The mean over threshold ratio of the last alert
The standardized flux over threshold ratio of the last alert

Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold
Returns
-------
out: pd.Series of np.ndarray of np.float64
Array of ratios for:
Mean over threshold of the last but one alert
Mean over threshold of the last alert
Measurement over threshold of the last alert
All the default values are set to -1 as it is an unphysical value

Examples
--------
>>> import os
>>> import numpy as np
>>> import pandas as pd
>>> from fink_utils.spark.utils import concat_col
>>> import pyspark.sql.functions as F
>>> from fink_science.standardized_flux.processor import standardized_flux

>>> parDF = spark.read.parquet(ztf_alert_sample)

# Required alert columns
>>> what = [
... 'distnr',
... 'magpsf',
... 'sigmapsf',
... 'magnr',
... 'sigmagnr',
... 'isdiffpos',
... 'fid',
... 'jd'
... ]

# Concatenation
>>> prefix = 'c'
>>> for key in what:
... parDF = concat_col(parDF, colname=key, prefix=prefix)

# Preliminary module run
>>> args = [
... 'candid',
... 'objectId',
... 'cdistnr',
... 'cmagpsf',
... 'csigmapsf',
... 'cmagnr',
... 'csigmagnr',
... 'cisdiffpos',
... 'cfid',
... 'cjd'
... ]
>>> parDF = parDF.withColumn(
... 'container',
... standardized_flux(*args)
... )
>>> parDF = parDF.withColumn(
... 'cstd_flux',
... parDF['container'].getItem('flux')
... )
>>> parDF = parDF.withColumn(
... 'csigma_std_flux',
... parDF['container'].getItem('sigma')
... )

# Drop temporary columns
>>> what_prefix = [prefix + key for key in what]
>>> parDF = parDF.drop('container')

# Test the module
>>> args = ['candid', 'objectId', 'cstd_flux', 'cjd']
>>> parDF = parDF.withColumn('blazar_stats', quiescent_state(*args))

# Test
>>> stats = parDF.select('blazar_stats').toPandas()['blazar_stats']
>>> stats = np.array(stats.to_list())
>>> stats[pd.isnull(stats)] = np.nan
>>> (stats.sum(axis=-1) == -3).sum()
320
"""

path = os.path.dirname(os.path.abspath(__file__))
CTAO_PATH = os.path.join(path, 'data/catalogs')
CTAO_filename = 'CTAO_blazars_ztf_dr{}.parquet'.format(RELEASE)
CTAO_blazar = pd.read_parquet(os.path.join(CTAO_PATH, CTAO_filename))

pdf = pd.DataFrame(
{
"candid": candid,
"objectId": objectId,
"cstd_flux": cstd_flux,
"cjd": cjd
}
)
out = []
for candid_ in pdf["candid"]:
tmp = pdf[pdf["candid"] == candid_]
if len(tmp["cstd_flux"].to_numpy()[0]) == 0:
out.append([-1., -1., -1.])
continue
sub = pd.DataFrame(
{
"candid": tmp["candid"].to_numpy()[0],
"objectId": tmp["objectId"].to_numpy()[0],
"cstd_flux": tmp["cstd_flux"].to_numpy()[0],
"cjd": tmp["cjd"].to_numpy()[0],
}
)
out.append(quiescent_state_(sub, CTAO_blazar))

return pd.Series(out)


if __name__ == "__main__":
"""Execute the test suite"""

globs = globals()
path = os.path.join(os.path.dirname(__file__), 'data/alerts')
filename = 'CTAO_blazar_datatest_v20-12-24.parquet'
ztf_alert_sample = "file://{}/{}".format(path, filename)
globs["ztf_alert_sample"] = ztf_alert_sample

# Run the test suite
spark_unit_tests(globs)
142 changes: 142 additions & 0 deletions fink_science/blazar_low_state/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import numpy as np
import pandas as pd


def instantness_criterion(
pdf: pd.DataFrame,
CTAO_blazar: pd.DataFrame) -> np.float64:
"""Returns the standardized flux of the last measurement
over the precomputed threshold ratio
Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold
Returns
-------
out: np.float64
Ratio of the standardized flux coming from the last measurement alert
over precomputed threshold
"""

name = pdf['objectId'].values[0]

try:
threshold = np.array(
CTAO_blazar.loc[
CTAO_blazar['ZTF Name'] == name,
'Final Threshold'
].values[0]
)
except IndexError:
threshold = np.nan

try:
return pdf['cstd_flux'].iloc[-1] / threshold
except KeyError:
return np.nan


def robustness_criterion(
pdf: pd.DataFrame,
CTAO_blazar: pd.DataFrame) -> np.float64:
"""Returns the sliding mean over 30 days of the standardized flux
over the precomputed threshold ratio
Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold
Returns
-------
out: np.float64
Ratio of the sliding mean over 30 days of the standardized flux over
the precomputed threshold
"""

integration_period = 30
name = pdf['objectId'].values[0]

try:
threshold = np.array(
CTAO_blazar.loc[
CTAO_blazar['ZTF Name'] == name,
'Final Threshold'
].values[0]
)
except IndexError:
threshold = np.nan

try:
full_time = pdf['cjd']
maskTime = full_time >= full_time.iloc[-1] - integration_period
time = pdf.loc[maskTime, 'cjd']
flux = pdf.loc[maskTime, 'cstd_flux']
except KeyError:
return np.nan

maskNan = ~pd.isnull(flux)
mtime = time[maskNan]
if maskNan.sum() > 1:
mtimestart = mtime.iloc[0]
mtimestop = mtime.iloc[-1]
integral = np.trapz(flux[maskNan], x=mtime)
return integral / (mtimestop - mtimestart) / threshold
else:
return np.nan


def quiescent_state_(
pdf: pd.DataFrame,
CTAO_blazar: pd.DataFrame) -> np.ndarray:
"""Returns an array containing:
The mean over threshold ratio of the last but one alert
The mean over threshold ratio of the last alert
The standardized flux over threshold ratio of the last alert
Parameters
----------
pdf: pd.core.frame.DataFrame
Pandas DataFrame of the alert history containing:
candid, ojbectId, cdistnr, cmagpsf, csigmapsf, cmagnr,
csigmagnr, cisdiffpos, cfid, cjd, cstd_flux, csigma_std_flux
CTAO_blazar: pd.core.frame.DataFrame
Pandas DataFrame of the monitored sources containing:
3FGL Name, ZTF Name, Arrays of Medians, Computed Threshold,
Observed Threshold, Redshift, Final Threshold
Returns
-------
out: np.ndarray of np.float64
Array of ratios for:
Mean over threshold of the last but one alert
Mean over threshold of the last alert
Measurement over threshold of the last alert
"""

name = pdf['objectId'].values[0]

if not CTAO_blazar.loc[CTAO_blazar['ZTF Name'] == name].empty:
return np.array(
[
robustness_criterion(pdf[:-1], CTAO_blazar),
robustness_criterion(pdf, CTAO_blazar),
instantness_criterion(pdf, CTAO_blazar)
]
)

else:
return np.full(3, np.nan)
Binary file not shown.
Binary file not shown.
5 changes: 5 additions & 0 deletions fink_science/standardized_flux/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Flux standardization module

This module adds two new columns for a set of sources defined beforehand.
These sources are blazars that the CTAO collaboration wants to monitor starting the launch of the South and North sites.
The new columns are the **standardized flux** (which is the flux over the median flux computed beforhand using archival light curves of ZTF) and the **uncertainties** over this standardized flux.
Empty file.
Loading
Loading