2 files changed: +4 −4 lines changed

@@ -62,7 +62,7 @@ def deduplicate_by_id_and_last_updated(df):
    return deduplicated_df


-def prepare_increments(increment_df):
+def prepare_increments(increment_df, logger):
    # In case there are several days worth of increments: only keep the latest version of a record
    id_partition = Window.partitionBy("id")
    # preparation step: create a temporary column to replace NULL last_updated values with 01/01/2020
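
Note: the two comments in this hunk describe a standard "latest record wins" deduplication over a per-id window. A minimal sketch of that pattern is below, assuming a `row_number` ranking ordered by the NULL-filled `last_updated` column; the names `prepare_increments_sketch` and `last_updated_filled` are hypothetical, and the repo's actual implementation may differ:

```python
# Hypothetical sketch of the dedup pattern described in the diff comments;
# not the repo's actual prepare_increments.
from pyspark.sql import Window
from pyspark.sql import functions as F


def prepare_increments_sketch(increment_df, logger):
    # Replace NULL last_updated with 01/01/2020 so every row is orderable
    df = increment_df.withColumn(
        "last_updated_filled",
        F.coalesce(F.col("last_updated"), F.to_timestamp(F.lit("2020-01-01"))),
    )
    # Rank rows per id, newest first, and keep only the latest version of each record
    id_partition = Window.partitionBy("id").orderBy(F.col("last_updated_filled").desc())
    deduped = (
        df.withColumn("row_rank", F.row_number().over(id_partition))
        .filter(F.col("row_rank") == 1)
        .drop("row_rank", "last_updated_filled")
    )
    logger.info("prepare_increments: deduplicated increments by id")
    return deduped
```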
@@ -233,7 +233,7 @@ def purge_today_partition(
            )
            continue
        # create first snapshot
-        increment_df = prepare_increments(increment_df)
+        increment_df = prepare_increments(increment_df, logger)
        snapshot_df = increment_df

        # snapshot table in glue catalogue
@@ -274,7 +274,7 @@ def purge_today_partition(
            )
        else:
            # prepare COU
-            increment_df = prepare_increments(increment_df)
+            increment_df = prepare_increments(increment_df, logger)
            increment_df = add_snapshot_date_columns(increment_df)
            # apply COU
            logger.info(f"Applying increment {increment_table_name}")
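
The only behavioural change in this file is that the job's logger is now threaded into `prepare_increments` at both call sites, so the function can emit its own diagnostics. A typical way such a logger is obtained in a Glue job is sketched below; this is an assumption about the setup, and the repo's job bootstrap may construct it differently:

```python
# Assumed Glue-standard logger setup; the repo's actual bootstrap may differ.
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)
logger = glue_context.get_logger()  # messages go to the job's CloudWatch log stream
logger.info("example message")
```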

@@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" {
  job_name                       = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning"
  glue_version                   = "2.0"
  glue_job_worker_type           = "G.2X"
- number_of_workers_for_glue_job = 12
+ number_of_workers_for_glue_job = 30
  helper_module_key              = data.aws_s3_object.helpers.key
  pydeequ_zip_key                = data.aws_s3_object.pydeequ.key
  spark_ui_output_storage_id     = module.spark_ui_output_storage_data_source.bucket_id
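
For scale context: each G.2X Glue worker provides 2 DPUs (8 vCPUs, 32 GB of memory), so raising `number_of_workers_for_glue_job` from 12 to 30 takes the job from roughly 24 to 60 DPUs, with one of the configured workers hosting the Spark driver rather than an executor.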