|
| 1 | +import logging |
1 | 2 | import sys |
2 | 3 | from datetime import datetime |
3 | 4 |
|
|
25 | 26 | table_exists_in_catalog, |
26 | 27 | ) |
27 | 28 |
|
| 29 | +logging.basicConfig( |
| 30 | + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" |
| 31 | +) |
| 32 | +logger = logging.getLogger(__name__) |
| 33 | + |
28 | 34 |
|
29 | 35 | def get_latest_snapshot(dfa): |
30 | 36 | dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0]) |
@@ -62,7 +68,7 @@ def deduplicate_by_id_and_last_updated(df): |
62 | 68 | return deduplicated_df |
63 | 69 |
|
64 | 70 |
|
65 | | -def prepare_increments(increment_df, logger): |
| 71 | +def prepare_increments(increment_df): |
66 | 72 | # In case there are several days' worth of increments: only keep the latest version of a record |
67 | 73 | id_partition = Window.partitionBy("id") |
68 | 74 | # preparation step: create a temporary column to replace NULL last_updated values with 01/01/2020 |
@@ -192,7 +198,6 @@ def purge_today_partition( |
192 | 198 | sc = SparkContext.getOrCreate() |
193 | 199 | glueContext = GlueContext(sc) |
194 | 200 | spark = SparkSession(sc) |
195 | | - logger = glueContext.get_logger() |
196 | 201 | job = Job(glueContext) |
197 | 202 | job.init(args["JOB_NAME"], args) |
198 | 203 |
|
@@ -233,7 +238,7 @@ def purge_today_partition( |
233 | 238 | ) |
234 | 239 | continue |
235 | 240 | # create first snapshot |
236 | | - increment_df = prepare_increments(increment_df, logger) |
| 241 | + increment_df = prepare_increments(increment_df) |
237 | 242 | snapshot_df = increment_df |
238 | 243 |
|
239 | 244 | # snapshot table in glue catalogue |
@@ -274,7 +279,7 @@ def purge_today_partition( |
274 | 279 | ) |
275 | 280 | else: |
276 | 281 | # prepare COU |
277 | | - increment_df = prepare_increments(increment_df, logger) |
| 282 | + increment_df = prepare_increments(increment_df) |
278 | 283 | increment_df = add_snapshot_date_columns(increment_df) |
279 | 284 | # apply COU |
280 | 285 | logger.info(f"Applying increment {increment_table_name}") |
|
0 commit comments