@@ -88,28 +88,31 @@ def add_ingestion_metadata_column(df: DataFrame,table: str) -> DataFrame:
 
 def add_date_partition_columns(df: DataFrame, column_name: str) -> DataFrame:
     df = df.withColumn("year", F.year(F.col(column_name)))\
-           .withColumn("month", F.month(F.col(column_name)))\
-           .withColumn("day", F.day(F.col(column_name)))
+           .withColumn("month", F.month(F.col(column_name)))
     return df
 
 def get_all_dates_until_today(start_date: date):
-    dates = []
-    current = start_date
+    months = []
+    year, month = start_date.year, start_date.month
     today = date.today()
-    while current <= today:
-        dates.append(current)
-        current += timedelta(days=1)
-    return dates
+
+    while (year < today.year) or (year == today.year and month <= today.month):
+        months.append(date(year, month, 1))
+        month += 1
+        if month > 12:
+            month = 1
+            year += 1
+
+    return months
 
 def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_url: str, MYSQL_USER: str, MYSQL_SECRET: str) -> Tuple[bool, str]:
-    processing_dates = get_all_dates_until_today(date(2024, 1, 1))
+    processing_dates = get_all_dates_until_today(date(2020, 1, 1))
     current_year = datetime.now().year
     current_month = datetime.now().month
-    current_day = datetime.now().day
     path = f"{savepath}/{table}"
 
     for d in processing_dates:
-        query = f"(SELECT * FROM {table} WHERE YEAR(created) = {d.year} AND MONTH(created) = {d.month} AND DAY(created) = {d.day}) AS limited_table"
+        query = f"(SELECT * FROM {table} WHERE YEAR(created) = {d.year} AND MONTH(created) = {d.month}) AS limited_table"
         logging.info(query)
         df = spark.read.format("jdbc") \
             .option("url", jdbc_url) \
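Note: the rewritten iterator walks month by month and returns the first day of each month, rather than one date per calendar day. A minimal standalone sketch of the behavior (the function body is copied from this diff; the start date and "today" are illustrative):

    from datetime import date

    def get_all_dates_until_today(start_date: date):
        # Copied from the diff above: emit the 1st of every month from
        # start_date's month up to and including the current month.
        months = []
        year, month = start_date.year, start_date.month
        today = date.today()
        while (year < today.year) or (year == today.year and month <= today.month):
            months.append(date(year, month, 1))
            month += 1
            if month > 12:
                month = 1
                year += 1
        return months

    # If run on 2020-03-15:
    # get_all_dates_until_today(date(2020, 1, 1))
    # -> [date(2020, 1, 1), date(2020, 2, 1), date(2020, 3, 1)]

The JDBC pushdown query changes accordingly: dropping the DAY(created) predicate means each loop iteration pulls one full month from MySQL.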
@@ -123,10 +126,11 @@ def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_
         df = add_ingestion_metadata_column(df=df, table=table)
 
 
-        if (d.year == current_year and d.month == current_month and d.day == current_day):
+        if d == processing_dates[-1]:
             last_update = datetime.now().isoformat()
+            logging.info(f"Last ingestion from full tables: {d}")
 
-        df.write.format("delta").mode("append").partitionBy("year", "month", "day").save(path)
+        df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
     return (True, last_update)
 
 
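Note: with partitionBy("year", "month") the backfill lands one Delta partition directory per month instead of per day. A hedged sketch of the resulting layout and how a single month could be read back with partition pruning (the path and filter values are illustrative, not from this commit):

    # Hypothetical layout under f"{savepath}/{table}" after the backfill:
    #   .../year=2020/month=1/part-*.parquet
    #   .../year=2020/month=2/part-*.parquet
    # Filters on the partition columns are pruned to the matching directories.
    df_jan = (
        spark.read.format("delta")
        .load(f"{savepath}/{table}")
        .where("year = 2020 AND month = 1")
    )

Keying last_update off processing_dates[-1] also fires exactly once per run; the old day-equality test could only match when the run happened to fall on the 1st of the month, since every generated date has day == 1.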
@@ -150,10 +154,10 @@ def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:s
     incremental_df = add_date_partition_columns(df=incremental_df, column_name="created")
     incremental_df = add_ingestion_metadata_column(df=incremental_df, table=table)
 
-    incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
+    # incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
 
     # Append new partitions directly
-    incremental_df.write.format("delta").mode("append").partitionBy("year", "month", "day").save(path)
+    incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
 
     return (True, last_update)
 
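Note: add_date_partition_columns now derives only year and month, so incremental loads append into the same monthly partitions the backfill created; the commented-out withColumn line looks redundant with add_ingestion_metadata_column. A quick self-contained check of the derivation (the sample row is made up; column names mirror the diff):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = (
        spark.createDataFrame([("a", "2020-02-10 12:00:00")], ["id", "created"])
        .withColumn("created", F.to_timestamp("created"))
    )
    # Same derivation as add_date_partition_columns after this change:
    df = df.withColumn("year", F.year(F.col("created"))) \
           .withColumn("month", F.month(F.col("created")))
    df.show()
    # id=a | created=2020-02-10 12:00:00 | year=2020 | month=2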