
Commit 0ee2fd4

Merge pull request #846 from NHSDigital/feature/jale13-nrl-1320-update-reporting-infra
NRL 1320/1321 Update reporting infrastructure to take delta and resolve non standard cases
2 parents 86d8ad1 + ab32be9 commit 0ee2fd4

12 files changed (+2154, -639 lines)


terraform/account-wide-infrastructure/modules/glue/LogSchemaGeneration/LogSchemaGeneration.ipynb

Lines changed: 357 additions & 0 deletions
Large diffs are not rendered by default.

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+# Log Schema Generation
+
+The Glue script uses pyspark to process log data. Due to the structure of each json document inside of a log group differing, we need to account for this variance.
+
+The notebook provides a way to automatically generate a pyspark schema for a log group without manual intervention. Point it at the desired group, and hit run all, then copy and paste the output into either producer_schema.py or consumer_schema.py.
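
The output pasted into those schema modules is a nested pyspark StructType definition. A minimal sketch of what such generated output could look like, assuming made-up nested fields under "detail" (only time and host are named elsewhere in this commit):

# Illustrative sketch only: the "detail" fields are hypothetical, not taken from the real log groups.
from pyspark.sql.types import LongType, StringType, StructField, StructType

exampleLogGroupSchema = StructType(
    [
        StructField("time", LongType(), True),
        StructField("host", StringType(), True),
        StructField(
            "detail",
            StructType(
                [
                    StructField("request_id", StringType(), True),
                    StructField("status_code", StringType(), True),
                ]
            ),
            True,
        ),
    ]
)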

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 3 additions & 3 deletions
@@ -40,7 +40,7 @@ resource "aws_glue_crawler" "log_crawler" {
     path = "${aws_s3_bucket.target-data-bucket.id}/producer_updateDocumentReference/"
   }
   s3_target {
-    path = "${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference//"
+    path = "${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/"
   }
   schema_change_policy {
     delete_behavior = "LOG"
@@ -64,10 +64,10 @@ resource "aws_glue_job" "glue_job" {
   name              = "${var.name_prefix}-glue-job"
   role_arn          = aws_iam_role.glue_service_role.arn
   description       = "Transfer logs from source to bucket"
-  glue_version      = "4.0"
+  glue_version      = "5.0"
   worker_type       = "G.1X"
   timeout           = 2880
-  max_retries       = 1
+  max_retries       = 0
   number_of_workers = 2
   command {
     name = "glueetl"

terraform/account-wide-infrastructure/modules/glue/iam.tf

Lines changed: 10 additions & 0 deletions
@@ -80,6 +80,16 @@ data "aws_iam_policy_document" "glue_service" {

    effect = "Allow"
  }
+
+  statement {
+    actions = [
+      "iam:PassRole",
+    ]
+    effect = "Allow"
+    resources = [
+      "*"
+    ]
+  }
 }

 resource "aws_iam_policy" "glue_service" {

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+locals {
+  s3 = {
+    transition_storage = {
+      infrequent_access = {
+        storage_class = "STANDARD_IA"
+        days          = 150
+      }
+      glacier = {
+        storage_class = "GLACIER"
+        days          = 200
+      }
+    }
+
+    expiration = {
+      days = 1095
+    }
+  }
+}

terraform/account-wide-infrastructure/modules/glue/s3.tf

Lines changed: 29 additions & 0 deletions
@@ -51,6 +51,35 @@ resource "aws_s3_bucket_public_access_block" "source-data-bucket-public-access-b
   restrict_public_buckets = true
 }

+resource "aws_s3_bucket_lifecycle_configuration" "source-data-bucket-lifecycle" {
+  bucket = aws_s3_bucket.source-data-bucket.id
+
+
+  rule {
+    id     = "bucket-versioning-rule"
+    status = "Enabled"
+
+    transition {
+      days          = local.s3.transition_storage.infrequent_access.days
+      storage_class = local.s3.transition_storage.infrequent_access.storage_class
+    }
+    transition {
+      days          = local.s3.transition_storage.glacier.days
+      storage_class = local.s3.transition_storage.glacier.storage_class
+    }
+    expiration {
+      days = local.s3.expiration.days
+    }
+  }
+}
+
+resource "aws_s3_bucket_versioning" "source-data-bucket-versioning" {
+  bucket = aws_s3_bucket.source-data-bucket.id
+  versioning_configuration {
+    status = "Enabled"
+  }
+}
+

 # S3 Bucket for Processed Data
 resource "aws_s3_bucket" "target-data-bucket" {

terraform/account-wide-infrastructure/modules/glue/src/consumer_schemas.py

Lines changed: 334 additions & 179 deletions
Large diffs are not rendered by default.

terraform/account-wide-infrastructure/modules/glue/src/instances.py

Lines changed: 3 additions & 1 deletion
@@ -12,7 +12,9 @@ class GlueContextSingleton:
     def __new__(cls, spark_context):
         if not cls._instance:
             cls._instance = super().__new__(cls)
-            cls._instance.spark = SparkSession.builder.getOrCreate()
+            cls._instance.spark = SparkSession.builder.config(
+                "spark.sql.caseSensitive", "true"
+            ).getOrCreate()
             cls._instance.context = GlueContext(spark_context)
         return cls._instance

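Enabling spark.sql.caseSensitive matters when the raw log JSON contains keys that differ only by letter case, which Spark's default case-insensitive analysis can reject as duplicate columns at read time. A minimal sketch of the behaviour, assuming a hypothetical bucket path and column names not taken from the repo:

# Minimal sketch, not from the repo: the path and column names are assumptions.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.config("spark.sql.caseSensitive", "true").getOrCreate()
)

# With caseSensitive enabled, keys such as "Host" and "host" load as two
# distinct columns instead of colliding with each other.
df = spark.read.json("s3://example-source-bucket/logs/")
df.select("host").show()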

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 2 additions & 2 deletions
@@ -5,7 +5,7 @@
 from pipeline import LogPipeline
 from producer_schemas import producerSchemaList
 from pyspark.context import SparkContext
-from transformations import dtype_conversion, flatten_df
+from transformations import dtype_conversion, flatten_df, resolve_dupes

 # Get arguments from AWS Glue job
 args = getResolvedOptions(
@@ -27,7 +27,7 @@
     schemas=consumerSchemaList,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[flatten_df, dtype_conversion],
+    transformations=[flatten_df, resolve_dupes, dtype_conversion],
 )

 # Run the job
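
resolve_dupes is added to the transformation chain here, but transformations.py itself is not part of this diff, so its implementation is not visible. A purely hypothetical sketch of what a step of that shape could look like, given that case-sensitive reads can leave flattened frames with columns whose names collide once case is ignored:

# Hypothetical sketch only; the real resolve_dupes lives in transformations.py,
# which is not shown in this commit.
from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce


def resolve_dupes(df: DataFrame) -> DataFrame:
    """Collapse columns whose names differ only by case into a single column."""
    by_lower = {}
    for name in df.columns:
        by_lower.setdefault(name.lower(), []).append(name)
    for lowered, variants in by_lower.items():
        if len(variants) > 1:
            # Keep the first non-null value across the case variants, drop the rest.
            df = df.withColumn(lowered, coalesce(*[df[v] for v in variants]))
            df = df.drop(*[v for v in variants if v != lowered])
    return df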

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 25 additions & 5 deletions
@@ -1,3 +1,5 @@
+import time
+
 import boto3
 from instances import GlueContextSingleton, LoggerSingleton
 from pyspark.sql.functions import col
@@ -28,6 +30,7 @@ def __init__(
             region_name="eu-west-2",
             endpoint_url="https://glue.eu-west-2.amazonaws.com",
         )
+        self.job_name = job_name
         self.name_prefix = "-".join(job_name.split("-")[:4])

     def run(self):
@@ -47,16 +50,33 @@ def run(self):
             self.logger.error(f"ETL process failed: {e}")
             raise e

+    def get_last_run(self):
+        all_runs = self.glue.get_job_runs(JobName=self.job_name)
+        if not all_runs["JobRuns"]:
+            return None
+
+        for run in all_runs["JobRuns"]:
+            if run["JobRunState"] == "SUCCEEDED":
+                return time.mktime(run["StartedOn"].timetuple())
+
     def extract(self):
         """Extract JSON data from S3"""
         self.logger.info(f"Extracting data from {self.source_path} as JSON")
+        last_runtime = self.get_last_run()
         data = {}
         for name, schema in self.schemas.items():
-            data[name] = (
-                self.spark.read.option("recursiveFileLookup", "true")
-                .schema(schema)
-                .json(self.source_path)
-            ).where(col("host").contains(name))
+            if last_runtime:
+                data[name] = (
+                    self.spark.read.option("recursiveFileLookup", "true")
+                    .schema(schema)
+                    .json(self.source_path)
+                ).where((col("host").contains(name)) & (col("time") > last_runtime))
+            else:
+                data[name] = (
+                    self.spark.read.option("recursiveFileLookup", "true")
+                    .schema(schema)
+                    .json(self.source_path)
+                ).where(col("host").contains(name))
         return data

     def transform(self, dataframe):
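
The delta behaviour comes from get_last_run plus the time filter in extract: the start time of the most recent SUCCEEDED run is converted to epoch seconds, and only records with a newer time value are read on the next run. A minimal standalone sketch of that filter, with made-up host values and epochs:

# Standalone sketch of the delta filter; rows and epoch values are illustrative only.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

last_runtime = 1_700_000_000  # epoch seconds of the last SUCCEEDED job run
rows = [
    ("producer--example--host", 1_699_999_000),  # before the last run: skipped
    ("producer--example--host", 1_700_000_500),  # after the last run: kept
]
df = spark.createDataFrame(rows, ["host", "time"])

delta = df.where((col("host").contains("producer")) & (col("time") > last_runtime))
delta.show()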
