
Commit 479a7a9

NRL-1186 Lightweight framework for ETL process
1 parent 90ab98c commit 479a7a9

File tree

5 files changed (+100 additions, -104 deletions)


terraform/account-wide-infrastructure/modules/glue/src/transforms.py renamed to terraform/account-wide-infrastructure/modules/glue/src/__init__.py

File renamed without changes.
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+import logging
+
+from awsglue.context import GlueContext
+from pyspark.sql import SparkSession
+
+
+class GlueContextSingleton:
+    """Singleton for GlueContext and SparkSession"""
+
+    _instance = None
+
+    def __new__(cls, spark_context):
+        if not cls._instance:
+            cls._instance = super().__new__(cls)
+            cls._instance.spark = SparkSession.builder.getOrCreate()
+            cls._instance.context = GlueContext(spark_context)
+        return cls._instance
+
+
+class LoggerSingleton:
+    """Singleton for logger"""
+
+    _instance = None
+
+    def __new__(cls):
+        if not cls._instance:
+            cls._instance = super().__new__(cls)
+            cls._instance.logger = logging.getLogger("ETLLogger")
+            cls._instance.logger.setLevel(logging.INFO)
+        return cls._instance
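The two classes above cache a single GlueContext/SparkSession and a single logger per process. A minimal usage sketch (the sc variable mirrors the job script later in this commit; the assertion and log message are illustrative only, not part of the diff):

from pyspark.context import SparkContext

sc = SparkContext()

# Repeated construction returns the same cached object, so the GlueContext
# and SparkSession are only created once per job run.
assert GlueContextSingleton(sc) is GlueContextSingleton(sc)

logger = LoggerSingleton().logger
logger.info("Singletons initialised once per process.")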
Lines changed: 19 additions & 104 deletions
@@ -1,111 +1,26 @@
-from awsglue.context import GlueContext
-from awsglue.dynamicframe import DynamicFrame
+import sys
 
-# from awsglue.job import Job
+from awsglue.utils import getResolvedOptions
 from pyspark.context import SparkContext
 
-# from pyspark.sql import DataFrame
+# Get arguments from AWS Glue job
+args = getResolvedOptions(
+    sys.argv, ["JOB_NAME", "SOURCE_PATH", "TARGET_PATH", "PARTITION_COLS"]
+)
 
+# Start Glue context
+sc = SparkContext()
 
-def create_glue_context():
-    # Initialize the SparkContext and GlueContext
-    sc = SparkContext()
-    glueContext = GlueContext(sc)
+partition_cols = args["PARTITION_COLS"].split(",") if "PARTITION_COLS" in args else []
 
-    return glueContext
+# Initialize ETL process
+etl_job = ETLTemplate(
+    spark_context=sc,
+    source_path=args["SOURCE_PATH"],
+    target_path=args["TARGET_PATH"],
+    partition_cols=partition_cols,
+    transformations=[placeholder],
+)
 
-
-def load_data_from_s3(
-    glueContext, s3_path: str, file_type: str = "json", format_options: dict = {}
-):
-    """
-    Loads data from S3 into a Glue DynamicFrame.
-    """
-    if file_type == "json":
-        return glueContext.create_dynamic_frame.from_options(
-            connection_type="s3",
-            connection_options={"paths": [s3_path]},
-            format=file_type,
-        )
-    else:
-        raise ValueError(f"Unsupported file_type: {file_type}")
-
-
-def transform_data(dynamic_frame: DynamicFrame) -> DynamicFrame:
-    """
-    Example transformation function. Modify this to suit your transformation logic.
-    """
-    # Convert DynamicFrame to DataFrame to leverage Spark SQL operations if needed
-    df = dynamic_frame.toDF()
-
-    # Perform any necessary transformations using Spark DataFrame API
-    df_transformed = df.filter(df["x"] == "placeholder")
-
-    # Convert DataFrame back to DynamicFrame for Glue compatibility
-    transformed_dynamic_frame = DynamicFrame.fromDF(
-        df_transformed, dynamic_frame.glue_ctx, "transformed_dynamic_frame"
-    )
-
-    return transformed_dynamic_frame
-
-
-def write_data_to_s3(
-    dynamic_frame: DynamicFrame,
-    s3_path: str,
-    file_type: str = "csv",
-    partition_keys: list = None,
-):
-    """
-    Writes a DynamicFrame to S3 with partitioning support for scalability.
-    """
-    if file_type == "csv":
-        dynamic_frame.toDF().write.option("header", "true").mode(
-            "overwrite"
-        ).partitionBy(*partition_keys).csv(s3_path)
-    elif file_type == "parquet":
-        dynamic_frame.toDF().write.mode("overwrite").partitionBy(
-            *partition_keys
-        ).parquet(s3_path)
-    elif file_type == "json":
-        dynamic_frame.toDF().write.mode("overwrite").partitionBy(*partition_keys).json(
-            s3_path
-        )
-    else:
-        raise ValueError(f"Unsupported file_type: {file_type}")
-
-
-def handle_error(exception: Exception):
-    # Custom error handling for logging
-    raise exception
-
-
-def main():
-    try:
-        # Initialize Glue Context
-        glueContext = create_glue_context()
-
-        # Example paths and configurations
-        input_path = "s3://source-data-bucket/input-data/"  # probs worth using one bucket and different folders? Cuts costs
-        output_path = "s3://target-data-bucket/output-data/"
-
-        # Load data from S3 (adjust format if needed)
-        dynamic_frame = load_data_from_s3(glueContext, input_path, format="json")
-
-        # Transform data
-        transformed_dynamic_frame = transform_data(dynamic_frame)
-
-        # Write the transformed data back to S3, partitioned by 'date'
-        write_data_to_s3(
-            transformed_dynamic_frame,
-            output_path,
-            format="csv",
-            partition_keys=["date"],
-        )
-
-    except Exception as e:
-        handle_error(e)
-
-
-# Entry point for Glue job
-if __name__ == "__main__":
-    main()
+# Run the job
+etl_job.run()
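As committed, this hunk references ETLTemplate and placeholder without importing them, so unless those names are provided elsewhere (for example via the package's __init__.py) the job would raise a NameError at runtime. A hedged sketch of the missing imports, assuming the class and the stub live in sibling modules of this Glue package (module names below are guesses, not taken from the diff):

# Hypothetical module paths; adjust to wherever ETLTemplate and placeholder actually live.
from etl_template import ETLTemplate
from transforms import placeholder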
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+class ETLTemplate:
+    def __init__(
+        self,
+        spark_context,
+        source_path,
+        target_path,
+        partition_cols=None,
+        transformations=[],
+    ):
+        """Initialize Glue context, Spark session, logger, and paths"""
+        self.glue_context = GlueContextSingleton(spark_context).context
+        self.spark = GlueContextSingleton(spark_context).spark
+        self.logger = LoggerSingleton().logger
+        self.source_path = source_path
+        self.target_path = target_path
+        self.partition_cols = partition_cols
+        self.transformations = transformations
+
+    def run(self):
+        """Runs ETL"""
+        try:
+            self.logger.info("ETL Process started.")
+            df = self.extract()
+            self.logger.info(f"Data extracted from {self.source_path}.")
+            df = self.transform(df)
+            self.logger.info("Data transformed successfully.")
+            self.load(df)
+            self.logger.info(f"Data loaded into {self.target_path}.")
+        except Exception as e:
+            self.logger.error(f"ETL process failed: {e}")
+            raise e
+
+    def extract(self):
+        """Extract JSON data from S3"""
+        self.logger.info(f"Extracting data from {self.source_path} as JSON")
+        return self.spark.read.json(self.source_path)
+
+    def transform(self, dataframe):
+        """Apply a list of transformations on the dataframe"""
+        for transformation in self.transformations:
+            self.logger.info(f"Applying transformation: {transformation.__name__}")
+            dataframe = transformation(dataframe)
+        return dataframe
+
+    def load(self, dataframe):
+        """Load transformed data into Parquet format"""
+        self.logger.info(f"Loading data into {self.target_path} as Parquet")
+        dataframe.write.mode("overwrite").partitionBy(*self.partition_cols).parquet(
+            self.target_path
+        )
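The transformations argument of ETLTemplate is a list of plain callables, each taking and returning a Spark DataFrame, applied in order by transform(). A short sketch of wiring a real transform into the template (function name, bucket paths, and partition column below are made up for illustration):

def drop_incomplete_rows(dataframe):
    # Example transform: keep only rows with no null values.
    return dataframe.dropna()

etl_job = ETLTemplate(
    spark_context=sc,                           # SparkContext created by the job script
    source_path="s3://example-bucket/input/",   # assumed source path
    target_path="s3://example-bucket/output/",  # assumed target path
    partition_cols=["date"],                    # assumed partition column
    transformations=[drop_incomplete_rows],
)
etl_job.run()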
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+def placeholder(): ...
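Note that placeholder takes no arguments as committed, so ETLTemplate.transform() would raise a TypeError when it calls transformation(dataframe) on it. A stub matching the expected signature could look like this (a sketch, not the committed code):

def placeholder(dataframe):
    # No-op transform: return the DataFrame unchanged.
    return dataframe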
