Skip to content

Commit 71816b2

Browse files
committed
NRL-1187 update glue script to format data, update infrastructure for athena
1 parent 47e7516 commit 71816b2

File tree

8 files changed

+153
-40
lines changed

8 files changed

+153
-40
lines changed

terraform/account-wide-infrastructure/modules/athena/athena.tf

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
resource "aws_athena_database" "reporting-db" {
2-
name = var.database
1+
# resource "aws_athena_database" "reporting-db" {
2+
# name = var.database
33

4-
bucket = var.target_bucket_name
4+
# bucket = var.target_bucket_name
55

6-
encryption_configuration {
7-
encryption_option = "SSE_KMS"
8-
kms_key = aws_kms_key.athena.arn
9-
}
6+
# encryption_configuration {
7+
# encryption_option = "SSE_KMS"
8+
# kms_key = aws_kms_key.athena.arn
9+
# }
1010

11-
force_destroy = true
12-
}
11+
# force_destroy = true
12+
# }
1313

1414
resource "aws_athena_workgroup" "athena" {
1515
name = "${var.name_prefix}-athena-wg"

terraform/account-wide-infrastructure/modules/athena/outputs.tf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ output "bucket" {
66
value = aws_s3_bucket.athena
77
}
88

9-
output "database" {
10-
value = aws_athena_database.reporting-db
11-
}
9+
# output "database" {
10+
# value = aws_athena_database.reporting-db
11+
# }

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
# Create Glue Data Catalog Database
2-
resource "aws_glue_catalog_database" "raw_log_database" {
3-
name = "${var.name_prefix}-raw_log"
4-
location_uri = "${aws_s3_bucket.source-data-bucket.id}/"
2+
resource "aws_glue_catalog_database" "log_database" {
3+
name = "${var.name_prefix}-reporting"
4+
location_uri = "${aws_s3_bucket.target-data-bucket.id}/logs/"
55
}
66

77
# Create Glue Crawler
8-
resource "aws_glue_crawler" "raw_log_crawler" {
9-
name = "${var.name_prefix}-raw-log-crawler"
10-
database_name = aws_glue_catalog_database.raw_log_database.name
8+
resource "aws_glue_crawler" "log_crawler" {
9+
name = "${var.name_prefix}-log-crawler"
10+
database_name = aws_glue_catalog_database.log_database.name
1111
role = aws_iam_role.glue_service_role.name
1212
s3_target {
13-
path = "${aws_s3_bucket.source-data-bucket.id}/"
13+
path = "${aws_s3_bucket.target-data-bucket.id}/logs/"
1414
}
1515
schema_change_policy {
1616
delete_behavior = "LOG"
@@ -22,11 +22,11 @@ resource "aws_glue_crawler" "raw_log_crawler" {
2222
}
2323
})
2424
}
25-
resource "aws_glue_trigger" "raw_log_trigger" {
25+
resource "aws_glue_trigger" "log_trigger" {
2626
name = "${var.name_prefix}-org-report-trigger"
2727
type = "ON_DEMAND"
2828
actions {
29-
crawler_name = aws_glue_crawler.raw_log_crawler.name
29+
crawler_name = aws_glue_crawler.log_crawler.name
3030
}
3131
}
3232

@@ -49,9 +49,9 @@ resource "aws_glue_job" "glue_job" {
4949
"--enable-auto-scaling" = "true"
5050
"--enable-continous-cloudwatch-log" = "true"
5151
"--datalake-formats" = "delta"
52-
"--source-path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
53-
"--destination-path" = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the destination S3 path
54-
"--job-name" = "poc-glue-job"
52+
"--source_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the source S3 path
53+
"--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
54+
"--job_name" = "poc-glue-job"
5555
"--enable-continuous-log-filter" = "true"
5656
"--enable-metrics" = "true"
5757
"--extra-py-files" = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"

terraform/account-wide-infrastructure/modules/glue/iam.tf

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,57 @@ resource "aws_iam_role" "glue_service_role" {
1515
})
1616
}
1717

18+
data "aws_iam_policy_document" "glue_service" {
19+
statement {
20+
actions = [
21+
"s3:AbortMultipartUpload",
22+
"s3:GetBucketLocation",
23+
"s3:GetObject",
24+
"s3:ListBucket",
25+
"s3:ListBucketMultipartUploads",
26+
"s3:PutObject",
27+
"s3:DeleteObject",
28+
]
29+
30+
resources = compact([
31+
aws_s3_bucket.source-data-bucket.arn,
32+
"${aws_s3_bucket.source-data-bucket.arn}/*",
33+
aws_s3_bucket.target-data-bucket.arn,
34+
"${aws_s3_bucket.target-data-bucket.arn}/*",
35+
aws_s3_bucket.code-bucket.arn,
36+
"${aws_s3_bucket.code-bucket.arn}/*",
37+
])
38+
effect = "Allow"
39+
}
40+
41+
statement {
42+
actions = [
43+
"kms:DescribeKey",
44+
"kms:GenerateDataKey*",
45+
"kms:Encrypt",
46+
"kms:ReEncrypt*",
47+
"kms:Decrypt",
48+
]
49+
50+
resources = [
51+
aws_kms_key.glue.arn,
52+
]
53+
54+
effect = "Allow"
55+
}
56+
}
57+
58+
resource "aws_iam_policy" "glue_service" {
59+
name = "${var.name_prefix}-glue"
60+
policy = data.aws_iam_policy_document.glue_service.json
61+
}
62+
1863
resource "aws_iam_role_policy_attachment" "glue_service" {
19-
role = aws_iam_role.glue_service_role.id
20-
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
64+
role = aws_iam_role.glue_service_role.name
65+
policy_arn = aws_iam_policy.glue_service.arn
2166
}
67+
68+
# resource "aws_iam_role_policy_attachment" "glue_service" {
69+
# role = aws_iam_role.glue_service_role.id
70+
# policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
71+
# }

terraform/account-wide-infrastructure/modules/glue/s3.tf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ data "archive_file" "python" {
174174

175175
resource "aws_s3_object" "zip" {
176176
bucket = aws_s3_bucket.code-bucket.bucket
177-
key = "main.py"
178-
source = "${path.module}/files/src.zip"
177+
key = "src.zip"
178+
source = data.archive_file.python.output_path
179+
etag = filemd5(data.archive_file.python.output_path)
179180
}

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
import sys
22

33
from awsglue.utils import getResolvedOptions
4+
from pipeline import LogPipeline
45
from pyspark.context import SparkContext
5-
from src.pipeline import LogPipeline
6-
from src.transformations import placeholder
6+
from transformations import logSchema, placeholder
77

88
# Get arguments from AWS Glue job
9-
args = getResolvedOptions(
10-
sys.argv, ["JOB_NAME", "SOURCE_PATH", "TARGET_PATH", "PARTITION_COLS"]
11-
)
9+
args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
1210

1311
# Start Glue context
1412
sc = SparkContext()
1513

16-
partition_cols = args["PARTITION_COLS"].split(",") if "PARTITION_COLS" in args else []
14+
partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else []
1715

1816
# Initialize ETL process
1917
etl_job = LogPipeline(
2018
spark_context=sc,
21-
source_path=args["SOURCE_PATH"],
22-
target_path=args["TARGET_PATH"],
19+
source_path=args["source_path"],
20+
target_path=args["target_path"],
21+
schema=logSchema,
2322
partition_cols=partition_cols,
2423
transformations=[placeholder],
2524
)

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from src.instances import GlueContextSingleton, LoggerSingleton
1+
from instances import GlueContextSingleton, LoggerSingleton
22

33

44
class LogPipeline:
@@ -7,7 +7,8 @@ def __init__(
77
spark_context,
88
source_path,
99
target_path,
10-
partition_cols=None,
10+
schema,
11+
partition_cols=[],
1112
transformations=[],
1213
):
1314
"""Initialize Glue context, Spark session, logger, and paths"""
@@ -16,6 +17,7 @@ def __init__(
1617
self.logger = LoggerSingleton().logger
1718
self.source_path = source_path
1819
self.target_path = target_path
20+
self.schema = schema
1921
self.partition_cols = partition_cols
2022
self.transformations = transformations
2123

@@ -36,7 +38,7 @@ def run(self):
3638
def extract(self):
3739
"""Extract JSON data from S3"""
3840
self.logger.info(f"Extracting data from {self.source_path} as JSON")
39-
return self.spark.read.json(self.source_path)
41+
return self.spark.read.schema(self.schema).json(self.source_path)
4042

4143
def transform(self, dataframe):
4244
"""Apply a list of transformations on the dataframe"""
terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,62 @@
1-
def placeholder(): ...
1+
from pyspark.sql.types import (
2+
BooleanType,
3+
StringType,
4+
StructField,
5+
StructType,
6+
TimestampType,
7+
)
8+
9+
logSchema = StructType(
10+
[
11+
StructField("time", TimestampType(), True),
12+
StructField("index", StringType(), True),
13+
StructField("host", StringType(), True),
14+
StructField("source", StringType(), True),
15+
StructField(
16+
"event",
17+
StructType(
18+
[
19+
StructField("level", StringType(), True),
20+
StructField("location", StringType(), True),
21+
StructField("message", StringType(), True),
22+
StructField("timestamp", TimestampType(), True),
23+
StructField("service", StringType(), True),
24+
StructField("cold_start", BooleanType(), True),
25+
StructField("function_name", StringType(), True),
26+
StructField("function_memory_size", StringType(), True),
27+
StructField("function_arn", StringType(), True),
28+
StructField("function_request_id", StringType(), True),
29+
StructField("correlation_id", StringType(), True),
30+
StructField("method", StringType(), True),
31+
StructField("path", StringType(), True),
32+
StructField(
33+
"headers",
34+
StructType(
35+
[
36+
StructField("accept", StringType(), True),
37+
StructField("accept-encoding", StringType(), True),
38+
StructField("Authorization", StringType(), True),
39+
StructField("Host", StringType(), True),
40+
StructField(
41+
"NHSD-Connection-Metadata", StringType(), True
42+
),
43+
StructField("NHSD-Correlation-Id", StringType(), True),
44+
StructField("User-Agent", StringType(), True),
45+
StructField("X-Forwarded-For", StringType(), True),
46+
StructField("X-Request-Id", StringType(), True),
47+
]
48+
),
49+
True,
50+
),
51+
StructField("log_reference", StringType(), True),
52+
StructField("xray_trace_id", StringType(), True),
53+
]
54+
),
55+
True,
56+
),
57+
]
58+
)
59+
60+
61+
def placeholder(df):
62+
return df

0 commit comments

Comments (0)