
Commit e29fec4

NRL-1188 Merge remote-tracking branch origin into feature/imaging
2 parents 96c3cc4 + d3cd4a7 commit e29fec4


29 files changed: 774 additions, 16 deletions


.gitignore

Lines changed: 3 additions & 0 deletions
@@ -42,6 +42,9 @@ override.tf.json
 *_override.tf
 *_override.tf.json

+# Ignore output of data object
+terraform/account-wide-infrastructure/modules/glue/files/src.zip
+
 # Include override files you do wish to add to version control using negated pattern
 #
 # !example_override.tf
Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
from datetime import datetime, timedelta, timezone
from typing import Any

import boto3
import fire

from nrlf.consumer.fhir.r4.model import DocumentReference
from nrlf.core.logger import logger
from nrlf.core.validators import DocumentReferenceValidator

dynamodb = boto3.client("dynamodb")
paginator = dynamodb.get_paginator("scan")
resource = boto3.resource("dynamodb")

logger.setLevel("ERROR")


def _validate_document(document: str):
    docref = DocumentReference.model_validate_json(document)

    validator = DocumentReferenceValidator()
    result = validator.validate(data=docref)

    if not result.is_valid:
        raise RuntimeError("Failed to validate document: " + str(result.issues))


def _find_invalid_pointers(table_name: str) -> dict[str, Any]:
    print(f"Finding invalid pointers to delete in table {table_name}....")

    params: dict[str, Any] = {
        "TableName": table_name,
        "PaginationConfig": {"PageSize": 50},
    }

    invalid_pointers = []
    total_scanned_count = 0

    start_time = datetime.now(tz=timezone.utc)

    for page in paginator.paginate(**params):
        for item in page["Items"]:
            pointer_id = item.get("id", {}).get("S")
            document = item.get("document", {}).get("S", "")
            try:
                _validate_document(document)
            except Exception as exc:
                invalid_pointers.append((pointer_id, exc))

        total_scanned_count += page["ScannedCount"]

        if total_scanned_count % 1000 == 0:
            print(".", end="", flush=True)

        if total_scanned_count % 100000 == 0:
            print(f"scanned={total_scanned_count} invalid={len(invalid_pointers)}")

    end_time = datetime.now(tz=timezone.utc)

    print(f" Done. Found {len(invalid_pointers)} invalid pointers")

    if len(invalid_pointers) > 0:
        print("Writing invalid pointer IDs to file ./invalid_pointers.txt ...")
        with open("invalid_pointers.txt", "w") as f:
            for _id, err in invalid_pointers:
                f.write(f"{_id}: {err}\n")

    return {
        "invalid_pointers": invalid_pointers,
        "scanned_count": total_scanned_count,
        "find-took-secs": (end_time - start_time).total_seconds(),
    }


def _delete_pointers(table_name: str, pointers_to_delete: list[str]) -> dict[str, Any]:
    """
    Delete the provided pointers from the given table.
    """
    start_time = datetime.now(tz=timezone.utc)

    print("Deleting invalid pointers...")
    pointers_deleted = 0
    failed_to_delete = 0

    for _batch_id in range(0, len(pointers_to_delete), 25):
        batch = [
            {
                "DeleteRequest": {
                    "Key": {
                        "pk": {"S": f"D#{pointer_id}"},
                        "sk": {"S": f"D#{pointer_id}"},
                    }
                }
            }
            for pointer_id in pointers_to_delete[_batch_id : _batch_id + 25]
        ]

        result = dynamodb.batch_write_item(RequestItems={table_name: batch})

        unprocessed_items = len(result.get("UnprocessedItems", []))
        pointers_deleted += len(batch) - unprocessed_items
        failed_to_delete += unprocessed_items
        if pointers_deleted % 1000 == 0:
            print(".", end="", flush=True)

    end_time = datetime.now(tz=timezone.utc)

    print(" Done")
    return {
        "pointers_to_delete": len(pointers_to_delete),
        "deleted_pointers": pointers_deleted,
        "failed_deletes": failed_to_delete,
        "deletes-took-secs": (end_time - start_time).total_seconds(),
    }


def _find_and_delete_invalid_pointers(table_name: str) -> dict[str, float | int]:
    """
    Find and delete any pointers in the given table that are invalid based on the FHIR model and NRLF validators.
    Parameters:
    - table_name: The name of the pointers table to find and delete pointers from.
    """
    find_result = _find_invalid_pointers(table_name)

    if len(find_result["invalid_pointers"]) == 0:
        return {
            "invalid_pointers": 0,
            "scanned_count": find_result["scanned_count"],
            "find-took-secs": find_result["find-took-secs"],
        }

    confirmation_input = input(
        "Would you like to delete all the invalid pointers? (yes/no): "
    )
    if confirmation_input != "yes":
        print("Invalid pointers NOT deleted.")
        find_result.pop("invalid_pointers")
        return find_result

    pointers_to_delete = [_id for _id, _ in find_result["invalid_pointers"]]

    delete_result = _delete_pointers(table_name, pointers_to_delete)

    find_result.pop("invalid_pointers")

    return {**find_result, **delete_result}


if __name__ == "__main__":
    fire.Fire(_find_and_delete_invalid_pointers)
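
Because the entry point is wrapped in fire.Fire, the table name is supplied as a positional command-line argument and the summary dict returned by _find_and_delete_invalid_pointers is printed by Fire. A minimal usage sketch follows; the module name and table name below are illustrative assumptions, not values taken from this commit.

# Hypothetical CLI invocation (script filename and table name are assumptions):
#   python cleanup_invalid_pointers.py nhsd-nrlf--dev--pointers-table
#
# The same function can also be driven directly from Python:
from cleanup_invalid_pointers import _find_and_delete_invalid_pointers  # hypothetical module name

summary = _find_and_delete_invalid_pointers("nhsd-nrlf--dev--pointers-table")  # hypothetical table name
print(summary)  # scanned_count, deleted/failed counts and the *-took-secs timings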
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
module "dev-athena" {
  source             = "../modules/athena"
  name_prefix        = "nhsd-nrlf--dev"
  target_bucket_name = module.dev-glue.target_bucket_name
}
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
module "dev-glue" {
  source         = "../modules/glue"
  name_prefix    = "nhsd-nrlf--dev"
  python_version = 3
}
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
resource "aws_athena_database" "reporting-db" {
  name = var.database

  bucket = var.target_bucket_name

  encryption_configuration {
    encryption_option = "SSE_KMS"
    kms_key           = aws_kms_key.athena.arn
  }

  force_destroy = true
}

resource "aws_athena_workgroup" "athena" {
  name = "${var.name_prefix}-athena-wg"

  configuration {
    enforce_workgroup_configuration    = true
    publish_cloudwatch_metrics_enabled = true

    result_configuration {
      output_location = "s3://${aws_s3_bucket.athena.bucket}/output/"

      encryption_configuration {
        encryption_option = "SSE_KMS"
        kms_key_arn       = aws_kms_key.athena.arn
      }
    }
  }

}
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
resource "aws_kms_key" "athena" {
}

resource "aws_kms_alias" "athena" {
  name          = "alias/${var.name_prefix}-athena"
  target_key_id = aws_kms_key.athena.key_id
}
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
output "workgroup" {
  value = aws_athena_workgroup.athena
}

output "bucket" {
  value = aws_s3_bucket.athena
}

output "database" {
  value = aws_athena_database.reporting-db
}
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
resource "aws_s3_bucket" "athena" {
  bucket = "${var.name_prefix}-athena"
}

resource "aws_s3_bucket_policy" "athena" {
  bucket = "${var.name_prefix}-athena"

  policy = jsonencode({
    Version = "2012-10-17"
    Id      = "athena-policy"
    Statement = [
      {
        Sid    = "HTTPSOnly"
        Effect = "Deny"
        Principal = {
          "AWS" : "*"
        }
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.athena.arn,
          "${aws_s3_bucket.athena.arn}/*",
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      },
    ]
  })
}

resource "aws_s3_bucket_public_access_block" "athena-public-access-block" {
  bucket = aws_s3_bucket.athena.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_server_side_encryption_configuration" "athena" {
  bucket = aws_s3_bucket.athena.bucket
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.athena.arn
    }
  }

}
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
variable "database" {
  description = "Name of the Athena reporting database"
  default     = "nrl_reporting"
}

variable "name_prefix" {
  type        = string
  description = "The prefix to apply to all resources in the module."
}

variable "target_bucket_name" {
  type = string
}
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
# Create Glue Data Catalog Database
resource "aws_glue_catalog_database" "raw_log_database" {
  name         = "${var.name_prefix}-raw_log"
  location_uri = "s3://${aws_s3_bucket.source-data-bucket.id}/"
}

# Create Glue Crawler
resource "aws_glue_crawler" "raw_log_crawler" {
  name          = "${var.name_prefix}-raw-log-crawler"
  database_name = aws_glue_catalog_database.raw_log_database.name
  role          = aws_iam_role.glue_service_role.name
  s3_target {
    path = "s3://${aws_s3_bucket.source-data-bucket.id}/"
  }
  schema_change_policy {
    delete_behavior = "LOG"
  }
  configuration = jsonencode({
    "Version" : 1.0,
    "Grouping" : {
      "TableGroupingPolicy" : "CombineCompatibleSchemas"
    }
  })
}
resource "aws_glue_trigger" "raw_log_trigger" {
  name = "${var.name_prefix}-org-report-trigger"
  type = "ON_DEMAND"
  actions {
    crawler_name = aws_glue_crawler.raw_log_crawler.name
  }
}

resource "aws_glue_job" "glue_job" {
  name              = "${var.name_prefix}-glue-job"
  role_arn          = aws_iam_role.glue_service_role.arn
  description       = "Transfer logs from source to bucket"
  glue_version      = "4.0"
  worker_type       = "G.1X"
  timeout           = 2880
  max_retries       = 1
  number_of_workers = 2
  command {
    name            = "glueetl"
    python_version  = var.python_version
    script_location = "s3://${aws_s3_bucket.code-bucket.id}/main.py"
  }

  default_arguments = {
    "--enable-auto-scaling"              = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--datalake-formats"                 = "delta"
    "--source-path"                      = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
    "--destination-path"                 = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the destination S3 path
    "--job-name"                         = "poc-glue-job"
    "--enable-continuous-log-filter"     = "true"
    "--enable-metrics"                   = "true"
    "--extra-py-files"                   = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"
  }
}
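
The Glue job above points script_location at a main.py in the code bucket and passes --source-path and --destination-path through default_arguments; that script is not part of this excerpt. As a rough illustration only, a minimal PySpark sketch of how a Glue 4.0 glueetl script might consume those arguments is shown below. The read/write formats and the hyphen-to-underscore normalisation of argument names by getResolvedOptions are assumptions, not details taken from this commit.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Resolve the arguments supplied via default_arguments in the Terraform above.
# Assumption: "--source-path" is exposed as args["source_path"] (argparse-style
# hyphen-to-underscore normalisation).
args = getResolvedOptions(sys.argv, ["JOB_NAME", "source-path", "destination-path"])

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Illustrative transfer only: read raw logs from the source bucket and write them
# to the target bucket. The real transformations live in the repo's main.py.
df = spark.read.json(args["source_path"])
df.write.mode("overwrite").parquet(args["destination_path"])

job.commit()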
