Commit 3a8ec3c

Merge pull request #896 from NHSDigital/bug/jale13-NRL-1411-fix-schema-issue
NRL 1411 fix schema issue
2 parents: 7167f45 + 8075257

5 files changed: 99 additions, 76 deletions

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ resource "aws_glue_job" "glue_job" {
   worker_type       = "G.1X"
   timeout           = 2880
   max_retries       = 0
-  number_of_workers = 2
+  number_of_workers = 4
   command {
     name           = "glueetl"
     python_version = var.python_version
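Note: with worker_type left at G.1X, raising number_of_workers from 2 to 4 roughly doubles the compute available to the job. A minimal boto3 sketch, not part of this PR, to read back the applied settings after terraform apply; the job name is a placeholder:

import boto3

# Illustrative check only: confirm the Glue job configuration that Terraform applied.
# "nrlf-log-etl" is a hypothetical job name, not taken from this repository.
glue = boto3.client("glue")
job = glue.get_job(JobName="nrlf-log-etl")["Job"]
print(job["WorkerType"], job["NumberOfWorkers"], job["Timeout"], job["MaxRetries"])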

terraform/account-wide-infrastructure/modules/glue/iam.tf

Lines changed: 13 additions & 2 deletions
@@ -63,7 +63,6 @@ data "aws_iam_policy_document" "glue_service" {
 
     resources = [
       "arn:aws:logs:*:*:*:/aws-glue/*",
-      # "arn:aws:logs:*:*:*:/customlogs/*"
     ]
 
     effect = "Allow"
@@ -81,13 +80,25 @@
     effect = "Allow"
   }
 
+  statement {
+    actions = [
+      "cloudwatch:Get*",
+      "cloudwatch:List*",
+      "cloudwatch:Put*",
+    ]
+    resources = [
+      "*"
+    ]
+    effect = "Allow"
+  }
+
   statement {
     actions = [
       "iam:PassRole",
     ]
     effect = "Allow"
     resources = [
-      "*"
+      "arn:aws:iam::*:role/AWSGlueServiceRole*"
     ]
   }
 }
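The new statement gives the job read and write access to CloudWatch metrics, while iam:PassRole is narrowed from * to Glue service roles. As a hedged illustration of what cloudwatch:Put* enables, a small boto3 sketch; the namespace and metric name are invented for the example, not values used by this repository:

import boto3

# Hypothetical example: publish a custom metric from inside the Glue job.
# Namespace and metric name are placeholders.
cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_data(
    Namespace="Glue/LogPipeline",
    MetricData=[{"MetricName": "RowsWritten", "Value": 1234.0, "Unit": "Count"}],
)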

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 16 additions & 6 deletions
@@ -1,11 +1,9 @@
 import sys
 
 from awsglue.utils import getResolvedOptions
-from consumer_schemas import consumerSchemaList
 from pipeline import LogPipeline
-from producer_schemas import producerSchemaList
 from pyspark.context import SparkContext
-from transformations import dtype_conversion, flatten_df, resolve_dupes
+from transformations import dtype_conversion, rename_cols, resolve_dupes
 
 # Get arguments from AWS Glue job
 args = getResolvedOptions(
@@ -17,17 +15,29 @@
 
 partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else []
 
-consumerSchemaList.update(producerSchemaList)
+host_prefixes = [
+    "consumer--countDocumentReference",
+    "consumer--searchPostDocumentReference",
+    "consumer--searchDocumentReference",
+    "consumer--readDocumentReference",
+    "producer--searchPostDocumentReference",
+    "producer--searchDocumentReference",
+    "producer--readDocumentReference",
+    "producer--upsertDocumentReference",
+    "producer--updateDocumentReference",
+    "producer--deleteDocumentReference",
+    "producer--createDocumentReference",
+]
 
 # Initialize ETL process
 etl_job = LogPipeline(
     spark_context=sc,
     source_path=args["source_path"],
     target_path=args["target_path"],
-    schemas=consumerSchemaList,
+    host_prefixes=host_prefixes,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[flatten_df, resolve_dupes, dtype_conversion],
+    transformations=[rename_cols, resolve_dupes, dtype_conversion],
 )
 
 # Run the job
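getResolvedOptions reads the job parameters passed as --key arguments at run time. A rough sketch of how such a run might be triggered with boto3; the job name, bucket paths and partition column are placeholders, not values from this repository:

import boto3

# Hypothetical invocation: every value below is a placeholder.
glue = boto3.client("glue")
glue.start_job_run(
    JobName="nrlf-log-etl",
    Arguments={
        "--source_path": "s3://example-log-bucket/raw/",
        "--target_path": "s3://example-log-bucket/parquet/",
        "--job_name": "nrlf-log-etl",
        "--partition_cols": "date",
    },
)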

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 37 additions & 24 deletions
@@ -2,7 +2,6 @@
 
 import boto3
 from instances import GlueContextSingleton, LoggerSingleton
-from pyspark.sql.functions import col
 
 
 class LogPipeline:
@@ -11,7 +10,7 @@ def __init__(
         spark_context,
         source_path,
         target_path,
-        schemas,
+        host_prefixes,
         job_name,
         partition_cols=[],
         transformations=[],
@@ -22,7 +21,7 @@ def __init__(
         self.logger = LoggerSingleton().logger
         self.source_path = source_path
         self.target_path = target_path
-        self.schemas = schemas
+        self.host_prefixes = host_prefixes
         self.partition_cols = partition_cols
         self.transformations = transformations
         self.glue = boto3.client(
@@ -37,10 +36,10 @@ def run(self):
         """Runs ETL"""
         try:
             self.logger.info("ETL Process started.")
-            data = self.extract()
+            data = self.extract_dynamic()
             self.logger.info(f"Data extracted from {self.source_path}.")
             for name, df in data.items():
-                data[name] = self.transform(df)
+                data[name] = self.transform(df, name)
             self.logger.info("Data transformed successfully.")
             self.load(data)
             self.logger.info(f"Data loaded into {self.target_path}.")
@@ -51,6 +50,7 @@ def run(self):
             raise e
 
     def get_last_run(self):
+        self.logger.info("Retrieving last successful runtime.")
         all_runs = self.glue.get_job_runs(JobName=self.job_name)
         if not all_runs["JobRuns"]:
             return None
@@ -59,28 +59,41 @@ def get_last_run(self):
             if run["JobRunState"] == "SUCCEEDED":
                 return time.mktime(run["StartedOn"].timetuple())
 
-    def extract(self):
+        return None
+
+    def extract_dynamic(self):
         """Extract JSON data from S3"""
-        self.logger.info(f"Extracting data from {self.source_path} as JSON")
         last_runtime = self.get_last_run()
         data = {}
-        for name, schema in self.schemas.items():
+        data_source = self.glue_context.getSource("s3", paths=[self.source_path])
+        data_source.setFormat("json")
+        self.logger.info(f"Extracting data from {self.source_path} as JSON")
+        for name in self.host_prefixes:
             if last_runtime:
-                data[name] = (
-                    self.spark.read.option("recursiveFileLookup", "true")
-                    .schema(schema)
-                    .json(self.source_path)
-                ).where((col("host").contains(name)) & (col("time") > last_runtime))
+                data[name] = self.glue_context.create_dynamic_frame.from_options(
+                    connection_type="s3",
+                    connection_options={"paths": [self.source_path], "recurse": True},
+                    format="json",
+                ).filter(
+                    f=lambda x, n=name: (x["host"].endswith(n))
+                    and (x["time"] > last_runtime)
+                )
+
             else:
-                data[name] = (
-                    self.spark.read.option("recursiveFileLookup", "true")
-                    .schema(schema)
-                    .json(self.source_path)
-                ).where(col("host").contains(name))
+                data[name] = self.glue_context.create_dynamic_frame.from_options(
+                    connection_type="s3",
+                    connection_options={"paths": [self.source_path], "recurse": True},
+                    format="json",
+                ).filter(f=lambda x, n=name: x["host"].endswith(n))
+
         return data
 
-    def transform(self, dataframe):
+    def transform(self, dataframe, name):
         """Apply a list of transformations on the dataframe"""
+        self.spark.conf.set("spark.sql.caseSensitive", True)
+        dataframe = (
+            dataframe.relationalize("root", f"./tmp/{name}").select("root").toDF()
+        )
         for transformation in self.transformations:
            self.logger.info(f"Applying transformation: {transformation.__name__}")
            dataframe = transformation(dataframe)
@@ -90,13 +103,13 @@ def load(self, data):
         """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
-            if dataframe.na.drop().count() > 0:
-                name = name.replace("--", "_")
-                dataframe.write.mode("append").partitionBy(
+            name = name.replace("--", "_")
+            try:
+                dataframe.coalesce(1).write.mode("append").partitionBy(
                     *self.partition_cols
                 ).parquet(f"{self.target_path}{name}")
-            else:
-                self.logger.info(f"Dataframe {name} is null, skipping")
+            except:
+                self.logger.info(f"{name} dataframe has no rows. Skipping.")
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
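One detail worth calling out in extract_dynamic: the filter lambdas capture the loop variable through a default argument (n=name). Without it, every closure created in the loop would see whichever prefix the loop finished on. A standalone sketch of the difference, plain Python with no Glue dependencies; the host string is invented for the example:

prefixes = ["consumer--readDocumentReference", "producer--createDocumentReference"]

# Late binding: every lambda ends up reading the final value of `name`.
late = [lambda host: host.endswith(name) for name in prefixes]

# Default-argument capture, as used in extract_dynamic: each lambda keeps
# the prefix that was current when it was created.
bound = [lambda host, n=name: host.endswith(n) for name in prefixes]

host = "record-locator--consumer--readDocumentReference"
print([f(host) for f in late])   # [False, False]
print([f(host) for f in bound])  # [True, False]

Since the filter functions are applied lazily when the DynamicFrame is computed, the default-argument form is what keeps each frame tied to its own host prefix.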

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 32 additions & 43 deletions
@@ -2,15 +2,14 @@
     coalesce,
     col,
     concat,
-    explode_outer,
     from_unixtime,
     lit,
     regexp_replace,
     to_date,
     to_timestamp,
     when,
 )
-from pyspark.sql.types import ArrayType, StructType
+from pyspark.sql.types import NullType
 
 
 def resolve_dupes(df):
@@ -33,49 +32,39 @@ def resolve_dupes(df):
     return df
 
 
-def flatten_df(df):
-    complex_fields = dict(
-        [
-            (field.name, field.dataType)
-            for field in df.schema.fields
-            if isinstance(field.dataType, ArrayType)
-            or isinstance(field.dataType, StructType)
-        ]
-    )
-    while len(complex_fields) != 0:
-        col_name = list(complex_fields.keys())[0]
-
-        if isinstance(complex_fields[col_name], StructType):
-            expanded = [
-                col(col_name + "." + k).alias(col_name + "_" + k)
-                for k in [n.name for n in complex_fields[col_name]]
-            ]
-            df = df.select("*", *expanded).drop(col_name)
-
-        elif isinstance(complex_fields[col_name], ArrayType):
-            df = df.withColumn(col_name, explode_outer(col_name))
-
-        complex_fields = dict(
-            [
-                (field.name, field.dataType)
-                for field in df.schema.fields
-                if isinstance(field.dataType, ArrayType)
-                or isinstance(field.dataType, ArrayType)
-            ]
-        )
+def rename_cols(df):
+    for col_name in df.columns:
+        df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
     return df
 
 
 def dtype_conversion(df):
-    df = (
-        df.withColumn(
-            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+    try:
+        df = (
+            df.withColumn(
+                "event_timestamp_cleaned",
+                regexp_replace(col("event_timestamp"), ",", "."),
+            )
+            .withColumn(
+                "event_timestamp",
+                to_timestamp(
+                    col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
+                ),
+            )
+            .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+            .withColumn("date", to_date(col("time")))
         )
-        .withColumn(
-            "event_timestamp",
-            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
-        )
-        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
-        .withColumn("date", to_date(col("time")))
-    )
-    return df.drop("event_timestamp_cleaned")
+
+        df = df.drop("event_timestamp_cleaned")
+    except:
+        ...
+
+    select_exprs = []
+    for column_name in df.columns:
+        column_type = df.schema[column_name].dataType
+        if isinstance(column_type, NullType):
+            select_exprs.append(col(column_name).cast("string").alias(column_name))
+        else:
+            select_exprs.append(col(column_name))
+
+    return df.select(*select_exprs)
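After relationalize, column names carry dots and columns that were always null come through as NullType, which Parquet cannot store. A local-Spark sketch of what rename_cols plus the NullType cast in dtype_conversion accomplish; the column names and values are invented for the example and it assumes only pyspark is installed:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import NullType, StringType, StructField, StructType

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical relationalized output: a dotted column name and an all-null column.
schema = StructType(
    [
        StructField("detail.id", StringType()),
        StructField("detail.notes", NullType()),
    ]
)
df = spark.createDataFrame([("abc", None)], schema)

# rename_cols: dots become underscores so Parquet accepts the names.
for name in df.columns:
    df = df.withColumnRenamed(name, name.replace(".", "_"))

# NullType columns cannot be written to Parquet, so cast them to string.
select_exprs = []
for c in df.columns:
    if isinstance(df.schema[c].dataType, NullType):
        select_exprs.append(col(c).cast("string").alias(c))
    else:
        select_exprs.append(col(c))
df = df.select(*select_exprs)

df.printSchema()  # detail_id: string, detail_notes: string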
