@@ -20,6 +20,10 @@ def format_ssp(df, logger, name):
2020 logger .info (f"Not SSP logs, returning df: { name } " )
2121 return df
2222
23+ if df .rdd .isEmpty ():
24+ logger .info (f"{ name } dataframe has no rows. Skipping format_ssp." )
25+ return df
26+
2327 logger .info (f"Processing SSP logs" )
2428 noODSCode = df .filter (col ("logReference" ) != "SSP0001" )
2529 ODSCode = df .filter (col ("logReference" ) == "SSP0001" )
@@ -52,6 +56,10 @@ def format_ssp(df, logger, name):
5256
5357
5458def resolve_dupes (df , logger , name ):
59+ if df .rdd .isEmpty ():
60+ logger .info (f"{ name } dataframe has no rows. Skipping resolve_dupes." )
61+ return df
62+
5563 column_groups = defaultdict (list )
5664 for column_name in df .columns :
5765 normalised_name = column_name .lower ().rstrip ("_" )
@@ -79,13 +87,20 @@ def resolve_dupes(df, logger, name):
7987
8088
def rename_cols(df, logger, name):
    """Replace every '.' in the column names of ``df`` with '_'.

    Args:
        df: DataFrame-like object. This function uses its ``take`` and
            ``toDF`` methods and its ``columns`` attribute (presumably a
            PySpark DataFrame -- confirm against the callers).
        logger: Object with an ``info`` method; receives progress messages.
        name: Label for ``df``; only used in the log messages.

    Returns:
        ``df`` itself, untouched, when it has no rows. Otherwise the result
        of ``df.toDF`` called with the original column names, in the same
        order, each with '.' replaced by '_'.
    """
    # Fetching at most one row is enough to detect an empty DataFrame and
    # avoids going through ``df.rdd`` just to ask whether anything is there.
    # NOTE(review): the rename below only touches column names, so this
    # guard means an empty DataFrame keeps its dotted names while a
    # non-empty one does not -- confirm that callers rely on the skip.
    if not df.take(1):
        logger.info(f"{name} dataframe has no rows. Skipping rename_cols.")
        return df

    logger.info(f"Replacing '.' with '_' for df: {name} ")
    # Rename all columns positionally in one call rather than issuing one
    # withColumnRenamed per column (including columns that contain no dot).
    return df.toDF(*(col_name.replace(".", "_") for col_name in df.columns))
8698
8799
88100def dtype_conversion (df , logger , name ):
101+ if df .rdd .isEmpty ():
102+ logger .info (f"{ name } dataframe has no rows. Skipping dtype_conversion." )
103+ return df
89104 try :
90105 logger .info (f"Formatting event_timestamp, time and date columns for df: { name } " )
91106 if "event_timestamp" in df .columns :
0 commit comments