
Commit 6908e62

feature: changing from delta to iceberg and using the common library
1 parent 4e853c4 commit 6908e62

File tree

1 file changed: +33 −51 lines changed

src/bronze/python/incremental_load.py

Lines changed: 33 additions & 51 deletions
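In short: the commit swaps the hand-rolled Delta Lake Spark session for get_iceberg_spark_session from the shared nau_analytics_data_product_utils_lib package, moves get_required_env into that library, drops the --savepath argument and the derived year/month partition columns, and appends to a catalog-managed Iceberg table instead of a Delta path. The core before/after, lifted from the diff below:

# Before: path-based Delta write, partitioned by derived year/month columns
df.write.format("delta").mode("append").partitionBy("year", "month").save(f"{savepath}/{table}")

# After: append into a catalog-managed Iceberg table; no explicit partition columns
df.write.format("iceberg").mode("append").saveAsTable(f"bronze_local.entidades.{table}")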
@@ -1,12 +1,14 @@
 from datetime import datetime,date, timedelta
-from pyspark.sql import SparkSession # type: ignore
+from nau_analytics_data_product_utils_lib import Config,get_required_env,get_iceberg_spark_session #type: ignore
+from pyspark.sql import SparkSession #type: ignore
 from pyspark.sql import DataFrame #type:ignore
 import pyspark.sql.functions as F # type: ignore
 import pyspark.sql.types as T # type: ignore
 import argparse
 import logging
 import os
 from typing import List, Union, Optional,Tuple
+import base64
 
 logging.basicConfig(
     level=logging.INFO,
@@ -17,17 +19,11 @@
 )
 
 
-def get_required_env(env_name:str) -> str:
-    env_value = os.getenv(env_name)
-    if env_value is None:
-        raise ValueError(f"Environment variable {env_name} is not set")
-    return env_value
 
 
 
 def get_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
-    parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored")
     parser.add_argument("--metadatapath", type = str, required = True, help ="The S3 bucket that contains stores the metada for the process")
     parser.add_argument("--table", type = str, required = True, help ="The S3 bucket that contains stores the metada for the process")
     parser.add_argument("--first_ingestion_flag",type = int,default=0,help="flag to indicate if it is the first ingestion on regular ingestion")
@@ -61,35 +57,10 @@ def update_metadata(metadatapath: str ,spark: SparkSession,table:str,last_date:s
     return False
 
 
-
-def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession:
-
-    spark = SparkSession.builder \
-        .appName("incremental_table_ingestion") \
-        .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
-        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
-        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
-        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
-        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
-        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
-        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
-        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
-        .getOrCreate()
-    return spark
-
-
-
-
-
-
 def add_ingestion_metadata_column(df: DataFrame,table: str) -> DataFrame:
     tmp_df = df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
     return tmp_df
 
-def add_date_partition_columns(df: DataFrame,column_name:str) -> DataFrame:
-    df = df.withColumn("year", F.year(F.col(column_name)))\
-        .withColumn("month", F.month(F.col(column_name)))
-    return df
 
 def get_all_dates_until_today(start_date: date):
     months = []
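With add_date_partition_columns removed, the bronze tables no longer carry explicit year/month columns. If partition pruning on created is still wanted, Iceberg's hidden partitioning can provide it when the table is created; a minimal sketch under that assumption (the DDL and column list are hypothetical, not part of this commit, and it presumes created stays a timestamp):

# Hypothetical one-time DDL: the months() transform partitions by month of
# "created" without materializing year/month columns in the data files.
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze_local.entidades.example_table (
        id BIGINT,
        created TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (months(created))
""")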
@@ -105,11 +76,11 @@ def get_all_dates_until_today(start_date: date):
 
     return months
 
-def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str) -> Tuple[bool, str]:
+def full_initial_ingestion(spark: SparkSession, table: str, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str) -> Tuple[bool, str]:
     processing_dates = get_all_dates_until_today(date(2020,1,1))
     current_year = datetime.now().year
     current_month = datetime.now().month
-    path = f"{savepath}/{table}"
+
 
     for d in processing_dates:
         query = f"(SELECT * FROM {table} WHERE YEAR(created) = {d.year} AND MONTH(created) = {d.month}) AS limited_table"
@@ -122,21 +93,19 @@ def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_
             .option("dbtable", query) \
             .load()
         df = df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))
-        df = add_date_partition_columns(df=df, column_name="created")
         df = add_ingestion_metadata_column(df=df,table=table)
 
 
         if d == processing_dates[-1]:
             last_update = datetime.now().isoformat()
             logging.info(f"Last ingestion from full tables: {d}")
-
-        df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
+        saveTale = f"bronze_local.entidades.{table}"
+        df.write.format("iceberg").mode("append").saveAsTable(saveTale)
     return (True, last_update)
 
 
-def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str,last_updated:str,table:str,savepath: str) -> Tuple[bool, str]:
+def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str,last_updated:str,table:str) -> Tuple[bool, str]:
 
-    path = f"{savepath}/{table}"
     query = f"(SELECT * FROM {table} WHERE created >= '{last_updated}') AS limited_table"
 
     logging.info(query)
@@ -151,13 +120,9 @@ def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:s
 
     last_update = datetime.now().isoformat()
     incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))
-    incremental_df = add_date_partition_columns(df=incremental_df,column_name="created")
     incremental_df = add_ingestion_metadata_column(df=incremental_df,table=table)
-
-    #incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
-
-    # Append new partitions directly
-    incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
+    saveTale = f"bronze_local.entidades.{table}"
+    incremental_df.write.format("iceberg").mode("append").saveAsTable(saveTale)
 
     return (True,last_update)

@@ -175,17 +140,34 @@ def main() -> None:
     S3_SECRET_KEY = get_required_env("S3_SECRET_KEY")
     S3_ENDPOINT = get_required_env("S3_ENDPOINT")
 
+    ICEBERG_CATALOG_HOST = get_required_env("ICEBERG_CATALOG_HOST")
+    ICEBERG_CATALOG_PORT = get_required_env("ICEBERG_CATALOG_PORT")
+    ICEBERG_CATALOG_NAME = get_required_env("ICEBERG_CATALOG_NAME")
+    ICEBERG_CATALOG_USER = get_required_env("ICEBERG_CATALOG_USER")
+    ICEBERG_CATALOG_PASSWORD = get_required_env("ICEBERG_CATALOG_PASSWORD")
+    ICEBERG_CATALOG_WAREHOUSE = get_required_env("ICEBERG_CATALOG_WAREHOUSE")
+    ICEBERG_CATALOG_URI = f"jdbc:mysql://{ICEBERG_CATALOG_HOST}:{ICEBERG_CATALOG_PORT}/{ICEBERG_CATALOG_NAME}"
+    ICEBERG_CATALOG_PASSWORD = base64.b64decode(ICEBERG_CATALOG_PASSWORD).decode()
     args = get_args()
-    savepath = args.savepath
+    table = args.table
     metadata = args.metadatapath
     is_full_ingestion_flag = args.first_ingestion_flag
-    table = args.table
-
 
-    spark = get_spark_session(S3_ACCESS_KEY,S3_SECRET_KEY,S3_ENDPOINT)
+    icerberg_cfg = Config(
+        app_name="Full table ingestion",
+        s3_access_key=S3_ACCESS_KEY,
+        s3_endpoint=S3_ENDPOINT,
+        s3_secret_key=S3_SECRET_KEY,
+        iceberg_catalog_uri=ICEBERG_CATALOG_URI,
+        iceberg_catalog_user=ICEBERG_CATALOG_USER,
+        iceberg_catalog_password=ICEBERG_CATALOG_PASSWORD,
+        iceberg_catalog_warehouse=ICEBERG_CATALOG_WAREHOUSE
+
+    )
+    spark = get_iceberg_spark_session(cfg=icerberg_cfg)
 
     if is_full_ingestion_flag == 1:
-        result = full_initial_ingestion(spark,table,savepath,jdbc_url,MYSQL_USER,MYSQL_SECRET)
+        result = full_initial_ingestion(spark,table,jdbc_url,MYSQL_USER,MYSQL_SECRET)
         logging.info(result)
         if result[0]:
             update_metadata(metadatapath=metadata,spark=spark,table=table,last_date=result[1])
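get_iceberg_spark_session comes from the common library, whose implementation is not part of this diff. Given the JDBC-style ICEBERG_CATALOG_URI and the bronze_local catalog name used in saveAsTable, it plausibly configures an Iceberg JDBC catalog along these lines; a sketch under those assumptions, not the library's actual code:

from pyspark.sql import SparkSession

def iceberg_spark_session_sketch(cfg) -> SparkSession:
    # Hypothetical reconstruction of the library's session factory; the
    # catalog name "bronze_local" matches the table identifiers in this commit.
    return (
        SparkSession.builder
        .appName(cfg.app_name)
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.bronze_local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.bronze_local.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog")
        .config("spark.sql.catalog.bronze_local.uri", cfg.iceberg_catalog_uri)
        .config("spark.sql.catalog.bronze_local.jdbc.user", cfg.iceberg_catalog_user)
        .config("spark.sql.catalog.bronze_local.jdbc.password", cfg.iceberg_catalog_password)
        .config("spark.sql.catalog.bronze_local.warehouse", cfg.iceberg_catalog_warehouse)
        .config("spark.hadoop.fs.s3a.access.key", cfg.s3_access_key)
        .config("spark.hadoop.fs.s3a.secret.key", cfg.s3_secret_key)
        .config("spark.hadoop.fs.s3a.endpoint", cfg.s3_endpoint)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .getOrCreate()
    )

Note that the catalog password arrives base64-encoded in the environment and is decoded with base64.b64decode(...).decode() before being handed to Config, as the hunk above shows.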
@@ -196,7 +178,7 @@ def main() -> None:
             raise Exception("No date Found")
 
         last_date = str(last_date)
-        result = delta_load(spark=spark,jdbc_url=jdbc_url,MYSQL_USER=MYSQL_USER,MYSQL_SECRET=MYSQL_SECRET,last_updated=last_date,table=table,savepath=savepath)
+        result = delta_load(spark=spark,jdbc_url=jdbc_url,MYSQL_USER=MYSQL_USER,MYSQL_SECRET=MYSQL_SECRET,last_updated=last_date,table=table)
 
         if result[0]:
             update_metadata(metadatapath=metadata,spark=spark,table=table,last_date=result[1])
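With --savepath gone from get_args, a run after this commit needs only the metadata path, the table name, and optionally the first-ingestion flag (bucket and table names below are hypothetical):

python src/bronze/python/incremental_load.py \
    --metadatapath s3a://bronze-metadata \
    --table example_table \
    --first_ingestion_flag 1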
