@@ -208,14 +208,18 @@ def loadIncrementsSinceDate(
208208 # snapshot table in glue catalogue
209209 else :
210210 pushDownPredicate = create_pushdown_predicate (
211- partitionDateColumn = "snapshot_date" , daysBuffer = 3
211+ partitionDateColumn = "snapshot_date" , daysBuffer = 60
212212 )
213213 # load latest snpashot
214214 snapshot_ddf = glueContext .create_dynamic_frame .from_catalog (
215215 name_space = source_catalog_database ,
216216 table_name = snapshot_table_name ,
217- # push_down_predicate=pushDownPredicate
217+ push_down_predicate = pushDownPredicate ,
218218 )
219+ if snapshot_ddf .count () == 0 :
220+ logger .error (
221+ f"No data returned for table { snapshot_table_name } using push_down_predicate: { pushDownPredicate } . "
222+ )
219223 snapshot_df = snapshot_ddf .toDF ()
220224 snapshot_df = get_latest_snapshot (snapshot_df )
221225 last_snapshot_date = snapshot_df .select (max ("snapshot_date" )).first ()[0 ]
@@ -248,6 +252,8 @@ def loadIncrementsSinceDate(
248252 # apply COU
249253 logger .info (f"Applying increment { increment_table_name } " )
250254 snapshot_df = apply_increments (snapshot_df , increment_df )
255+ # snapshot_df = snapshot_df.coalesce(1) # Reduce the DataFrame to a single partition - the data is small
256+
251257 else :
252258 logger .info (
253259 f"Couldn't find table { increment_table_name } in database { source_catalog_database } , saving same snapshot as yesterday"
0 commit comments