From fc3996da6a861058815638272243d6a427d2038e Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Fri, 25 Apr 2025 11:27:33 +0200 Subject: [PATCH 1/4] Add yearly reprocessing --- fink_utils/sso/ssoft.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/fink_utils/sso/ssoft.py b/fink_utils/sso/ssoft.py index 0964218..ee523d8 100644 --- a/fink_utils/sso/ssoft.py +++ b/fink_utils/sso/ssoft.py @@ -475,7 +475,7 @@ def join_aggregated_sso_data(df_prev, df_new, on="ssnamenr", output_filename=Non def aggregate_ztf_sso_data( - year, month, prefix_path="archive/science", output_filename=None + year, month=None, prefix_path="archive/science", output_filename=None ): """Aggregate ZTF SSO data in Fink @@ -483,8 +483,11 @@ def aggregate_ztf_sso_data( ---------- year: str Year date in format YYYY. - month: str - Month date in format MM. + month: str, optional + Month date in format MM. Default is None, in + which case `year` only will be considered. + prefix_path: str, optional + Prefix path on HDFS. Default is archive/science output_filename: str, optional If given, save data on HDFS. Cannot overwrite. Default is None. 
@@ -496,9 +499,18 @@ def aggregate_ztf_sso_data( Examples -------- >>> path = "fink_utils/test_data/benoit_julien_2025/science" + + Check monthly aggregation >>> df_agg = aggregate_ztf_sso_data(year=2025, month=1, prefix_path=path) >>> assert df_agg.count() == 1, df_agg.count() + >>> out = df_agg.collect() + >>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"]) + + Check yearly aggregation + >>> df_agg = aggregate_ztf_sso_data(year=2025, prefix_path=path) + >>> assert df_agg.count() == 1, df_agg.count() + >>> out = df_agg.collect() >>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"]) """ @@ -513,10 +525,15 @@ def aggregate_ztf_sso_data( "candidate.jd", ] + if month is not None: + path = "{}/year={}/month={}".format(prefix_path, year, month) + else: + path = "{}/year={}".format(prefix_path, year) + df = ( spark.read.format("parquet") .option("basePath", prefix_path) - .load("{}/year={}/month={}".format(prefix_path, year, month)) + .load(path) ) df_agg = ( df.select(cols0 + cols) From 3023868fff0c7df400366a2d95449ddd85f079af Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Fri, 25 Apr 2025 11:27:46 +0200 Subject: [PATCH 2/4] Add profiling --- fink_utils/sso/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fink_utils/sso/utils.py b/fink_utils/sso/utils.py index ff230ef..64d31b9 100644 --- a/fink_utils/sso/utils.py +++ b/fink_utils/sso/utils.py @@ -26,10 +26,12 @@ import astropy.units as u from scipy import signal +from line_profiler import profile from fink_utils.tester import regular_unit_tests +@profile def query_miriade( ident, jd, @@ -129,6 +131,7 @@ def query_miriade( return ephem +@profile def query_miriade_ephemcc( ident, jd, @@ -236,6 +239,7 @@ def query_miriade_ephemcc( return ephem +@profile def get_miriade_data( pdf, sso_colname="i:ssnamenr", From 6649d849b9160ee439bdd1de820d7a1ebfef4470 Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Fri, 25 Apr 2025 12:40:42 +0200 Subject: [PATCH 3/4] Allow stopping the aggregation the previous 
month --- fink_utils/sso/ssoft.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/fink_utils/sso/ssoft.py b/fink_utils/sso/ssoft.py index ee523d8..b516bd4 100644 --- a/fink_utils/sso/ssoft.py +++ b/fink_utils/sso/ssoft.py @@ -475,7 +475,7 @@ def join_aggregated_sso_data(df_prev, df_new, on="ssnamenr", output_filename=Non def aggregate_ztf_sso_data( - year, month=None, prefix_path="archive/science", output_filename=None + year, month=None, stop_previous_month=False, prefix_path="archive/science", output_filename=None ): """Aggregate ZTF SSO data in Fink @@ -486,6 +486,10 @@ def aggregate_ztf_sso_data( month: str, optional Month date in format MM. Default is None, in which case `year` only will be considered. + stop_previous_month: bool, optional + If True, load data only until previous month. + To use only with month=None, to reconstruct + data from the current year. prefix_path: str, optional Prefix path on HDFS. Default is archive/science output_filename: str, optional @@ -535,6 +539,14 @@ def aggregate_ztf_sso_data( .option("basePath", prefix_path) .load(path) ) + + if month is None and stop_previous_month: + prevdate = retrieve_last_date_of_previous_month(datetime.datetime.today()) + # take the last hour of the last day + prevdate = prevdate.replace(hour=23) + jd0 = Time(prevdate, format="datetime").jd + df = df.filter(df["candidate.jd"] <= jd0) + df_agg = ( df.select(cols0 + cols) .filter(F.col("roid") == 3) From eb80627f712d1c90717432802f5172d65fe7793e Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Fri, 25 Apr 2025 12:47:37 +0200 Subject: [PATCH 4/4] Fix missing import --- fink_utils/sso/ssoft.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/fink_utils/sso/ssoft.py b/fink_utils/sso/ssoft.py index b516bd4..532fbc8 100644 --- a/fink_utils/sso/ssoft.py +++ b/fink_utils/sso/ssoft.py @@ -18,6 +18,8 @@ from pyspark.sql import SparkSession import pyspark.sql.functions as F +from astropy.time 
import Time + from fink_utils.tester import spark_unit_tests COLUMNS = { @@ -475,7 +477,11 @@ def join_aggregated_sso_data(df_prev, df_new, on="ssnamenr", output_filename=Non def aggregate_ztf_sso_data( - year, month=None, stop_previous_month=False, prefix_path="archive/science", output_filename=None + year, + month=None, + stop_previous_month=False, + prefix_path="archive/science", + output_filename=None, ): """Aggregate ZTF SSO data in Fink @@ -484,11 +490,11 @@ def aggregate_ztf_sso_data( year: str Year date in format YYYY. month: str, optional - Month date in format MM. Default is None, in + Month date in format MM. Default is None, in which case `year` only will be considered. stop_previous_month: bool, optional If True, load data only until previous month. - To use only with month=None, to reconstruct + To use only with month=None, to reconstruct data from the current year. prefix_path: str, optional Prefix path on HDFS. Default is archive/science @@ -534,11 +540,7 @@ def aggregate_ztf_sso_data( else: path = "{}/year={}".format(prefix_path, year) - df = ( - spark.read.format("parquet") - .option("basePath", prefix_path) - .load(path) - ) + df = spark.read.format("parquet").option("basePath", prefix_path).load(path) if month is None and stop_previous_month: prevdate = retrieve_last_date_of_previous_month(datetime.datetime.today())