Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 170 additions & 6 deletions fink_utils/sso/spins.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import astropy.units as u

from fink_utils.sso.utils import estimate_axes_ratio
from fink_utils.sso.utils import get_opposition, split_dataframe_per_apparition
from fink_utils.tester import regular_unit_tests


Expand Down Expand Up @@ -343,6 +344,51 @@ def func_sshg1g2(pha, h, g1, g2, alpha0, delta0, period, a_b, a_c, phi0):
return func1 + func2


def sfhg1g2_multiple(phas, g1, g2, *args):
    """HG1G2 model in the case of simultaneous fit

    Notes
    -----
    The absolute magnitudes H can be passed either as one single
    sequence (``sfhg1g2_multiple(phas, g1, g2, hs)``, as done by
    `sfhg1g2_error_fun`), or unpacked as individual floats
    (``sfhg1g2_multiple(phas, g1, g2, h0, h1, ...)``). Both forms
    are supported.

    Parameters
    ----------
    phas: np.array
        (N, M_o) array of phase angles. N is the number
        of opposition, M_o is the number of observations
        per opposition. Phase is radians.
    g1: float
        G1 parameter (no unit)
    g2: float
        G2 parameter (no unit)
    args: float
        Hs, one per opposition. Either unpacked floats, or a single
        sequence containing all of them.

    Returns
    -------
    out: np.array
        Magnitudes as predicted by `func_hg1g2`, concatenated over
        oppositions (1D array of total length sum of M_o).
    """
    # Backward-compatible dispatch: a single array-like argument
    # carries all the Hs; otherwise each positional argument is one H.
    if len(args) == 1 and np.ndim(args[0]) > 0:
        hs = args[0]
    else:
        hs = args

    # One HG1G2 evaluation per opposition, sharing (G1, G2)
    fl = [func_hg1g2(alpha, h, g1, g2) for alpha, h in zip(phas, hs)]
    return np.concatenate(fl)


def sfhg1g2_error_fun(params, phas, mags):
    """Residuals between sfHG1G2 model predictions and observations

    Parameters
    ----------
    params: list
        Flat parameter vector [G1, G2, H_0, ..., H_{N-1}], with one
        absolute magnitude H per opposition.
    phas: np.array
        (N, M_o) array of phase angles. N is the number
        of opposition, M_o is the number of observations
        per opposition. Must be sorted by time. Phase is radians.
    mags: np.array
        Reduced magnitude, that is m_obs - 5 * np.log10('Dobs' * 'Dhelio')
        Sorted by time.

    Returns
    -------
    out: np.array
        Residuals (model minus observations), concatenated over
        oppositions.
    """
    g1 = params[0]
    g2 = params[1]
    hs = params[2:]
    predictions = sfhg1g2_multiple(phas, g1, g2, hs)
    return predictions - mags


