Commit 71da15e

working on improvements in ingestion script
1 parent 63d366b commit 71da15e

5 files changed (+81 −36 lines)

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -1,2 +1,3 @@
 .env
-.mypy_cache
+.mypy_cache
+utils/.

Docker/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM nauedu/nau-analytics-base-spark:featurespark-shell-docker-image
+FROM nauedu/nau-analytics-base-spark:latest

 # Copy your application code
 COPY src/ /opt/spark/work-dir/src/

src/bronze/get_full_tables.py

Lines changed: 33 additions & 23 deletions
@@ -1,8 +1,10 @@
 from pyspark.sql import SparkSession #type:ignore
+from pyspark.sql import DataFrame #type:ignore
 import pyspark.sql.functions as F #type:ignore
 import argparse
 import os
 import logging
+from typing import List, Union, Optional,Tuple

 logging.basicConfig(
     level=logging.INFO,
@@ -11,6 +13,12 @@
         logging.StreamHandler()
     ]
 )
+def get_required_env(env_name:str) -> str:
+    env_value = os.getenv(env_name)
+    if env_value is None:
+        raise ValueError(f"Environment variable {env_name} is not set")
+    return env_value
+
 def get_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
     parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored")
@@ -21,7 +29,7 @@ def get_args() -> argparse.Namespace:
 def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession:

     spark = SparkSession.builder \
-        .appName("incremental_table_ingestion") \
+        .appName("full_table_ingestion") \
         .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
@@ -33,26 +41,26 @@ def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str)
         .getOrCreate()
     return spark

-###################################################################################
-# GET MYSQL CREDENTIALS #
-###################################################################################
-def main() -> None:
-    MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
-    MYSQL_HOST = os.getenv("MYSQL_HOST")
-    MYSQL_PORT = os.getenv("MYSQL_PORT")
-    MYSQL_USER = os.getenv("MYSQL_USER")
-    MYSQL_SECRET = os.getenv("MYSQL_SECRET")
-    jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
-
+def add_ingestion_metadata_column(df: DataFrame,table: str) -> DataFrame:
+    tmp_df = df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
+    return tmp_df

+def add_date_partition_columns(df: DataFrame,column_name:str) -> DataFrame:
+    df = df.withColumn("year", F.year(F.col(column_name)))\
+        .withColumn("month", F.month(F.col(column_name)))\
+        .withColumn("day",F.day(column_name))
+    return df

-###################################################################################
-# GET S3 CREDENTIALS #
-###################################################################################
-    S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY"))
-    S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY"))
-    S3_ENDPOINT = str(os.getenv("S3_ENDPOINT"))
+def main() -> None:
+    MYSQL_DATABASE = get_required_env("MYSQL_DATABASE")
+    MYSQL_HOST = get_required_env("MYSQL_HOST")
+    MYSQL_PORT = get_required_env("MYSQL_PORT")
+    MYSQL_USER = get_required_env("MYSQL_USER")
+    MYSQL_SECRET = get_required_env("MYSQL_SECRET")
+    jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"

+    S3_ACCESS_KEY = get_required_env("S3_ACCESS_KEY")
+    S3_SECRET_KEY = get_required_env("S3_SECRET_KEY")
+    S3_ENDPOINT = get_required_env("S3_ENDPOINT")
     args = get_args()
     S3_SAVEPATH = args.savepath
     undesired_column = args.undesired_column
@@ -65,15 +73,15 @@ def main() -> None:
         "auth_userprofile",
         "student_userattribute",
         "organizations_organization",
+        "organizations_historicalorganization",
         "auth_user"
     ]

+    spark = get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY,S3_SECRET_KEY=S3_SECRET_KEY,S3_ENDPOINT=S3_ENDPOINT)
     for table in TABLES:

         logging.info(f"getting table {table}")
         try:
-
-            spark = get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY,S3_SECRET_KEY=S3_SECRET_KEY,S3_ENDPOINT=S3_ENDPOINT)

             df = spark.read.format("jdbc") \
                 .option("url", jdbc_url) \
@@ -85,18 +93,20 @@ def main() -> None:
             if table == "auth_user":
                 df = df.drop(undesired_column)

-            df = df.withColumn("ingestion_date", F.current_timestamp()) \
-                .withColumn("source_name", F.lit(table))
+            df = add_ingestion_metadata_column(df=df,table=table)
+            df = add_date_partition_columns(df,"ingestion_date")
             if table == "auth_user" and undesired_column and undesired_column in df.columns:
                 raise Exception("THE undesired column stills in the dataframe")
+
             output_path = f"{S3_SAVEPATH}/{table}"

-            df.write.format("delta").mode("append").save(output_path)
+            df.write.format("delta").mode("append").partitionBy("year", "month","day").save(output_path)

             logging.info(f"Data saved as Delta table to {output_path}")

         except Exception as e:
             logging.error(f"Pipeline failed: {e}")
+
     spark.stop()
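
For reference, a minimal sketch of what the new write path produces: the ingestion metadata column feeds the year/month/day columns, and partitionBy on those columns yields year=/month=/day= directories under each table path. This is illustrative only, not part of the commit; it assumes a SparkSession (`spark`) already configured with the Delta extensions as in get_spark_session above, and the table name and S3 path are made up. F.dayofmonth is used in the sketch because it is available across PySpark versions.

# Illustrative sketch: mirrors add_ingestion_metadata_column / add_date_partition_columns
# on a toy DataFrame. Assumes `spark` has the Delta extensions configured.
import pyspark.sql.functions as F

df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "username"])
df = df.withColumn("ingestion_date", F.current_timestamp()) \
       .withColumn("source_name", F.lit("auth_user"))            # metadata columns
df = df.withColumn("year", F.year("ingestion_date")) \
       .withColumn("month", F.month("ingestion_date")) \
       .withColumn("day", F.dayofmonth("ingestion_date"))         # partition columns

# Appending with partitionBy produces .../year=YYYY/month=M/day=D directories
df.write.format("delta").mode("append") \
    .partitionBy("year", "month", "day") \
    .save("s3a://example-bucket/bronze/auth_user")                # hypothetical path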

src/bronze/incremental_load.py

Lines changed: 18 additions & 11 deletions
@@ -15,6 +15,11 @@
     ]
 )

+def get_required_env(env_name:str) -> str:
+    env_value = os.getenv(env_name)
+    if env_value is None:
+        raise ValueError(f"Environment variable {env_name} is not set")
+    return env_value



@@ -67,10 +72,12 @@ def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str)


 def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str) -> Tuple[bool, str]:
-    years = [i for i in range(2019,2100)]
-    months = [i for i in range(1,13)]
     current_year = datetime.now().year
     current_month = datetime.now().month
+    last_year_in_loop = int(current_year)+1
+    years = [i for i in range(2019,last_year_in_loop)]
+    months = [i for i in range(1,13)]
+
     path = f"{savepath}/{table}"

     for year in years:
@@ -162,17 +169,17 @@ def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:s

 def main() -> None:

-    MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
-    MYSQL_HOST = os.getenv("MYSQL_HOST")
-    MYSQL_PORT = os.getenv("MYSQL_PORT")
-    MYSQL_USER = str(os.getenv("MYSQL_USER"))
-    MYSQL_SECRET = str(os.getenv("MYSQL_SECRET"))
+    MYSQL_DATABASE = get_required_env("MYSQL_DATABASE")
+    MYSQL_HOST = get_required_env("MYSQL_HOST")
+    MYSQL_PORT = get_required_env("MYSQL_PORT")
+    MYSQL_USER = get_required_env("MYSQL_USER")
+    MYSQL_SECRET = get_required_env("MYSQL_SECRET")
     jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"

-    S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY"))
-    S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY"))
-    S3_ENDPOINT = str(os.getenv("S3_ENDPOINT"))
-
+    S3_ACCESS_KEY = get_required_env("S3_ACCESS_KEY")
+    S3_SECRET_KEY = get_required_env("S3_SECRET_KEY")
+    S3_ENDPOINT = get_required_env("S3_ENDPOINT")
+
     args = get_args()
     savepath = args.savepath
     metadata = args.metadatapath
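
For reference, the effect of the new upper bound on the backfill loop: instead of iterating year by year up to 2100, the loop now stops at the current year. A minimal sketch (illustrative only; the month cutoff for the current year is handled elsewhere in full_initial_ingestion and is not shown here):

# Illustrative sketch: the year range now ends at the current year instead of 2100.
from datetime import datetime

current_year = datetime.now().year           # e.g. 2025
years = list(range(2019, current_year + 1))  # [2019, ..., current_year]
months = list(range(1, 13))                  # 1..12

for year in years:
    for month in months:
        # full_initial_ingestion reads the matching (year, month) slice from MySQL here
        pass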

src/utils/utils.py

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+import os
+from pyspark.sql import SparkSession #type:ignore
+
+class utils:
+    def __init__(self) -> None:
+        pass
+
+    def get_required_env(self,env_name:str) -> str:
+        env_value = os.getenv(env_name)
+        if env_value is None:
+            raise ValueError(f"Environment variable {env_name} is not set")
+        return env_value
+
+    def get_spark_session(self,S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str,app_name:str) -> SparkSession:
+
+        spark = SparkSession.builder \
+            .appName(app_name) \
+            .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
+            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
+            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
+            .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
+            .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
+            .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
+            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
+            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+            .getOrCreate()
+        return spark
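
The new utils class collects the env-var check and Spark session builder that both ingestion scripts still define locally. A hedged sketch of how a script could consume it; the import path and wiring are assumptions, since this commit does not yet switch the scripts over:

# Illustrative sketch: hypothetical usage of src/utils/utils.py from an ingestion script.
# The import path depends on how /opt/spark/work-dir/src ends up on PYTHONPATH.
from utils.utils import utils

helpers = utils()
s3_access_key = helpers.get_required_env("S3_ACCESS_KEY")
s3_secret_key = helpers.get_required_env("S3_SECRET_KEY")
s3_endpoint = helpers.get_required_env("S3_ENDPOINT")

spark = helpers.get_spark_session(
    S3_ACCESS_KEY=s3_access_key,
    S3_SECRET_KEY=s3_secret_key,
    S3_ENDPOINT=s3_endpoint,
    app_name="full_table_ingestion",
)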
