Commit c4e4698

Merge pull request #798 from NHSDigital/feature/jale13-nrl-1187-powerbi-connections
NRL 1187 powerbi connections
2 parents 1fb83fa + 092794f commit c4e4698

17 files changed, +285 -119 lines changed


terraform/account-wide-infrastructure/modules/athena/athena.tf

Lines changed: 1 addition & 14 deletions
@@ -1,16 +1,3 @@
-resource "aws_athena_database" "reporting-db" {
-  name = var.database
-
-  bucket = var.target_bucket_name
-
-  encryption_configuration {
-    encryption_option = "SSE_KMS"
-    kms_key           = aws_kms_key.athena.arn
-  }
-
-  force_destroy = true
-}
-
 resource "aws_athena_workgroup" "athena" {
   name = "${var.name_prefix}-athena-wg"

@@ -19,7 +6,7 @@ resource "aws_athena_workgroup" "athena" {
   publish_cloudwatch_metrics_enabled = true

   result_configuration {
-    output_location = "s3://{aws_s3_bucket.athena.bucket}/output/"
+    output_location = "s3://${aws_s3_bucket.athena.id}/output/"

     encryption_configuration {
       encryption_option = "SSE_KMS"
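
The workgroup now supplies query results from the output/ prefix of the Athena bucket (the previous string was missing the "$" interpolation). A minimal sketch of how a downstream client could run a query through this workgroup with boto3; the database, table, workgroup and region names are illustrative placeholders, not values from this commit.

# A minimal sketch, assuming boto3 credentials are configured; all names below
# are placeholders rather than values taken from this repository.
import boto3

athena = boto3.client("athena", region_name="eu-west-2")

response = athena.start_query_execution(
    QueryString="SELECT source, count(*) AS events FROM logs GROUP BY source",
    QueryExecutionContext={"Database": "example-reporting"},
    WorkGroup="example-athena-wg",
    # No ResultConfiguration passed here: the workgroup's result_configuration
    # provides the s3://.../output/ location and SSE_KMS encryption.
)
print(response["QueryExecutionId"])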

terraform/account-wide-infrastructure/modules/athena/outputs.tf

Lines changed: 0 additions & 4 deletions
@@ -5,7 +5,3 @@ output "workgroup" {
 output "bucket" {
   value = aws_s3_bucket.athena
 }
-
-output "database" {
-  value = aws_athena_database.reporting-db
-}

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 12 additions & 12 deletions
@@ -1,16 +1,16 @@
 # Create Glue Data Catalog Database
-resource "aws_glue_catalog_database" "raw_log_database" {
-  name         = "${var.name_prefix}-raw_log"
-  location_uri = "${aws_s3_bucket.source-data-bucket.id}/"
+resource "aws_glue_catalog_database" "log_database" {
+  name         = "${var.name_prefix}-reporting"
+  location_uri = "${aws_s3_bucket.target-data-bucket.id}/logs/"
 }

 # Create Glue Crawler
-resource "aws_glue_crawler" "raw_log_crawler" {
-  name          = "${var.name_prefix}-raw-log-crawler"
-  database_name = aws_glue_catalog_database.raw_log_database.name
+resource "aws_glue_crawler" "log_crawler" {
+  name          = "${var.name_prefix}-log-crawler"
+  database_name = aws_glue_catalog_database.log_database.name
   role          = aws_iam_role.glue_service_role.name
   s3_target {
-    path = "${aws_s3_bucket.source-data-bucket.id}/"
+    path = "${aws_s3_bucket.target-data-bucket.id}/logs/"
   }
   schema_change_policy {
     delete_behavior = "LOG"
@@ -22,11 +22,11 @@ resource "aws_glue_crawler" "raw_log_crawler" {
     }
   })
 }
-resource "aws_glue_trigger" "raw_log_trigger" {
+resource "aws_glue_trigger" "log_trigger" {
   name = "${var.name_prefix}-org-report-trigger"
   type = "ON_DEMAND"
   actions {
-    crawler_name = aws_glue_crawler.raw_log_crawler.name
+    crawler_name = aws_glue_crawler.log_crawler.name
   }
 }

@@ -49,9 +49,9 @@ resource "aws_glue_job" "glue_job" {
     "--enable-auto-scaling"             = "true"
     "--enable-continous-cloudwatch-log" = "true"
     "--datalake-formats"                = "delta"
-    "--source-path"                     = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
-    "--destination-path"                = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the destination S3 path
-    "--job-name"                        = "poc-glue-job"
+    "--source_path"                     = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
+    "--target_path"                     = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
+    "--job_name"                        = "poc-glue-job"
     "--enable-continuous-log-filter"    = "true"
     "--enable-metrics"                  = "true"
     "--extra-py-files"                  = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"

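The default job arguments switch to underscore-separated keys so they line up with the option names that getResolvedOptions reads in main.py, and the catalog database and crawler now target the logs/ prefix of the target bucket. A short boto3 sketch of how the ON_DEMAND trigger and the job could be started with matching arguments; the resource names below are placeholders (the real ones are derived from var.name_prefix).

# Illustrative only: resource names are placeholders, and the Arguments dict
# simply overrides the --source_path / --target_path / --job_name defaults above.
import boto3

glue = boto3.client("glue", region_name="eu-west-2")

# Fire the ON_DEMAND trigger so the crawler re-catalogues the logs/ prefix.
glue.start_trigger(Name="example-org-report-trigger")

# Start the ETL job with the underscore-style argument names main.py resolves.
run = glue.start_job_run(
    JobName="example-glue-job",
    Arguments={
        "--job_name": "poc-glue-job",
        "--source_path": "s3://example-source-data-bucket/",
        "--target_path": "s3://example-target-data-bucket/logs",
    },
)
print(run["JobRunId"])
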
terraform/account-wide-infrastructure/modules/glue/iam.tf

Lines changed: 74 additions & 2 deletions
@@ -15,7 +15,79 @@ resource "aws_iam_role" "glue_service_role" {
   })
 }

+data "aws_iam_policy_document" "glue_service" {
+  statement {
+    actions = [
+      "s3:AbortMultipartUpload",
+      "s3:GetBucketLocation",
+      "s3:GetObject",
+      "s3:ListBucket",
+      "s3:ListBucketMultipartUploads",
+      "s3:PutObject",
+      "s3:DeleteObject",
+    ]
+
+    resources = compact([
+      aws_s3_bucket.source-data-bucket.arn,
+      "${aws_s3_bucket.source-data-bucket.arn}/*",
+      aws_s3_bucket.target-data-bucket.arn,
+      "${aws_s3_bucket.target-data-bucket.arn}/*",
+      aws_s3_bucket.code-bucket.arn,
+      "${aws_s3_bucket.code-bucket.arn}/*",
+    ])
+    effect = "Allow"
+  }
+
+  statement {
+    actions = [
+      "kms:DescribeKey",
+      "kms:GenerateDataKey*",
+      "kms:Encrypt",
+      "kms:ReEncrypt*",
+      "kms:Decrypt",
+    ]
+
+    resources = [
+      aws_kms_key.glue.arn,
+    ]
+
+    effect = "Allow"
+  }
+
+  statement {
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents"
+    ]
+
+    resources = [
+      "arn:aws:logs:*:*:*:/aws-glue/*",
+      # "arn:aws:logs:*:*:*:/customlogs/*"
+    ]
+
+    effect = "Allow"
+  }
+
+  statement {
+    actions = [
+      "glue:*",
+    ]
+
+    resources = [
+      "*"
+    ]
+
+    effect = "Allow"
+  }
+}
+
+resource "aws_iam_policy" "glue_service" {
+  name   = "${var.name_prefix}-glue"
+  policy = data.aws_iam_policy_document.glue_service.json
+}
+
 resource "aws_iam_role_policy_attachment" "glue_service" {
-  role       = aws_iam_role.glue_service_role.id
-  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
+  role       = aws_iam_role.glue_service_role.name
+  policy_arn = aws_iam_policy.glue_service.arn
 }

terraform/account-wide-infrastructure/modules/glue/s3.tf

Lines changed: 3 additions & 2 deletions
@@ -174,6 +174,7 @@ data "archive_file" "python" {

 resource "aws_s3_object" "zip" {
   bucket = aws_s3_bucket.code-bucket.bucket
-  key    = "main.py"
-  source = "${path.module}/files/src.zip"
+  key    = "src.zip"
+  source = data.archive_file.python.output_path
+  etag   = filemd5(data.archive_file.python.output_path)
 }

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 8 additions & 9 deletions
@@ -1,27 +1,26 @@
 import sys

 from awsglue.utils import getResolvedOptions
+from pipeline import LogPipeline
 from pyspark.context import SparkContext
-from src.pipeline import LogPipeline
-from src.transformations import placeholder
+from transformations import dtype_conversion, flatten_df, logSchema

 # Get arguments from AWS Glue job
-args = getResolvedOptions(
-    sys.argv, ["JOB_NAME", "SOURCE_PATH", "TARGET_PATH", "PARTITION_COLS"]
-)
+args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])

 # Start Glue context
 sc = SparkContext()

-partition_cols = args["PARTITION_COLS"].split(",") if "PARTITION_COLS" in args else []
+partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else []

 # Initialize ETL process
 etl_job = LogPipeline(
     spark_context=sc,
-    source_path=args["SOURCE_PATH"],
-    target_path=args["TARGET_PATH"],
+    source_path=args["source_path"],
+    target_path=args["target_path"],
+    schema=logSchema,
     partition_cols=partition_cols,
-    transformations=[placeholder],
+    transformations=[flatten_df, dtype_conversion],
 )

 # Run the job
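
On a real Glue run, getResolvedOptions returns a plain dict keyed by the requested option names, which is why the keys now mirror the --job_name / --source_path / --target_path defaults set in glue.tf. A minimal sketch of the resulting mapping, with illustrative values:

# Minimal sketch of the argument handling above, with illustrative values;
# on a real run this dict comes from getResolvedOptions(sys.argv, [...]).
args = {
    "job_name": "poc-glue-job",
    "source_path": "s3://example-source-data-bucket/",
    "target_path": "s3://example-target-data-bucket/logs",
}

# "partition_cols" stays optional: it is only split when the key is present.
partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else []
assert partition_cols == []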

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 10 additions & 4 deletions
@@ -1,4 +1,4 @@
-from src.instances import GlueContextSingleton, LoggerSingleton
+from instances import GlueContextSingleton, LoggerSingleton


 class LogPipeline:
@@ -7,7 +7,8 @@ def __init__(
         spark_context,
         source_path,
         target_path,
-        partition_cols=None,
+        schema,
+        partition_cols=[],
         transformations=[],
     ):
         """Initialize Glue context, Spark session, logger, and paths"""
@@ -16,6 +17,7 @@ def __init__(
         self.logger = LoggerSingleton().logger
         self.source_path = source_path
         self.target_path = target_path
+        self.schema = schema
         self.partition_cols = partition_cols
         self.transformations = transformations

@@ -36,7 +38,11 @@ def run(self):
     def extract(self):
         """Extract JSON data from S3"""
         self.logger.info(f"Extracting data from {self.source_path} as JSON")
-        return self.spark.read.json(self.source_path)
+        return (
+            self.spark.read.option("recursiveFileLookup", "true")
+            .schema(self.schema)
+            .json(self.source_path)
+        )

     def transform(self, dataframe):
         """Apply a list of transformations on the dataframe"""
@@ -48,6 +54,6 @@ def transform(self, dataframe):
     def load(self, dataframe):
         """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
-        dataframe.write.mode("overwrite").partitionBy(*self.partition_cols).parquet(
+        dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
             self.target_path
         )
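
With the explicit schema and recursive lookup in extract() and mode("append") in load(), the pipeline is roughly equivalent to the plain-PySpark chain below. This is a local sketch only: a SparkSession and filesystem paths stand in for the Glue context singletons and the S3 source/target paths used by the real job.

# Local sketch, assuming pyspark is installed and transformations.py sits alongside.
from pyspark.sql import SparkSession

from transformations import dtype_conversion, flatten_df, logSchema

spark = SparkSession.builder.appName("log-pipeline-sketch").getOrCreate()

# extract: recurse through nested prefixes and enforce the log schema up front
df = (
    spark.read.option("recursiveFileLookup", "true")
    .schema(logSchema)
    .json("/tmp/example-logs/")  # placeholder source path
)

# transform: apply the same transformation list main.py passes in
for transformation in (flatten_df, dtype_conversion):
    df = transformation(df)

# load: append so repeated runs accumulate instead of overwriting
df.write.mode("append").parquet("/tmp/example-output/")  # placeholder target path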

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 76 additions & 1 deletion
@@ -1 +1,76 @@
-def placeholder(): ...
+from pyspark.sql.functions import to_timestamp
+from pyspark.sql.types import (
+    BooleanType,
+    StringType,
+    StructField,
+    StructType,
+    TimestampType,
+)
+
+logSchema = StructType(
+    [
+        StructField("time", TimestampType(), True),
+        StructField("index", StringType(), True),
+        StructField("host", StringType(), True),
+        StructField("source", StringType(), True),
+        StructField(
+            "event",
+            StructType(
+                [
+                    StructField("level", StringType(), True),
+                    StructField("location", StringType(), True),
+                    StructField("message", StringType(), True),
+                    StructField("timestamp", StringType(), True),
+                    StructField("service", StringType(), True),
+                    StructField("cold_start", BooleanType(), True),
+                    StructField("function_name", StringType(), True),
+                    StructField("function_memory_size", StringType(), True),
+                    StructField("function_arn", StringType(), True),
+                    StructField("function_request_id", StringType(), True),
+                    StructField("correlation_id", StringType(), True),
+                    StructField("method", StringType(), True),
+                    StructField("path", StringType(), True),
+                    StructField(
+                        "headers",
+                        StructType(
+                            [
+                                StructField("accept", StringType(), True),
+                                StructField("accept-encoding", StringType(), True),
+                                StructField("Authorization", StringType(), True),
+                                StructField("Host", StringType(), True),
+                                StructField(
+                                    "NHSD-Connection-Metadata", StringType(), True
+                                ),
+                                StructField("NHSD-Correlation-Id", StringType(), True),
+                                StructField("User-Agent", StringType(), True),
+                                StructField("X-Forwarded-For", StringType(), True),
+                                StructField("X-Request-Id", StringType(), True),
+                            ]
+                        ),
+                        True,
+                    ),
+                    StructField("log_reference", StringType(), True),
+                    StructField("xray_trace_id", StringType(), True),
+                ]
+            ),
+            True,
+        ),
+    ]
+)
+
+
+def flatten_df(df):
+    cols = []
+    for c in df.dtypes:
+        if "struct" in c[1]:
+            nested_col = c[0]
+        else:
+            cols.append(c[0])
+    return df.select(*cols, f"{nested_col}.*")
+
+
+def dtype_conversion(df):
+    df = df.withColumn(
+        "timestamp", to_timestamp(df["timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX")
+    )
+    return df
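
flatten_df promotes the fields of the single struct column (here, event) to the top level, and dtype_conversion parses the string timestamp into a proper timestamp column. A tiny in-memory example of the two functions together, with illustrative values; note that flatten_df assumes exactly one struct column and flattens one level only, so a nested headers struct stays nested.

# Worked example with in-memory rows; all values are illustrative.
from pyspark.sql import Row, SparkSession

from transformations import dtype_conversion, flatten_df

spark = SparkSession.builder.appName("transformations-sketch").getOrCreate()

df = spark.createDataFrame(
    [
        Row(
            host="lambda",
            source="nrlf",
            event=Row(
                level="INFO",
                message="hello",
                timestamp="2024-05-01 12:00:00,123+00:00",
            ),
        )
    ]
)

flat = flatten_df(df)           # columns: host, source, level, message, timestamp
typed = dtype_conversion(flat)  # "timestamp" becomes a timestamp-typed column
typed.printSchema()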

terraform/infrastructure/data.tf

Lines changed: 6 additions & 1 deletion
@@ -43,6 +43,11 @@ data "external" "current-info" {
 }

 data "aws_s3_bucket" "source-data-bucket" {
-  count  = local.is_dev_env ? 1 : 0
+  count  = local.is_dev_env && !local.is_sandbox_env ? 1 : 0
   bucket = "${local.shared_prefix}-source-data-bucket"
 }
+
+data "aws_kms_key" "glue" {
+  count  = local.is_dev_env && !local.is_sandbox_env ? 1 : 0
+  key_id = "alias/${local.shared_prefix}-glue"
+}

terraform/infrastructure/firehose.tf

Lines changed: 2 additions & 1 deletion
@@ -9,5 +9,6 @@ module "firehose__processor" {
   splunk_index           = local.splunk_index
   destination            = "splunk"
   reporting_bucket_arn   = local.reporting_bucket_arn
-  reporting_infra_toggle = local.is_dev_env
+  reporting_kms_arn      = local.reporting_kms_arn
+  reporting_infra_toggle = local.is_dev_env && !local.is_sandbox_env
 }
