
Commit a2502fc

jackleary authored and axelkrastek1-nhs committed
NRL-1186 python transformation script update
1 parent 1763cfd commit a2502fc


2 files changed: +110 -44 lines changed

Lines changed: 110 additions & 44 deletions
@@ -1,45 +1,111 @@
-import sys
-from awsglue.transforms import *
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
 from awsglue.context import GlueContext
-from awsglue.job import Job
-
-# Initialize SparkContext, GlueContext, and SparkSession
-sc = SparkContext()
-glueContext = GlueContext(sc)
-spark = glueContext.spark_session
-
-# Get job arguments
-args = getResolvedOptions(sys.argv, ['JOB_NAME'])
-
-# Create Glue job
-job = Job(glueContext)
-job.init(args['JOB_NAME'], args)
-
-# Script generated for node AWS Glue Data Catalog
-AWSGlueDataCatalog_node1704102689282 = glueContext.create_dynamic_frame.from_catalog(
-    database="raw-log",
-    table_name="source_data_bucket",
-    transformation_ctx="AWSGlueDataCatalog_node1704102689282",
-)
-
-# Script generated for node Change Schema
-ChangeSchema_node1704102716061 = ApplyMapping.apply(
-    frame=AWSGlueDataCatalog_node1704102689282,
-    mappings=[
-        # TBC
-    ],
-    transformation_ctx="ChangeSchema_node1704102716061",
-)
-
-# Script generated for node Amazon S3
-AmazonS3_node1704102720699 = glueContext.write_dynamic_frame.from_options(
-    frame=ChangeSchema_node1704102716061,
-    connection_type="s3",
-    format="csv",
-    connection_options={"path": "s3://target-data-bucket", "partitionKeys": []},
-    transformation_ctx="AmazonS3_node1704102720699",
-)
-
-job.commit()
+from awsglue.dynamicframe import DynamicFrame
+
+# from awsglue.job import Job
+from pyspark.context import SparkContext
+
+# from pyspark.sql import DataFrame
+
+
+def create_glue_context():
+    # Initialize the SparkContext and GlueContext
+    sc = SparkContext()
+    glueContext = GlueContext(sc)
+
+    return glueContext
+
+
+def load_data_from_s3(
+    glueContext, s3_path: str, file_type: str = "json", format_options: dict = {}
+):
+    """
+    Loads data from S3 into a Glue DynamicFrame.
+    """
+    if file_type == "json":
+        return glueContext.create_dynamic_frame.from_options(
+            connection_type="s3",
+            connection_options={"paths": [s3_path]},
+            format=file_type,
+        )
+    else:
+        raise ValueError(f"Unsupported file_type: {file_type}")
+
+
+def transform_data(dynamic_frame: DynamicFrame) -> DynamicFrame:
+    """
+    Example transformation function. Modify this to suit your transformation logic.
+    """
+    # Convert DynamicFrame to DataFrame to leverage Spark SQL operations if needed
+    df = dynamic_frame.toDF()
+
+    # Perform any necessary transformations using Spark DataFrame API
+    df_transformed = df.filter(df["x"] == "placeholder")
+
+    # Convert DataFrame back to DynamicFrame for Glue compatibility
+    transformed_dynamic_frame = DynamicFrame.fromDF(
+        df_transformed, dynamic_frame.glue_ctx, "transformed_dynamic_frame"
+    )
+
+    return transformed_dynamic_frame
+
+
+def write_data_to_s3(
+    dynamic_frame: DynamicFrame,
+    s3_path: str,
+    file_type: str = "csv",
+    partition_keys: list = None,
+):
+    """
+    Writes a DynamicFrame to S3 with partitioning support for scalability.
+    """
+    if file_type == "csv":
+        dynamic_frame.toDF().write.option("header", "true").mode(
+            "overwrite"
+        ).partitionBy(*partition_keys).csv(s3_path)
+    elif file_type == "parquet":
+        dynamic_frame.toDF().write.mode("overwrite").partitionBy(
+            *partition_keys
+        ).parquet(s3_path)
+    elif file_type == "json":
+        dynamic_frame.toDF().write.mode("overwrite").partitionBy(*partition_keys).json(
+            s3_path
+        )
+    else:
+        raise ValueError(f"Unsupported file_type: {file_type}")
+
+
+def handle_error(exception: Exception):
+    # Custom error handling for logging
+    raise exception
+
+
+def main():
+    try:
+        # Initialize Glue Context
+        glueContext = create_glue_context()
+
+        # Example paths and configurations
+        input_path = "s3://source-data-bucket/input-data/"  # probs worth using one bucket and different folders? Cuts costs
+        output_path = "s3://target-data-bucket/output-data/"
+
+        # Load data from S3 (adjust format if needed)
+        dynamic_frame = load_data_from_s3(glueContext, input_path, file_type="json")
+
+        # Transform data
+        transformed_dynamic_frame = transform_data(dynamic_frame)
+
+        # Write the transformed data back to S3, partitioned by 'date'
+        write_data_to_s3(
+            transformed_dynamic_frame,
+            output_path,
+            file_type="csv",
+            partition_keys=["date"],
+        )
+
+    except Exception as e:
+        handle_error(e)
+
+
+# Entry point for Glue job
+if __name__ == "__main__":
+    main()
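The rewritten script comments out the awsglue Job wrapper that the previous version used, so the Job.init/job.commit bookkeeping (job bookmarks, run state) no longer happens. Below is a minimal sketch of how main()'s pipeline could be wired back into a Glue job entry point, assuming the helpers above are in scope and the job is still launched with a JOB_NAME argument as before; run_as_glue_job is a hypothetical name, not part of this commit.

import sys

from awsglue.job import Job
from awsglue.utils import getResolvedOptions


def run_as_glue_job():
    # Resolve the standard Glue job arguments passed by the job runner
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])

    # Reuse the helper defined in this script to build the GlueContext
    glueContext = create_glue_context()

    # Initialize the Job wrapper so bookmarks and run state are tracked
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # Same pipeline as main(), run inside the Job wrapper
    frame = load_data_from_s3(
        glueContext, "s3://source-data-bucket/input-data/", file_type="json"
    )
    transformed = transform_data(frame)
    write_data_to_s3(
        transformed,
        "s3://target-data-bucket/output-data/",
        file_type="csv",
        partition_keys=["date"],
    )

    # Mark the run as successfully completed
    job.commit()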

terraform/account-wide-infrastructure/modules/glue/src/transforms.py

Whitespace-only changes.
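transforms.py itself only changes whitespace here, but the new transform_data function above is straightforward to exercise off-cluster. A rough local test sketch, assuming the aws-glue-libs are available (for example via the amazon/aws-glue-libs Docker image) and that transform_data is importable; the "x"/"placeholder" values are the stand-ins already used in the function.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext


def test_transform_data_keeps_placeholder_rows():
    # Build a GlueContext on top of a local SparkContext
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    # Two rows: one matching the placeholder filter, one not
    df = spark.createDataFrame([("placeholder",), ("other",)], ["x"])
    frame = DynamicFrame.fromDF(df, glueContext, "test_frame")

    result = transform_data(frame).toDF().collect()

    # Only the matching row should survive the filter
    assert [row["x"] for row in result] == ["placeholder"]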

0 commit comments