Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions fink_utils/sso/ssoft.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from astropy.time import Time

from fink_utils.tester import spark_unit_tests

COLUMNS = {
Expand Down Expand Up @@ -475,16 +477,27 @@ def join_aggregated_sso_data(df_prev, df_new, on="ssnamenr", output_filename=Non


def aggregate_ztf_sso_data(
year, month, prefix_path="archive/science", output_filename=None
year,
month=None,
stop_previous_month=False,
prefix_path="archive/science",
output_filename=None,
):
"""Aggregate ZTF SSO data in Fink

Parameters
----------
year: str
Year date in format YYYY.
month: str
Month date in format MM.
month: str, optional
Month date in format MM. Default is None, in
which case only `year` will be considered.
stop_previous_month: bool, optional
If True, load data only up to the end of the previous month.
Use only with month=None, to reconstruct
data from the current year.
prefix_path: str, optional
Prefix path on HDFS. Default is archive/science
output_filename: str, optional
If given, save data on HDFS. Cannot overwrite. Default is None.

Expand All @@ -496,9 +509,18 @@ def aggregate_ztf_sso_data(
Examples
--------
>>> path = "fink_utils/test_data/benoit_julien_2025/science"

Check monthly aggregation
>>> df_agg = aggregate_ztf_sso_data(year=2025, month=1, prefix_path=path)
>>> assert df_agg.count() == 1, df_agg.count()

>>> out = df_agg.collect()
>>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"])

Check yearly aggregation
>>> df_agg = aggregate_ztf_sso_data(year=2025, prefix_path=path)
>>> assert df_agg.count() == 1, df_agg.count()

>>> out = df_agg.collect()
>>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"])
"""
Expand All @@ -513,11 +535,20 @@ def aggregate_ztf_sso_data(
"candidate.jd",
]

df = (
spark.read.format("parquet")
.option("basePath", prefix_path)
.load("{}/year={}/month={}".format(prefix_path, year, month))
)
if month is not None:
path = "{}/year={}/month={}".format(prefix_path, year, month)
else:
path = "{}/year={}".format(prefix_path, year)

df = spark.read.format("parquet").option("basePath", prefix_path).load(path)

if month is None and stop_previous_month:
prevdate = retrieve_last_date_of_previous_month(datetime.datetime.today())
# take the last hour of the last day
prevdate = prevdate.replace(hour=23)
jd0 = Time(prevdate, format="datetime").jd
df = df.filter(df["candidate.jd"] <= jd0)

df_agg = (
df.select(cols0 + cols)
.filter(F.col("roid") == 3)
Expand Down
4 changes: 4 additions & 0 deletions fink_utils/sso/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import astropy.units as u

from scipy import signal
from line_profiler import profile

from fink_utils.tester import regular_unit_tests


@profile
def query_miriade(
ident,
jd,
Expand Down Expand Up @@ -129,6 +131,7 @@ def query_miriade(
return ephem


@profile
def query_miriade_ephemcc(
ident,
jd,
Expand Down Expand Up @@ -236,6 +239,7 @@ def query_miriade_ephemcc(
return ephem


@profile
def get_miriade_data(
pdf,
sso_colname="i:ssnamenr",
Expand Down