11from datetime import datetime
22from pyspark .sql import SparkSession # type: ignore
3+ from pyspark .sql import DataFrame #type:ignore
34import pyspark .sql .functions as F # type: ignore
45import pyspark .sql .types as T # type: ignore
56import argparse
@@ -32,6 +33,18 @@ def get_args() -> argparse.Namespace:
3233 args = parser .parse_args ()
3334 return args
3435
36+
def get_metadata(metadatapath: str, spark: SparkSession, table: str) -> str | bool:
    """Look up the last-ingested date recorded for *table*.

    Reads the two-column CSV metadata file stored under
    ``<metadatapath>/<table>/last_updated_date`` and returns the stored
    ``last_date`` value as a string, or ``False`` when no row (or no
    date) exists for the table.
    """
    schema = T.StructType([
        T.StructField("table_name", T.StringType(), True),
        T.StructField("last_date", T.StringType(), True),
    ])
    csv_path = f"{metadatapath}/{table}/last_updated_date"
    match = (
        spark.read.csv(csv_path, schema=schema)
        .filter(F.col("table_name") == table)
        .first()
    )
    # No matching row, or a row with a NULL date, means "never ingested".
    if match is None or match["last_date"] is None:
        return False
    return str(match["last_date"])
47+
3548def update_metadata (metadatapath : str ,spark : SparkSession ,table :str ,last_date :str ) -> bool :
3649 try :
3750 tmp_df = spark .createDataFrame (
@@ -68,8 +81,14 @@ def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str)
6881
6982
7083
def add_ingestion_metadata_column(df: DataFrame, table: str) -> DataFrame:
    """Append ingestion provenance columns to *df*.

    Adds ``ingestion_date`` (timestamp of this ingestion run) and
    ``source_name`` (literal name of the source table).
    """
    return (
        df
        .withColumn("ingestion_date", F.current_timestamp())
        .withColumn("source_name", F.lit(table))
    )
7187
72-
def add_date_partition_columns(df: DataFrame, column_name: str) -> DataFrame:
    """Derive ``year`` and ``month`` partition columns from *column_name*."""
    source_col = F.col(column_name)
    with_year = df.withColumn("year", F.year(source_col))
    return with_year.withColumn("month", F.month(source_col))
