Skip to content

Commit 3f2b712

Browse files
committed
fix taxcomi create daily snapshot by changing push_down_predicate
1 parent 8a7999f commit 3f2b712

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

scripts/jobs/planning/tascomi_create_daily_snapshot.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import sys
2-
from datetime import datetime
2+
from datetime import datetime, timedelta
3+
from typing import Optional
34

45
import pyspark.sql.functions as F
56
from awsglue.context import GlueContext
@@ -77,9 +78,18 @@ def apply_increments(snapshot_df, increment_df):
7778
return snapshot_df
7879

7980

80-
def loadIncrementsSinceDate(increment_table_name, name_space, date):
81+
def loadIncrementsSinceDate(
82+
increment_table_name: str, name_space: str, date: Optional[str] = None
83+
) -> DataFrame:
84+
"""
85+
Loads increments from the specified catalog table starting from a given date.
86+
If the provided date is None, it defaults to 30 days ago.
87+
88+
Returns:
89+
DataFrame: A Spark DataFrame containing the loaded increments.
90+
"""
8191
if date is None:
82-
date = "20210101" # default date
92+
date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d") # default date
8393
increment_ddf = glueContext.create_dynamic_frame.from_catalog(
8494
name_space=name_space,
8595
table_name=increment_table_name,
@@ -170,7 +180,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
170180
glueContext, snapshot_table_name, source_catalog_database
171181
):
172182
logger.info(
173-
f"Couldn't find table {snapshot_table_name} in database {source_catalog_database}, creating a snapshot from all the increments, starting from 20210101"
183+
f"Couldn't find table {snapshot_table_name} in database {source_catalog_database}, creating a snapshot from all the increments, starting from 30 days ago"
174184
)
175185
# Increment table does not exist in glue catalogue
176186
if not table_exists_in_catalog(
@@ -183,7 +193,6 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
183193
increment_df = loadIncrementsSinceDate(
184194
increment_table_name=increment_table_name,
185195
name_space=source_catalog_database,
186-
date="20210101",
187196
)
188197
if increment_df.rdd.isEmpty():
189198
logger.info(

0 commit comments

Comments
 (0)