Commit 4bb6f0e

using utils folder for testing and iceberg format
1 parent 43fe26c commit 4bb6f0e

File tree

3 files changed: +22 -33 lines changed

src/bronze/get_full_tables.py

14 additions, 29 deletions

@@ -5,6 +5,7 @@
 import os
 import logging
 from typing import List, Union, Optional,Tuple
+from utils.utils import Utils
 
 logging.basicConfig(
     level=logging.INFO,
@@ -13,11 +14,9 @@
         logging.StreamHandler()
     ]
 )
-def get_required_env(env_name:str) -> str:
-    env_value = os.getenv(env_name)
-    if env_value is None:
-        raise ValueError(f"Environment variable {env_name} is not set")
-    return env_value
+
+utils_obj = Utils()
+
 
 def get_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
@@ -26,20 +25,6 @@ def get_args() -> argparse.Namespace:
     args = parser.parse_args()
     return args
 
-def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession:
-
-    spark = SparkSession.builder \
-        .appName("full_table_ingestion") \
-        .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
-        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
-        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
-        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
-        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
-        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
-        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
-        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
-        .getOrCreate()
-    return spark
 
 def add_ingestion_metadata_column(df: DataFrame,table: str) -> DataFrame:
     tmp_df = df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
@@ -52,16 +37,16 @@ def add_date_partition_columns(df: DataFrame,column_name:str) -> DataFrame:
     return df
 
 def main() -> None:
-    MYSQL_DATABASE = get_required_env("MYSQL_DATABASE")
-    MYSQL_HOST = get_required_env("MYSQL_HOST")
-    MYSQL_PORT = get_required_env("MYSQL_PORT")
-    MYSQL_USER = get_required_env("MYSQL_USER")
-    MYSQL_SECRET = get_required_env("MYSQL_SECRET")
+    MYSQL_DATABASE = utils_obj.get_required_env("MYSQL_DATABASE")
+    MYSQL_HOST = utils_obj.get_required_env("MYSQL_HOST")
+    MYSQL_PORT = utils_obj.get_required_env("MYSQL_PORT")
+    MYSQL_USER = utils_obj.get_required_env("MYSQL_USER")
+    MYSQL_SECRET = utils_obj.get_required_env("MYSQL_SECRET")
     jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
 
-    S3_ACCESS_KEY = get_required_env("S3_ACCESS_KEY")
-    S3_SECRET_KEY = get_required_env("S3_SECRET_KEY")
-    S3_ENDPOINT = get_required_env("S3_ENDPOINT")
+    S3_ACCESS_KEY = utils_obj.get_required_env("S3_ACCESS_KEY")
+    S3_SECRET_KEY = utils_obj.get_required_env("S3_SECRET_KEY")
+    S3_ENDPOINT = utils_obj.get_required_env("S3_ENDPOINT")
     args = get_args()
     S3_SAVEPATH = args.savepath
     undesired_column = args.undesired_column
@@ -79,7 +64,7 @@ def main() -> None:
         "auth_user"
     ]
 
-    spark = get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY,S3_SECRET_KEY=S3_SECRET_KEY,S3_ENDPOINT=S3_ENDPOINT)
+    spark = utils_obj.get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY,S3_SECRET_KEY=S3_SECRET_KEY,S3_ENDPOINT=S3_ENDPOINT,app_name="Full table ingestion")
     for table in TABLES:
 
         logging.info(f"getting table {table}")
@@ -102,7 +87,7 @@ def main() -> None:
 
         output_path = f"{S3_SAVEPATH}/{table}"
 
-        df.write.format("delta").mode("append").partitionBy("year", "month","day").save(output_path)
+        df.write.format("iceberg").mode("append").partitionBy("year", "month","day").save(output_path)
 
         logging.info(f"Data saved as Delta table to {output_path}")
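
Taken together, these hunks mean get_full_tables.py no longer owns any environment or Spark setup code; it just uses the shared Utils object. A minimal sketch of the resulting flow is below. The Utils calls and the Iceberg write mirror the diff; the JDBC read options and the year/month/day derivation are assumptions standing in for parts of the script this commit does not touch.

# Illustrative sketch only, not the committed code.
from pyspark.sql import functions as F
from utils.utils import Utils

utils_obj = Utils()

def ingest_full_table(table: str, savepath: str) -> None:
    # Spark session comes from the shared helper, as in the diff.
    spark = utils_obj.get_spark_session(
        S3_ACCESS_KEY=utils_obj.get_required_env("S3_ACCESS_KEY"),
        S3_SECRET_KEY=utils_obj.get_required_env("S3_SECRET_KEY"),
        S3_ENDPOINT=utils_obj.get_required_env("S3_ENDPOINT"),
        app_name="Full table ingestion",
    )
    jdbc_url = (
        f"jdbc:mysql://{utils_obj.get_required_env('MYSQL_HOST')}:"
        f"{utils_obj.get_required_env('MYSQL_PORT')}/"
        f"{utils_obj.get_required_env('MYSQL_DATABASE')}"
    )
    df = (
        spark.read.format("jdbc")  # assumed read options
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("user", utils_obj.get_required_env("MYSQL_USER"))
        .option("password", utils_obj.get_required_env("MYSQL_SECRET"))
        .load()
        .withColumn("ingestion_date", F.current_timestamp())  # metadata columns as in the script
        .withColumn("source_name", F.lit(table))
        .withColumn("year", F.year("ingestion_date"))          # assumed partition derivation
        .withColumn("month", F.month("ingestion_date"))
        .withColumn("day", F.dayofmonth("ingestion_date"))
    )
    # Same write call as the diff: a path-based Iceberg append partitioned by date columns.
    df.write.format("iceberg").mode("append") \
        .partitionBy("year", "month", "day") \
        .save(f"{savepath}/{table}")

The only change to the write itself is the format string; the path-based, date-partitioned append is otherwise the same as the Delta version.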

src/utils/__init__.py

Whitespace-only changes.

src/utils/utils.py

8 additions, 4 deletions

@@ -1,7 +1,7 @@
 import os
 from pyspark.sql import SparkSession #type:ignore
 
-class utils:
+class Utils:
     def __init__(self) -> None:
         pass
 
@@ -15,9 +15,13 @@ def get_spark_session(self,S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT:
 
         spark = SparkSession.builder \
             .appName(app_name) \
-            .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
-            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
+            .config("spark.jars",
+                    "/opt/spark/jars/hadoop-aws-3.3.4.jar,"
+                    "/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,"
+                    "/opt/spark/jars/mysql-connector-j-8.3.0.jar,"
+                    "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.10.0.jar") \
+            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
+            .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")\
             .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
             .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
             .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