7392
7493def full_initial_ingestion (spark : SparkSession , table : str , savepath : str , jdbc_url :str , MYSQL_USER :str , MYSQL_SECRET :str ) -> Tuple [bool , str ]:
7594 current_year = datetime .now ().year
@@ -82,6 +101,8 @@ def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_
82101
83102 for year in years :
84103 for month in months :
104+ if (year > current_year ) or (year == current_year and month > current_month ):
105+ break
85106
86107 if (year == 2019 and month == 1 ):
87108 query = f"(SELECT * FROM { table } WHERE YEAR(created) = { year } AND MONTH(created) = { month } ) AS limited_table"
@@ -93,22 +114,15 @@ def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_
93114 .option ("driver" , "com.mysql.cj.jdbc.Driver" ) \
94115 .option ("dbtable" , query ) \
95116 .load ()
96-
97- df = df .withColumn ("created" , F .date_format ("created" , "yyyy-MM-dd'T'HH:mm:ss.SSSSSS" )) \
98- .withColumn ("year" , F .year (F .col ("created" ))) \
99- .withColumn ("month" , F .month (F .col ("created" )))
100-
101- df = df .withColumn ("ingestion_date" , F .current_timestamp ()) \
102- .withColumn ("source_name" , F .lit (table ))
117+ df = df .withColumn ("created" , F .date_format ("created" , "yyyy-MM-dd'T'HH:mm:ss.SSSSSS" ))
118+ df = add_date_partition_columns (df = df , column_name = "created" )
119+ df = add_ingestion_metadata_column (df = df ,table = table )
103120
104121 df .write .format ("delta" ) \
105122 .mode ("overwrite" ) \
106123 .partitionBy ("year" , "month" ) \
107124 .save (path )
108125 continue
109-
110- if (year > current_year ) or (year == current_year and month > current_month ):
111- break
112126 else :
113127
114128 query = f"(SELECT * FROM { table } WHERE YEAR(created) = { year } AND MONTH(created) = { month } ) AS limited_table"
@@ -122,42 +136,35 @@ def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_
122136 .load ()
123137 if (year == current_year and month == current_month ):
124138 last_update = datetime .now ().isoformat ()
125- incremental_df = new_df .withColumn ("created" , F .date_format ("created" , "yyyy-MM-dd'T'HH:mm:ss.SSSSSS" )) \
126- .withColumn ("year" , F .year (F .col ("created" ))) \
127- .withColumn ("month" , F .month (F .col ("created" )))
139+ incremental_df = new_df .withColumn ("created" , F .date_format ("created" , "yyyy-MM-dd'T'HH:mm:ss.SSSSSS" ))
140+ incremental_df = add_date_partition_columns (df = incremental_df , column_name = "created" )
128141
129- incremental_df = incremental_df . withColumn ( "ingestion_date" , F . current_timestamp ()). withColumn ( "source_name" , F . lit ( table ) )
142+ incremental_df = add_ingestion_metadata_column ( df = incremental_df , table = table )
130143
131144 # Append new partitions directly
132145 incremental_df .write .format ("delta" ).mode ("append" ).partitionBy ("year" , "month" ).save (path )
133146 return (True , last_update )
134147
135- def get_metadata (metadatapath : str , spark : SparkSession , table :str ) -> str | bool :
136- metadatapath = f"{ metadatapath } /{ table } /last_updated_date"
137- customSchema = T .StructType ([
138- T .StructField ("table_name" , T .StringType (), True ),
139- T .StructField ("last_date" , T .StringType (), True )
140- ])
141- row = spark .read .csv (metadatapath ,schema = customSchema ).filter ( F .col ("table_name" ) == table ).first ()
142- if row is None or row ["last_date" ] is None :
143- return False
144- return str (row ["last_date" ])
145148
146149def delta_load (spark : SparkSession , jdbc_url :str , MYSQL_USER :str , MYSQL_SECRET :str ,last_updated :str ,table :str ,savepath : str ) -> Tuple [bool , str ]:
150+
147151 path = f"{ savepath } /{ table } "
148152 query = f"(SELECT * FROM { table } WHERE created >= '{ last_updated } ') AS limited_table"
153+
149154 logging .info (query )
155+
150156 new_df = spark .read .format ("jdbc" ) \
151157 .option ("url" , jdbc_url ) \
152158 .option ("user" , MYSQL_USER ) \
153159 .option ("password" , MYSQL_SECRET ) \
154160 .option ("driver" , "com.mysql.cj.jdbc.Driver" ) \
155161 .option ("dbtable" , query ) \
156162 .load ()
163+
157164 last_update = datetime .now ().isoformat ()
158- incremental_df = new_df .withColumn ("created" , F .date_format ("created" , "yyyy-MM-dd'T'HH:mm:ss.SSSSSS" )) \
159- . withColumn ( "year" , F . year ( F . col ( "created" ))) \
160- . withColumn ( "month" , F . month ( F . col ( "created" )) )
165+ incremental_df = new_df .withColumn ("created" , F .date_format ("created" , "yyyy-MM-dd'T'HH:mm:ss.SSSSSS" ))
166+ incremental_df = add_date_partition_columns ( df = incremental_df , column_name = "created" )
167+ incremental_df = add_ingestion_metadata_column ( df = incremental_df , table = table )
161168
162169 incremental_df = incremental_df .withColumn ("ingestion_date" , F .current_timestamp ()).withColumn ("source_name" , F .lit (table ))
163170
@@ -185,20 +192,27 @@ def main() -> None:
185192 metadata = args .metadatapath
186193 is_full_ingestion_flag = args .first_ingestion_flag
187194 table = args .table
195+
196+
188197 spark = get_spark_session (S3_ACCESS_KEY ,S3_SECRET_KEY ,S3_ENDPOINT )
198+
189199 if is_full_ingestion_flag == 1 :
190200 result = full_initial_ingestion (spark ,table ,savepath ,jdbc_url ,MYSQL_USER ,MYSQL_SECRET )
191201 logging .info (result )
192202 if result [0 ]:
193203 update_metadata (metadatapath = metadata ,spark = spark ,table = table ,last_date = result [1 ])
204+
194205 if is_full_ingestion_flag == 0 :
195206 last_date = get_metadata (metadatapath = metadata ,spark = spark ,table = table )
196207 if last_date == False :
197208 raise Exception ("No date Found" )
209+
198210 last_date = str (last_date )
199211 result = delta_load (spark = spark ,jdbc_url = jdbc_url ,MYSQL_USER = MYSQL_USER ,MYSQL_SECRET = MYSQL_SECRET ,last_updated = last_date ,table = table ,savepath = savepath )
212+
200213 if result [0 ]:
201214 update_metadata (metadatapath = metadata ,spark = spark ,table = table ,last_date = result [1 ])
215+
202216 spark .stop ()
203217
204218if __name__ == "__main__" :
0 commit comments