diff --git a/fink_utils/sso/ssoft.py b/fink_utils/sso/ssoft.py
index 0964218..532fbc8 100644
--- a/fink_utils/sso/ssoft.py
+++ b/fink_utils/sso/ssoft.py
@@ -18,6 +18,8 @@
 from pyspark.sql import SparkSession
 import pyspark.sql.functions as F
 
+from astropy.time import Time
+
 from fink_utils.tester import spark_unit_tests
 
 COLUMNS = {
@@ -475,7 +477,11 @@ def join_aggregated_sso_data(df_prev, df_new, on="ssnamenr", output_filename=Non
 
 
 def aggregate_ztf_sso_data(
-    year, month, prefix_path="archive/science", output_filename=None
+    year,
+    month=None,
+    stop_previous_month=False,
+    prefix_path="archive/science",
+    output_filename=None,
 ):
     """Aggregate ZTF SSO data in Fink
 
@@ -483,8 +489,15 @@
     ----------
     year: str
         Year date in format YYYY.
-    month: str
-        Month date in format MM.
+    month: str, optional
+        Month date in format MM. Default is None, in
+        which case `year` only will be considered.
+    stop_previous_month: bool, optional
+        If True, load data only until previous month.
+        To use only with month=None, to reconstruct
+        data from the current year.
+    prefix_path: str, optional
+        Prefix path on HDFS. Default is archive/science
     output_filename: str, optional
         If given, save data on HDFS. Cannot overwrite.
         Default is None.
@@ -496,9 +509,18 @@
     Examples
     --------
     >>> path = "fink_utils/test_data/benoit_julien_2025/science"
+
+    Check monthly aggregation
     >>> df_agg = aggregate_ztf_sso_data(year=2025, month=1, prefix_path=path)
     >>> assert df_agg.count() == 1, df_agg.count()
 
+    >>> out = df_agg.collect()
+    >>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"])
+
+    Check yearly aggregation
+    >>> df_agg = aggregate_ztf_sso_data(year=2025, prefix_path=path)
+    >>> assert df_agg.count() == 1, df_agg.count()
+
     >>> out = df_agg.collect()
     >>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"])
     """
@@ -513,11 +535,20 @@
         "candidate.jd",
     ]
 
-    df = (
-        spark.read.format("parquet")
-        .option("basePath", prefix_path)
-        .load("{}/year={}/month={}".format(prefix_path, year, month))
-    )
+    if month is not None:
+        path = "{}/year={}/month={}".format(prefix_path, year, month)
+    else:
+        path = "{}/year={}".format(prefix_path, year)
+
+    df = spark.read.format("parquet").option("basePath", prefix_path).load(path)
+
+    if month is None and stop_previous_month:
+        prevdate = retrieve_last_date_of_previous_month(datetime.datetime.today())
+        # take the last hour of the last day
+        prevdate = prevdate.replace(hour=23)
+        jd0 = Time(prevdate, format="datetime").jd
+        df = df.filter(df["candidate.jd"] <= jd0)
+
     df_agg = (
         df.select(cols0 + cols)
         .filter(F.col("roid") == 3)
diff --git a/fink_utils/sso/utils.py b/fink_utils/sso/utils.py
index ff230ef..64d31b9 100644
--- a/fink_utils/sso/utils.py
+++ b/fink_utils/sso/utils.py
@@ -26,10 +26,12 @@
 import astropy.units as u
 
 from scipy import signal
+from line_profiler import profile
 
 from fink_utils.tester import regular_unit_tests
 
 
+@profile
 def query_miriade(
     ident,
     jd,
@@ -129,6 +131,7 @@
     return ephem
 
 
+@profile
 def query_miriade_ephemcc(
     ident,
     jd,
@@ -236,6 +239,7 @@
     return ephem
 
 
+@profile
 def get_miriade_data(
     pdf,
     sso_colname="i:ssnamenr",
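Note on the `ssoft.py` change: the `stop_previous_month` branch calls `retrieve_last_date_of_previous_month`, which is defined elsewhere in `ssoft.py` and not shown in this diff. Below is a minimal standalone sketch of the cutoff logic; `last_date_of_previous_month` is a hypothetical stand-in for that helper, and the conversion to a Julian Date with `astropy.time.Time` mirrors what the patched `aggregate_ztf_sso_data` compares against `candidate.jd`.

```python
# Sketch only: `last_date_of_previous_month` is a hypothetical stand-in for
# the package's retrieve_last_date_of_previous_month, which the diff calls
# but does not define here.
import datetime

from astropy.time import Time


def last_date_of_previous_month(today: datetime.datetime) -> datetime.datetime:
    # Step back one day from the first day of the current month to land on
    # the last day of the previous month.
    return today.replace(day=1) - datetime.timedelta(days=1)


# As in the patched aggregate_ztf_sso_data: take the last hour of that day,
# convert it to a Julian Date, and keep only rows with candidate.jd <= jd0.
prevdate = last_date_of_previous_month(datetime.datetime(2025, 3, 15)).replace(hour=23)
jd0 = Time(prevdate, format="datetime").jd  # 2025-02-28 23:00 UTC -> ~2460735.46
```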
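On the `utils.py` side, the new `@profile` decorators come from `line_profiler`'s importable decorator. Assuming line_profiler >= 4.1, the decorator is a no-op unless profiling is explicitly enabled, so it can remain on hot paths such as `query_miriade` without changing normal runs. A sketch of how it might be exercised in a standalone script:

```python
# Minimal sketch, assuming line_profiler >= 4.1: `profile` can be imported
# directly and is inert unless profiling is enabled, e.g. by running the
# script with the LINE_PROFILE=1 environment variable set.
from line_profiler import profile


@profile
def slow_sum(n):
    total = 0
    for i in range(n):
        total += i  # per-line timings are recorded here when enabled
    return total


if __name__ == "__main__":
    slow_sum(1_000_000)
    # When enabled, results are written to .lprof/.txt files that can be
    # inspected with `python -m line_profiler <output>.lprof`.
```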