
Commit a470c73

refactor ingestion script to use iceberg tables
1 parent d4f4f32 commit a470c73

7 files changed: 27 additions and 186 deletions

Docker/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 FROM nauedu/nau-analytics-base-spark:latest
-
+RUN pip install --no-cache-dir git+https://github.com/fccn/nau-analytics-utils.git@main#subdirectory=common_libs/utils
 # Copy your application code
 COPY src/ /opt/spark/work-dir/src/
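The image now installs the shared nau-analytics-utils package, which provides the Config, get_required_env, and get_iceberg_spark_session helpers imported by the refactored script below. A rough sketch of what the session helper plausibly sets up; the config keys are standard Iceberg-on-Spark JDBC catalog settings, but the builder code and every Config field except iceberg_catalog_warehouse are assumptions, not taken from this commit:

from pyspark.sql import SparkSession  # type: ignore

def build_iceberg_session(cfg) -> SparkSession:
    # Hypothetical stand-in for get_iceberg_spark_session: register a
    # "bronze_local" catalog as an Iceberg catalog backed by a JDBC store.
    return (
        SparkSession.builder
        .appName("bronze-ingestion")
        .config("spark.sql.catalog.bronze_local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.bronze_local.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog")
        .config("spark.sql.catalog.bronze_local.uri", cfg.iceberg_catalog_uri)
        .config("spark.sql.catalog.bronze_local.jdbc.user", cfg.iceberg_catalog_user)
        .config("spark.sql.catalog.bronze_local.jdbc.password", cfg.iceberg_catalog_password)
        # Warehouse root (e.g. an s3a:// path) where Iceberg keeps data and metadata.
        .config("spark.sql.catalog.bronze_local.warehouse", cfg.iceberg_catalog_warehouse)
        .getOrCreate()
    )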
Lines changed: 26 additions & 29 deletions
@@ -1,13 +1,11 @@
-from pyspark.sql import SparkSession #type:ignore
 from pyspark.sql import DataFrame #type:ignore
-from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType #type:ignore
+from nau_analytics_data_product_utils_lib import Config,get_required_env,get_iceberg_spark_session #type: ignore
 import pyspark.sql.functions as F #type:ignore
 import argparse
 import os
 import logging
 from typing import List, Union, Optional,Tuple
-from utils.utils import Utils
-from utils.config import Config
+
 
 logging.basicConfig(
     level=logging.INFO,
@@ -17,12 +15,11 @@
     ]
 )
 
-utils_obj = Utils("dev")
+
 
 
 def get_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
-    parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored")
     parser.add_argument("--undesired_column", type = str,required= True, help = " the undesired column for a table")
     args = parser.parse_args()
     return args
@@ -32,32 +29,29 @@ def add_ingestion_metadata_column(df: DataFrame,table: str) -> DataFrame:
     tmp_df = df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
     return tmp_df
 
-def add_date_partition_columns(df: DataFrame,column_name:str) -> DataFrame:
-    df = df.withColumn("year", F.year(F.col(column_name)))\
-        .withColumn("month", F.month(F.col(column_name)))\
-        .withColumn("day",F.day(column_name))
-    return df
 
 def main() -> None:
 
-    MYSQL_DATABASE = utils_obj.get_required_env("MYSQL_DATABASE")
-    MYSQL_HOST = utils_obj.get_required_env("MYSQL_HOST")
-    MYSQL_PORT = utils_obj.get_required_env("MYSQL_PORT")
-    MYSQL_USER = utils_obj.get_required_env("MYSQL_USER")
-    MYSQL_SECRET = utils_obj.get_required_env("MYSQL_SECRET")
+    MYSQL_DATABASE = get_required_env("MYSQL_DATABASE")
+    MYSQL_HOST = get_required_env("MYSQL_HOST")
+    MYSQL_PORT = get_required_env("MYSQL_PORT")
+    MYSQL_USER = get_required_env("MYSQL_USER")
+    MYSQL_SECRET = get_required_env("MYSQL_SECRET")
     jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
 
-    S3_ACCESS_KEY = utils_obj.get_required_env("S3_ACCESS_KEY")
-    S3_SECRET_KEY = utils_obj.get_required_env("S3_SECRET_KEY")
-    S3_ENDPOINT = utils_obj.get_required_env("S3_ENDPOINT")
+    S3_ACCESS_KEY = get_required_env("S3_ACCESS_KEY")
+    S3_SECRET_KEY = get_required_env("S3_SECRET_KEY")
+    S3_ENDPOINT = get_required_env("S3_ENDPOINT")
     args = get_args()
-    S3_SAVEPATH = args.savepath
     undesired_column = args.undesired_column
 
-    ICEBERG_CATALOG_URI = utils_obj.get_required_env("ICEBERG_CATALOG_URI")
-    ICEBERG_CATALOG_USER = utils_obj.get_required_env("ICEBERG_CATALOG_USER")
-    ICEBERG_CATALOG_PASSWORD = utils_obj.get_required_env("ICEBERG_CATALOG_PASSWORD")
-    ICEBERG_CATALOG_WAREHOUSE = utils_obj.get_required_env("ICEBERG_CATALOG_WAREHOUSE")
+    ICEBERG_CATALOG_HOST = get_required_env("ICEBERG_CATALOG_HOST")
+    ICEBERG_CATALOG_PORT = get_required_env("ICEBERG_CATALOG_PORT")
+    ICEBERG_CATALOG_NAME = get_required_env("ICEBERG_CATALOG_NAME")
+    ICEBERG_CATALOG_USER = get_required_env("ICEBERG_CATALOG_USER")
+    ICEBERG_CATALOG_PASSWORD = get_required_env("ICEBERG_CATALOG_PASSWORD")
+    ICEBERG_CATALOG_WAREHOUSE = get_required_env("ICEBERG_CATALOG_WAREHOUSE")
+    ICEBERG_CATALOG_URI = f"jdbc:mysql://{ICEBERG_CATALOG_HOST}:{ICEBERG_CATALOG_PORT}/{ICEBERG_CATALOG_NAME}"
 
     TABLES = [
         "course_overviews_courseoverview",
@@ -82,7 +76,7 @@ def main() -> None:
         iceberg_catalog_warehouse=ICEBERG_CATALOG_WAREHOUSE
 
     )
-    spark = utils_obj.get_iceberg_spark_session(cfg=icerberg_cfg)
+    spark = get_iceberg_spark_session(cfg=icerberg_cfg)
     for table in TABLES:
 
         logging.info(f"getting table {table}")
@@ -95,19 +89,22 @@ def main() -> None:
                .option("driver", "com.mysql.cj.jdbc.Driver") \
                .option("dbtable", table) \
                .load()
+
            if table == "auth_user":
                df = df.drop(undesired_column)
 
+
+
            df = add_ingestion_metadata_column(df=df,table=table)
-           df = add_date_partition_columns(df,"ingestion_date")
+
            if table == "auth_user" and undesired_column and undesired_column in df.columns:
                raise Exception("THE undesired column stills in the dataframe")
 
-           output_path = f"{S3_SAVEPATH}/{table}"
 
-           df.write.format("iceberg").mode("append").partitionBy("year", "month","day").save(output_path)
+           saveTable = f"bronze_local.entidades.{table}"
+           df.write.format("iceberg").mode("append").saveAsTable(saveTable)
 
-           logging.info(f"Data saved as Delta table to {output_path}")
+           logging.info(f"Data saved as Delta table to {saveTable}")
 
        except Exception as e:
            logging.error(f"Pipeline failed: {e}")
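The core of the refactor is the write path: the script no longer derives year/month/day columns and writes partitioned files under a --savepath prefix, but appends each batch to a catalog-managed table (bronze_local.entidades.<table>), so partitioning becomes table metadata rather than a write-time option. (The log line still says "Delta table", a leftover from the previous implementation.) A sketch of how daily partitioning could be retained under the new scheme, assuming hidden partitioning on ingestion_date is still wanted; the ALTER TABLE statement is standard Iceberg Spark SQL and is not part of this commit:

from pyspark.sql import DataFrame, SparkSession  # type: ignore

def append_to_bronze(spark: SparkSession, df: DataFrame, table: str) -> None:
    # Hypothetical helper illustrating the commit's new write pattern.
    target = f"bronze_local.entidades.{table}"
    # One-time setup: declare a hidden daily partition derived from
    # ingestion_date. Iceberg computes partition values itself, which is why
    # the year/month/day helper columns could be dropped. (Adding the same
    # partition field twice raises an error, so guard this in real code.)
    spark.sql(f"ALTER TABLE {target} ADD PARTITION FIELD days(ingestion_date)")
    # Per batch: append through the catalog rather than to an S3 path,
    # exactly as the refactored loop does with saveAsTable.
    df.write.format("iceberg").mode("append").saveAsTable(target)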

src/bronze/sql/course_overviews_courseoverview_ddl.sql

Lines changed: 0 additions & 86 deletions
This file was deleted.

src/bronze/utils/__init__.py

Whitespace-only changes.

src/bronze/utils/config.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/bronze/utils/utils.py

Lines changed: 0 additions & 58 deletions
This file was deleted.
