Commit e3a7300

Merge pull request #811 from NHSDigital/feature/jale13-nrl-1268-explode-header-col
NRL 1268/1269 explode header col and format date cols
2 parents c4e4698 + 47205c1 commit e3a7300

4 files changed: +56 −15 lines

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 2 additions & 1 deletion

@@ -51,7 +51,8 @@ resource "aws_glue_job" "glue_job" {
     "--datalake-formats" = "delta"
     "--source_path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
     "--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
-    "--job_name" = "poc-glue-job"
+    "--job_name" = "${var.name_prefix}-glue-job"
+    "--partition_cols" = "date"
     "--enable-continuous-log-filter" = "true"
     "--enable-metrics" = "true"
     "--extra-py-files" = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 4 additions & 1 deletion

@@ -6,7 +6,9 @@
 from transformations import dtype_conversion, flatten_df, logSchema
 
 # Get arguments from AWS Glue job
-args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
+args = getResolvedOptions(
+    sys.argv, ["job_name", "source_path", "target_path", "partition_cols"]
+)
 
 # Start Glue context
 sc = SparkContext()
@@ -19,6 +21,7 @@
     source_path=args["source_path"],
     target_path=args["target_path"],
     schema=logSchema,
+    job_name=args["job_name"],
     partition_cols=partition_cols,
     transformations=[flatten_df, dtype_conversion],
 )
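
The hunk passes partition_cols through to the pipeline but does not show how that variable is derived from args["partition_cols"]. A plausible sketch, assuming the value is the comma-separated string configured in glue.tf (here just "date"):

# Assumed parsing step, not shown in this hunk: turn the comma-separated
# "--partition_cols" argument into the list the pipeline expects.
partition_cols = [c.strip() for c in args["partition_cols"].split(",") if c.strip()]
# "date" -> ["date"]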

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 13 additions & 0 deletions

@@ -1,3 +1,4 @@
+import boto3
 from instances import GlueContextSingleton, LoggerSingleton
 
 
@@ -8,6 +9,7 @@ def __init__(
         source_path,
         target_path,
         schema,
+        job_name,
         partition_cols=[],
         transformations=[],
     ):
@@ -20,6 +22,12 @@ def __init__(
         self.schema = schema
         self.partition_cols = partition_cols
         self.transformations = transformations
+        self.glue = boto3.client(
+            service_name="glue",
+            region_name="eu-west-2",
+            endpoint_url="https://glue.eu-west-2.amazonaws.com",
+        )
+        self.name_prefix = "-".join(job_name.split("-")[:4])
 
     def run(self):
         """Runs ETL"""
@@ -31,6 +39,8 @@ def run(self):
             self.logger.info("Data transformed successfully.")
             self.load(df)
             self.logger.info(f"Data loaded into {self.target_path}.")
+            self.logger.info("Trigger glue crawler")
+            self.trigger_crawler()
         except Exception as e:
             self.logger.error(f"ETL process failed: {e}")
             raise e
@@ -57,3 +67,6 @@ def load(self, dataframe):
         dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
             self.target_path
         )
+
+    def trigger_crawler(self):
+        self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
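
To illustrate the new crawler hook: only the first four dash-separated tokens of the job name form the prefix, so the crawler that gets started is "<prefix>-log-crawler". The job name below is a made-up example standing in for "${var.name_prefix}-glue-job":

# Illustrative values only, not taken from the commit.
job_name = "nrlf-dev-account-wide-glue-job"
name_prefix = "-".join(job_name.split("-")[:4])
print(name_prefix)                    # nrlf-dev-account-wide
print(f"{name_prefix}-log-crawler")   # nrlf-dev-account-wide-log-crawler

One behaviour to be aware of: boto3's start_crawler raises CrawlerRunningException if the crawler is already mid-run, and because trigger_crawler() sits inside the try block in run(), that error would currently fail the ETL run even though the data has already been written.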

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 37 additions & 13 deletions

@@ -1,15 +1,21 @@
-from pyspark.sql.functions import to_timestamp
+from pyspark.sql.functions import (
+    col,
+    from_unixtime,
+    regexp_replace,
+    to_date,
+    to_timestamp,
+)
 from pyspark.sql.types import (
     BooleanType,
+    DoubleType,
     StringType,
     StructField,
     StructType,
-    TimestampType,
 )
 
 logSchema = StructType(
     [
-        StructField("time", TimestampType(), True),
+        StructField("time", DoubleType(), True),
         StructField("index", StringType(), True),
         StructField("host", StringType(), True),
         StructField("source", StringType(), True),
@@ -60,17 +66,35 @@
 
 
 def flatten_df(df):
-    cols = []
-    for c in df.dtypes:
-        if "struct" in c[1]:
-            nested_col = c[0]
-        else:
-            cols.append(c[0])
-    return df.select(*cols, f"{nested_col}.*")
+    def flatten(schema, prefix=""):
+        """
+        Recursively traverse the schema to extract all nested fields.
+        """
+        fields = []
+        for field in schema.fields:
+            name = f"{prefix}.{field.name}" if prefix else field.name
+            if isinstance(field.dataType, StructType):
+                fields += flatten(field.dataType, name)
+            else:
+                alias_name = name.replace(".", "_")
+                fields.append((name, alias_name))
+        return fields
+
+    flat_columns = flatten(df.schema)
+
+    return df.select([col(c).alias(n) for c, n in flat_columns])
 
 
 def dtype_conversion(df):
-    df = df.withColumn(
-        "timestamp", to_timestamp(df["timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX")
+    df = (
+        df.withColumn(
+            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+        )
+        .withColumn(
+            "event_timestamp",
+            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
+        )
+        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+        .withColumn("date", to_date(col("time")))
     )
-    return df
+    return df.drop("event_timestamp_cleaned")
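
A minimal local check of the reworked flatten_df and dtype_conversion. The nested record and its field names are invented for the example (the full log schema is not shown in this diff), and a local SparkSession stands in for the Glue context:

# Sketch under assumptions: invented nested record, local SparkSession.
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

from transformations import dtype_conversion, flatten_df

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Invented schema: a top-level "event" struct whose "timestamp" field should
# flatten to the "event_timestamp" column that dtype_conversion expects.
sample_schema = StructType(
    [
        StructField("time", DoubleType(), True),
        StructField(
            "event",
            StructType(
                [
                    StructField("id", StringType(), True),
                    StructField("timestamp", StringType(), True),
                ]
            ),
            True,
        ),
    ]
)

nested = spark.createDataFrame(
    [(1717606800.0, ("req-1", "2024-06-05 17:00:00,123+0000"))], sample_schema
)

flat = flatten_df(nested)
print(flat.columns)  # ['time', 'event_id', 'event_timestamp']

converted = dtype_conversion(flat)
converted.select("time", "date", "event_timestamp").show(truncate=False)
# "time" becomes a proper timestamp, "date" is derived from it for partitioning,
# and the comma before the milliseconds in event_timestamp is normalised to a dot.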
