Commit 543df9d

Merge pull request #976 from NHSDigital/feature/jale13-1346-handle-splunk-ssp-data
NRL-1346 Handle SSP logs
2 parents: eb44946 + 1a97292

8 files changed: +113 additions, -22 deletions

terraform/account-wide-infrastructure/dev/glue.tf

Lines changed: 1 addition & 0 deletions

@@ -2,5 +2,6 @@ module "dev-glue" {
   is_enabled     = var.enable_reporting
   source         = "../modules/glue"
   name_prefix    = "nhsd-nrlf--dev"
+  schedule       = false
   python_version = 3
 }

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 16 additions & 0 deletions

@@ -46,6 +46,9 @@ resource "aws_glue_crawler" "log_crawler" {
   s3_target {
     path = "s3://${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/"
   }
+  s3_target {
+    path = "s3://${aws_s3_bucket.target-data-bucket.id}/spine_sspDocumentRetrieval/"
+  }
   schema_change_policy {
     delete_behavior = "LOG"
   }
@@ -56,6 +59,7 @@ resource "aws_glue_crawler" "log_crawler" {
     }
   })
 }
+
 resource "aws_glue_trigger" "log_trigger" {
   count = var.is_enabled ? 1 : 0
 
@@ -66,6 +70,18 @@ resource "aws_glue_trigger" "log_trigger" {
   }
 }
 
+resource "aws_glue_trigger" "glue_trigger" {
+  count = var.schedule && var.is_enabled ? 1 : 0
+
+  name     = "${var.name_prefix}-glue-trigger"
+  type     = "SCHEDULED"
+  schedule = "cron(0 1 * * ? *)"
+
+  actions {
+    job_name = aws_glue_job.glue_job[0].name
+  }
+}
+
 resource "aws_glue_job" "glue_job" {
   count = var.is_enabled ? 1 : 0
 
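Note on the new trigger: it is only created when both var.schedule and var.is_enabled are true, and the cron expression cron(0 1 * * ? *) fires the Glue job once a day at 01:00 UTC. A minimal sketch, assuming boto3 and AWS credentials are available, for checking the deployed schedule (the trigger name is illustrative, following the "${var.name_prefix}-glue-trigger" pattern above):

import boto3

glue = boto3.client("glue")

# Illustrative name only; substitute the environment's name_prefix.
trigger = glue.get_trigger(Name="nhsd-nrlf--prod-glue-trigger")["Trigger"]
print(trigger["Type"], trigger["Schedule"])  # expected: SCHEDULED cron(0 1 * * ? *)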

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 3 additions & 2 deletions

@@ -5,7 +5,7 @@
 from awsglue.utils import getResolvedOptions
 from pipeline import LogPipeline
 from pyspark.sql import SparkSession
-from transformations import dtype_conversion, rename_cols, resolve_dupes
+from transformations import dtype_conversion, format_ssp, rename_cols, resolve_dupes
 
 # Spark and Glue Context initialization
 spark = SparkSession.builder.config("spark.sql.caseSensitive", "true").getOrCreate()
@@ -37,6 +37,7 @@
     "producer--updateDocumentReference",
     "producer--deleteDocumentReference",
     "producer--createDocumentReference",
+    "s2c",
 ]
 
 # Initialize ETL process
@@ -49,7 +50,7 @@
     host_prefixes=host_prefixes,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[rename_cols, resolve_dupes, dtype_conversion],
+    transformations=[rename_cols, resolve_dupes, dtype_conversion, format_ssp],
 )
 
 # Run the job
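For context, LogPipeline.transform (see pipeline.py below) applies these transformations in list order, and each one now receives the host prefix as a third argument. A minimal sketch of that call chain, where df, logger and name stand in for the per-source DataFrame, the pipeline logger and the host prefix:

# Placeholders: df, logger and name come from the pipeline at runtime.
for transformation in [rename_cols, resolve_dupes, dtype_conversion, format_ssp]:
    df = transformation(df, logger, name)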

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 11 additions & 5 deletions

@@ -81,7 +81,7 @@ def extract_dynamic(self):
                 },
                 format="json",
             ).filter(
-                f=lambda x, n=name: (x["host"].endswith(n))
+                f=lambda x, n=name: (x["host"] is not None and n in x["host"])
                 and (x["time"] > last_runtime)
             )
 
@@ -95,7 +95,7 @@ def extract_dynamic(self):
                     "groupSize": "134217728",
                 },
                 format="json",
-            ).filter(f=lambda x, n=name: x["host"].endswith(n))
+            ).filter(f=lambda x, n=name: (x["host"] is not None and n in x["host"]))
 
         return data
 
@@ -107,23 +107,29 @@ def transform(self, dataframe, name):
         )
         for transformation in self.transformations:
             self.logger.info(f"Applying transformation: {transformation.__name__}")
-            dataframe = transformation(dataframe, self.logger)
+            dataframe = transformation(dataframe, self.logger, name)
         return dataframe
 
     def load(self, data):
        """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
             name = name.replace("--", "_")
+            if name == "s2c":
+                name = "spine_sspDocumentRetrieval"
             try:
+                if dataframe.rdd.isEmpty():
+                    self.logger.info(f"{name} dataframe has no rows. Skipping.")
+                    continue
+
                 self.logger.info(
                     f"Attempting to load dataframe {name} into {self.target_path}{name}"
                 )
                 dataframe.write.mode("append").partitionBy(
                     *self.partition_cols
                 ).parquet(f"{self.target_path}{name}")
-            except:
-                self.logger.info(f"{name} dataframe has no rows. Skipping.")
+            except Exception as e:
+                self.logger.info(f"{name} failed to write with error: {e}")
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
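The load changes map the Splunk "s2c" host prefix onto the spine_sspDocumentRetrieval output name, skip empty frames before attempting a write, and log the real exception when a write fails. A standalone sketch of that pattern, in which the partition column and bucket path are assumptions for illustration (the real values come from partition_cols and target_path):

name = "s2c".replace("--", "_")
if name == "s2c":
    name = "spine_sspDocumentRetrieval"  # output prefix picked up by the log crawler

if df.rdd.isEmpty():
    logger.info(f"{name} dataframe has no rows. Skipping.")
else:
    # "date" and the bucket path below are illustrative assumptions.
    df.write.mode("append").partitionBy("date").parquet(
        f"s3://example-target-bucket/{name}"
    )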

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 74 additions & 15 deletions

@@ -15,7 +15,48 @@
 from pyspark.sql.types import NullType
 
 
-def resolve_dupes(df, logger):
+def format_ssp(df, logger, name):
+    if name != "s2c":
+        logger.info(f"Not SSP logs, returning df: {name}")
+        return df
+
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping format_ssp.")
+        return df
+
+    logger.info("Processing SSP logs")
+    no_ods_code = df.filter(col("logReference") != "SSP0001").select(
+        "time",
+        "host",
+        "internalID",
+        "logReference",
+        "interaction",
+        "responseCode",
+        "responseErrorMessage",
+        "totalDuration",
+    )
+    ods_code = df.filter(col("logReference") == "SSP0001").select(
+        "sspFrom",
+        "fromOrgName",
+        "fromOdsCode",
+        "fromPostCode",
+        "sspTo",
+        "toOrgName",
+        "toOdsCode",
+        "toPostCode",
+        "internalID",
+    )
+
+    df = no_ods_code.join(ods_code, on="internalID", how="left")
+
+    return df
+
+
+def resolve_dupes(df, logger, name):
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping resolve_dupes.")
+        return df
+
     column_groups = defaultdict(list)
     for column_name in df.columns:
         normalised_name = column_name.lower().rstrip("_")
@@ -27,7 +68,9 @@ def resolve_dupes(df, logger):
         if len(original_names) == 1:
             final_select_exprs.append(col(original_names[0]).alias(lower_name))
         else:
-            logger.info(f"Resolving duplicate group '{lower_name}': {original_names}")
+            logger.info(
+                f"Resolving duplicate group '{lower_name}': {original_names} for df: {name}"
+            )
 
             merge_logic = lambda col1, col2: when(
                 col1.isNull() | col2.isNull(), coalesce(col1, col2)
@@ -40,34 +83,50 @@
     return df.select(*final_select_exprs)
 
 
-def rename_cols(df, logger):
-    logger.info("Replacing '.' with '_'")
+def rename_cols(df, logger, name):
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping rename_cols.")
+        return df
+
+    logger.info(f"Replacing '.' with '_' for df: {name}")
     for col_name in df.columns:
         df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
     return df
 
 
-def dtype_conversion(df, logger):
+def dtype_conversion(df, logger, name):
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping dtype_conversion.")
+        return df
     try:
-        logger.info("Formatting event_timestamp")
-        df = (
-            df.withColumn(
+        logger.info(f"Formatting event_timestamp, time and date columns for df: {name}")
+        if "event_timestamp" in df.columns:
+            df = df.withColumn(
                 "event_timestamp_cleaned",
                 regexp_replace(col("event_timestamp"), ",", "."),
-            )
-            .withColumn(
+            ).withColumn(
                 "event_timestamp",
                 to_timestamp(
                     col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
                 ),
             )
-            .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
-            .withColumn("date", to_date(col("time")))
-        )
 
-        df = df.drop("event_timestamp_cleaned")
+            df = df.drop("event_timestamp_cleaned")
+
+        if "time" in df.columns:
+            df = df.withColumn(
+                "time", from_unixtime(col("time")).cast("timestamp")
+            ).withColumn("date", to_date(col("time")))
+
+        if "_time" in df.columns:
+            df = df.withColumn(
+                "time", to_timestamp(col("_time"), "yyyy-MM-dd HH:mm:ss.SSSZ")
+            ).withColumn("date", to_date(col("time")))
+
+            df = df.drop("_time")
+
     except Exception as e:
-        logger.info(f"Failed formatting of timestamp column with error: {e}")
+        logger.info(f"Failed formatting of timestamp columns with error: {e}")
 
     logger.info("Handling Null Type columns")
     select_exprs = []
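format_ssp splits the SSP feed into retrieval events (every logReference other than SSP0001) and the SSP0001 rows that carry the organisation and ODS details, then left-joins the two on internalID so each event gains the from/to organisation columns. A toy PySpark illustration of that join, using made-up internalID and ODS values:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up rows: one retrieval event and its matching SSP0001 organisation record.
events = spark.createDataFrame(
    [("abc-123", "SSP0002", "200")], ["internalID", "logReference", "responseCode"]
)
org_details = spark.createDataFrame(
    [("abc-123", "FROM-ODS", "TO-ODS")], ["internalID", "fromOdsCode", "toOdsCode"]
)

# The left join keeps every event and adds the org columns where an SSP0001 row exists.
events.join(org_details, on="internalID", how="left").show()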

terraform/account-wide-infrastructure/modules/glue/vars.tf

Lines changed: 6 additions & 0 deletions

@@ -28,3 +28,9 @@ variable "is_enabled" {
   description = "Flag to enable or disable the Glue module"
   default     = true
 }
+
+variable "schedule" {
+  type        = bool
+  description = "Flag to enable or disable the Glue schedule"
+  default     = false
+}

terraform/account-wide-infrastructure/prod/glue.tf

Lines changed: 1 addition & 0 deletions

@@ -2,5 +2,6 @@ module "prod-glue" {
   is_enabled     = var.enable_reporting
   source         = "../modules/glue"
   name_prefix    = "nhsd-nrlf--prod"
+  schedule       = true
   python_version = 3
 }

terraform/account-wide-infrastructure/test/glue.tf

Lines changed: 1 addition & 0 deletions

@@ -2,5 +2,6 @@ module "test-glue" {
   is_enabled     = var.enable_reporting
   source         = "../modules/glue"
   name_prefix    = "nhsd-nrlf--test"
+  schedule       = false
   python_version = 3
 }
