1818from pyspark .sql import SparkSession
1919import pyspark .sql .functions as F
2020
21+ from astropy .time import Time
22+
2123from fink_utils .tester import spark_unit_tests
2224
2325COLUMNS = {
@@ -475,16 +477,27 @@ def join_aggregated_sso_data(df_prev, df_new, on="ssnamenr", output_filename=Non
475477
476478
477479def aggregate_ztf_sso_data (
478- year , month , prefix_path = "archive/science" , output_filename = None
480+ year ,
481+ month = None ,
482+ stop_previous_month = False ,
483+ prefix_path = "archive/science" ,
484+ output_filename = None ,
479485):
480486 """Aggregate ZTF SSO data in Fink
481487
482488 Parameters
483489 ----------
484490 year: str
485491 Year date in format YYYY.
486- month: str
487- Month date in format MM.
492+ month: str, optional
493+ Month date in format MM. Default is None, in
494+ which case `year` only will be considered.
495+ stop_previous_month: bool, optional
496+ If True, load data only until previous month.
497+ To use only with month=None, to reconstruct
498+ data from the current year.
499+ prefix_path: str, optional
500+ Prefix path on HDFS. Default is archive/science
488501 output_filename: str, optional
489502 If given, save data on HDFS. Cannot overwrite. Default is None.
490503
@@ -496,9 +509,18 @@ def aggregate_ztf_sso_data(
496509 Examples
497510 --------
498511 >>> path = "fink_utils/test_data/benoit_julien_2025/science"
512+
513+ Check monthly aggregation
499514 >>> df_agg = aggregate_ztf_sso_data(year=2025, month=1, prefix_path=path)
500515 >>> assert df_agg.count() == 1, df_agg.count()
501516
517+ >>> out = df_agg.collect()
518+ >>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"])
519+
520+ Check yearly aggregation
521+ >>> df_agg = aggregate_ztf_sso_data(year=2025, prefix_path=path)
522+ >>> assert df_agg.count() == 1, df_agg.count()
523+
502524 >>> out = df_agg.collect()
503525 >>> assert len(out[0]["cfid"]) == 3, len(out[0]["cfid"])
504526 """
@@ -513,11 +535,20 @@ def aggregate_ztf_sso_data(
513535 "candidate.jd" ,
514536 ]
515537
516- df = (
517- spark .read .format ("parquet" )
518- .option ("basePath" , prefix_path )
519- .load ("{}/year={}/month={}" .format (prefix_path , year , month ))
520- )
538+ if month is not None :
539+ path = "{}/year={}/month={}" .format (prefix_path , year , month )
540+ else :
541+ path = "{}/year={}" .format (prefix_path , year )
542+
543+ df = spark .read .format ("parquet" ).option ("basePath" , prefix_path ).load (path )
544+
545+ if month is None and stop_previous_month :
546+ prevdate = retrieve_last_date_of_previous_month (datetime .datetime .today ())
547+ # take the last hour of the last day
548+ prevdate = prevdate .replace (hour = 23 )
549+ jd0 = Time (prevdate , format = "datetime" ).jd
550+ df = df .filter (df ["candidate.jd" ] <= jd0 )
551+
521552 df_agg = (
522553 df .select (cols0 + cols )
523554 .filter (F .col ("roid" ) == 3 )
0 commit comments