def color_correction_to_V(): # noqa: N802
"""Color correction from band to V

Expand Down Expand Up @@ -864,12 +910,7 @@ def fit_legacy_models(

Returns
-------
popt: list
Estimated parameters for `func`
perr: list
Error estimates on popt elements
chi2_red: float
Reduced chi2
outdic: dict
"""
if p0 is None:
p0 = [15, 0.15, 0.15]
Expand Down Expand Up @@ -994,6 +1035,129 @@ def fit_legacy_models(
return outdic


def fit_sfhg1g2(
    ssnamenr,
    magpsf_red,
    sigmapsf,
    jds,
    phase,
    filters,
):
    """Fit for phase curve parameters for sfHG1G2

    Notes
    -----
    Unlike other models, it returns less information, and
    only per-band.

    The fit is done independently per filter. Within a filter,
    measurements are split into apparitions using the elongation flag
    queried from JPL Horizons (`get_opposition` -- network call), and a
    single (G1, G2) pair is fitted simultaneously with one H per
    apparition (`sfhg1g2_error_fun`).

    Parameters
    ----------
    ssnamenr: str
        SSO name/number
    magpsf_red: array
        Reduced magnitude, that is m_obs - 5 * np.log10('Dobs' * 'Dhelio')
    sigmapsf: array
        Error estimates on magpsf_red
    jds: array
        Julian Dates
    phase: array
        Phase angle [rad]
    filters: array
        Filter name for each measurement

    Returns
    -------
    outdic: dict
        Dictionary containing reduced chi2, and estimated parameters and
        error on each parameters (keys suffixed by filter name). On
        failure, a dict with `fit` and `status` error codes instead.
    """
    # exit if NaN values (x == x is False only for NaN)
    if not np.all([i == i for i in magpsf_red]):
        outdic = {"fit": 1, "status": -2}
        return outdic

    pdf = pd.DataFrame({
        "i:magpsf_red": magpsf_red,
        "i:sigmapsf": sigmapsf,
        "Phase": phase,
        "i:jd": jds,
        "i:fid": filters,
    })
    # get_opposition and the apparition splitting both require
    # time-sorted data
    pdf = pdf.sort_values("i:jd")

    # Get oppositions: elongation angle and /L|/T flag per measurement,
    # matched to daily Horizons ephemerides (network call)
    pdf[["elong", "elongFlag"]] = get_opposition(pdf["i:jd"].to_numpy(), ssnamenr)

    # loop over filters
    ufilters = np.unique(pdf["i:fid"].to_numpy())
    outdics = {}
    for filt in ufilters:
        outdic = {}

        # Select data for a filter
        sub = pdf[pdf["i:fid"] == filt].copy()

        # Compute apparitions (list of DataFrames, one per apparition)
        splitted = split_dataframe_per_apparition(sub, "elongFlag", "i:jd")
        napparition = len(splitted)

        # H for all apparitions plus G1, G2
        params_ = ["G1", "G2", *["H{}".format(i) for i in range(napparition)]]
        # Suffix parameter names with the filter, e.g. "G1_1", "H0_1"
        params = [i + "_{}".format(str(filt)) for i in params_]

        # Split phase: one list of phase angles per apparition
        phase_list = [df["Phase"].to_numpy().tolist() for df in splitted]

        # Fit. Bounds: G1, G2 in [0, 1]; each H in [-3, 30] mag.
        # NOTE(review): assumes the per-apparition residuals, once
        # concatenated by sfhg1g2_multiple, stay aligned with sub's
        # time-sorted magnitudes -- holds as long as the splitter
        # preserves row order.
        res_lsq = least_squares(
            sfhg1g2_error_fun,
            x0=[0.15, 0.15] + [15] * napparition,
            bounds=(
                [0, 0] + [-3] * napparition,
                [1, 1] + [30] * napparition,
            ),
            loss="huber",
            method="trf",
            args=[
                phase_list,
                sub["i:magpsf_red"].to_numpy().tolist(),
            ],
            # very tight tolerances so termination is driven by gtol
            xtol=1e-20,
            gtol=1e-17,
        )

        # Result: best-fit parameter vector [G1, G2, H_0, ...]
        popt = res_lsq.x

        # estimate covariance matrix using the jacobian
        try:
            cov = linalg.inv(res_lsq.jac.T @ res_lsq.jac)
            # scale by the reduced chi2 of the residuals
            chi2dof = np.sum(res_lsq.fun**2) / (res_lsq.fun.size - res_lsq.x.size)
            cov *= chi2dof

            # 1sigma uncertainty on fitted parameters
            perr = np.sqrt(np.diag(cov))
        except np.linalg.LinAlgError:
            # raised if jacobian is degenerated
            # NOTE(review): returning here discards results already
            # accumulated in outdics for previous filters -- confirm
            # this all-or-nothing behavior is intended
            outdic = {"fit": 4, "status": res_lsq.status}
            return outdic

        # chi2 weighted by the measurement uncertainties
        chisq = np.sum((res_lsq.fun / sub["i:sigmapsf"]) ** 2)
        chisq_red = chisq / (res_lsq.fun.size - res_lsq.x.size - 1)
        outdic["chi2red_{}".format(filt)] = chisq_red
        outdic["rms_{}".format(filt)] = np.sqrt(np.mean(res_lsq.fun**2))
        outdic["n_obs_{}".format(filt)] = len(sub)
        outdic["n_app_{}".format(filt)] = napparition

        # store each fitted parameter and its 1-sigma uncertainty
        for i in range(len(params)):
            outdic[params[i]] = popt[i]
            outdic["err_" + params[i]] = perr[i]

        outdics.update(outdic)

    return outdics


def fit_spin(
magpsf_red,
sigmapsf,
Expand Down
150 changes: 150 additions & 0 deletions fink_utils/sso/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import numpy as np

import astropy.constants as const
from astropy.time import Time
from astroquery.jplhorizons import Horizons


from scipy import signal

Expand Down Expand Up @@ -385,6 +388,153 @@ def retrieve_first_date_of_next_month(mydate):
return (mydate.replace(day=1) + datetime.timedelta(days=32)).replace(day=1)


def split_dataframe_per_apparition(df, column_name, time_column):
    """Split a dataframe per apparition

    Notes
    -----
    Function from M. Colazo

    A new apparition starts when:
    1. the elongation flag switches from trailing (/T) back to
       leading (/L), or
    2. two consecutive /T rows are more than 182.5 days (6 months)
       apart.

    Rows are grouped by positional index and sliced once per
    apparition, so the original column dtypes are preserved and the
    split runs in O(n) instead of concatenating single-row frames.

    Parameters
    ----------
    df: Pandas DataFrame
        Input DataFrame. Must be sorted by `time_column`.
    column_name: str
        Name of column containing information on oppositions.
        This information comes from horizons.
    time_column: str
        Name of the column containing times, in JD.

    Returns
    -------
    out: list of DataFrames
        Initial DataFrame splitted. Each DataFrame
        contains data for a given opposition, with a fresh
        0..k-1 index.

    Examples
    --------
    >>> pdf = pd.DataFrame({"elongFlag": ["/L", "/L", "/T", "/T"], "jd": [1, 2, 2000, 4000]})
    >>> splitted = split_dataframe_per_apparition(pdf, "elongFlag", "jd")
    >>> len(splitted)
    2
    """
    groups = []  # list of lists of positional indices, one per apparition
    current = []  # indices of the apparition being built
    prev_value = None
    prev_time = None

    for pos, (_, row) in enumerate(df.iterrows()):
        current_value = row[column_name]
        current_time = row[time_column]

        if current_value.startswith("/L") and (
            prev_value is None or prev_value.startswith("/T")
        ):
            # /T -> /L transition: close the previous apparition
            if current:
                groups.append(current)
                current = []
        elif (
            current_value.startswith("/T")
            and prev_value is not None
            and prev_value.startswith("/T")
        ):
            if prev_time is not None and (current_time - prev_time) > 182.5:
                # If the difference is greater than 6 months, a new apparition begins.
                groups.append(current)
                current = []

        current.append(pos)

        # Update previous values
        prev_value = current_value
        prev_time = current_time

    if current:
        groups.append(current)

    # Slice once per apparition; reset_index mirrors the previous
    # ignore_index behavior
    return [df.iloc[idx].reset_index(drop=True) for idx in groups]


def get_opposition(jds, ssnamenr, location="I41"):
    """Get vector of opposition information

    Notes
    -----
    Function from M. Colazo

    Queries JPL Horizons once for daily ephemerides over the full time
    span of `jds`, then associates each observation with the closest
    ephemeris epoch.

    Parameters
    ----------
    jds: np.array
        Array of JD values (float).
        Must be UTC, and sorted.
    ssnamenr: str
        SSO name/number, as understood by JPL Horizons.
    location: str, optional
        Observer location code passed to Horizons. Default is "I41".

    Returns
    -------
    elong: np.array of float
        Elongation angle in degree
    elongFlag: np.array of str
        Elongation flag (/T or /L)
    """
    # Bounding dates of the observations, truncated to calendar days
    # (YYYY-MM-DD) as required by the Horizons epochs specification
    start_day = Time(min(jds), format="jd", scale="utc").iso[:10]
    stop_day = Time(max(jds), format="jd", scale="utc").iso[:10]

    # One query for daily ephemerides over the whole range
    query = Horizons(
        id=ssnamenr,
        location=location,
        epochs={"start": start_day, "stop": stop_day, "step": "1d"},
        id_type="smallbody",
    )
    eph_df = query.ephemerides().to_pandas()

    # Match each observation to the nearest ephemeris epoch in JD
    matched = pd.merge_asof(
        pd.DataFrame({"jds": jds}).sort_values("jds"),
        eph_df[["datetime_jd", "elongFlag", "elong"]].sort_values("datetime_jd"),
        left_on="jds",
        right_on="datetime_jd",
        direction="nearest",
    )

    return matched[["elong", "elongFlag"]].to_numpy()


if __name__ == "__main__":
"""Execute the unit test suite"""

Expand Down