Skip to content

Commit 8f8f085

Browse files
Merge pull request #779 from NHSDigital/feature/made14-NRL-1231-delete-invalid-pointers
[NRL-1231] Add script that can clean up invalid pointers
2 parents b329554 + 9d61e7a commit 8f8f085

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from datetime import datetime, timedelta, timezone
2+
from typing import Any
3+
4+
import boto3
5+
import fire
6+
7+
from nrlf.consumer.fhir.r4.model import DocumentReference
8+
from nrlf.core.logger import logger
9+
from nrlf.core.validators import DocumentReferenceValidator
10+
11+
# Low-level DynamoDB client; used below for the batch delete calls and as the
# source of the scan paginator.
dynamodb = boto3.client("dynamodb")
# Paginator over the DynamoDB `scan` operation, used to walk the whole table page by page.
paginator = dynamodb.get_paginator("scan")
# High-level DynamoDB resource handle. NOTE(review): not referenced by any code
# visible in this script - confirm whether it is still needed.
resource = boto3.resource("dynamodb")

# Only emit ERROR-level records from the NRLF logger.
# NOTE(review): presumably to keep validator logging out of the progress output - confirm.
logger.setLevel("ERROR")
16+
17+
18+
def _validate_document(document: str):
    """
    Check that a stored pointer document is a valid DocumentReference.

    The JSON string is first parsed into the FHIR ``DocumentReference`` model and
    the parsed model is then run through ``DocumentReferenceValidator``.

    Parameters:
    - document: The pointer document as a JSON string.

    Raises:
    - RuntimeError: if the validator reports the document as not valid. The
      validator's issues are included in the message.
    - Any exception raised while parsing the JSON into the model is propagated
      unchanged.
    """
    parsed = DocumentReference.model_validate_json(document)
    outcome = DocumentReferenceValidator().validate(data=parsed)

    if outcome.is_valid:
        return

    raise RuntimeError("Failed to validate document: " + str(outcome.issues))
26+
27+
28+
def _find_invalid_pointers(table_name: str) -> dict[str, Any]:
    """
    Scan the given table and collect every pointer whose document fails validation.

    Each scanned item has its ``document`` attribute passed to
    ``_validate_document``; any exception raised there marks the pointer as
    invalid. When at least one invalid pointer is found, the IDs and errors are
    also written to ``./invalid_pointers.txt`` (one ``<id>: <error>`` per line).

    Parameters:
    - table_name: The name of the pointers table to scan.

    Returns a dict with:
    - "invalid_pointers": list of ``(pointer_id, exception)`` tuples
    - "scanned_count": total number of items scanned
    - "find-took-secs": wall-clock duration of the scan in seconds
    """
    print(f"Finding invalid pointers to delete in table {table_name}....")

    params: dict[str, Any] = {
        "TableName": table_name,
        "PaginationConfig": {"PageSize": 50},
    }

    invalid_pointers = []
    total_scanned_count = 0

    start_time = datetime.now(tz=timezone.utc)

    for page in paginator.paginate(**params):
        for item in page["Items"]:
            pointer_id = item.get("id", {}).get("S")
            document = item.get("document", {}).get("S", "")
            try:
                _validate_document(document)
            except Exception as exc:
                # Deliberately broad: any failure to parse or validate the
                # document means the pointer is reported as invalid.
                invalid_pointers.append((pointer_id, exc))

        previous_scanned_count = total_scanned_count
        total_scanned_count += page["ScannedCount"]

        # Report progress whenever a 1000 / 100000 boundary is crossed rather
        # than only when the total is an exact multiple: the page size is a
        # maximum, so a single short page would otherwise stop the running
        # total from ever landing on a multiple again.
        dots_due = total_scanned_count // 1000 - previous_scanned_count // 1000
        if dots_due > 0:
            print("." * dots_due, end="", flush=True)

        if total_scanned_count // 100000 > previous_scanned_count // 100000:
            print(f"scanned={total_scanned_count} invalid={len(invalid_pointers)}")

    end_time = datetime.now(tz=timezone.utc)

    print(f" Done. Found {len(invalid_pointers)} invalid pointers")

    if invalid_pointers:
        print("Writing invalid pointers IDs to file ./invalid_pointers.txt ...")
        # Explicit encoding so error text with non-ASCII characters is written
        # the same way on every platform.
        with open("invalid_pointers.txt", "w", encoding="utf-8") as f:
            f.writelines(f"{_id}: {err}\n" for _id, err in invalid_pointers)

    return {
        "invalid_pointers": invalid_pointers,
        "scanned_count": total_scanned_count,
        "find-took-secs": (end_time - start_time).total_seconds(),
    }
73+
74+
75+
def _delete_pointers(table_name: str, pointers_to_delete: list[str]) -> dict[str, Any]:
    """
    Delete the provided pointers from the given table.

    Pointers are deleted with ``batch_write_item`` in batches of up to 25
    delete requests, each keyed on ``pk``/``sk`` of ``D#<pointer_id>``.
    Requests that DynamoDB returns as unprocessed are counted as failed; they
    are not retried.

    Parameters:
    - table_name: The name of the pointers table to delete from.
    - pointers_to_delete: The IDs of the pointers to delete.

    Returns a dict with:
    - "pointers_to_delete": number of IDs that were passed in
    - "deleted_pointers": number of delete requests DynamoDB processed
    - "failed_deletes": number of delete requests returned as unprocessed
    - "deletes-took-secs": wall-clock duration of the deletes in seconds
    """
    start_time = datetime.now(tz=timezone.utc)

    print("Deleting invalid pointers...")
    pointers_deleted = 0
    failed_to_delete = 0
    batch_size = 25  # maximum number of requests in one BatchWriteItem call

    for batch_start in range(0, len(pointers_to_delete), batch_size):
        batch = [
            {
                "DeleteRequest": {
                    "Key": {
                        "pk": {"S": f"D#{pointer_id}"},
                        "sk": {"S": f"D#{pointer_id}"},
                    }
                }
            }
            for pointer_id in pointers_to_delete[batch_start : batch_start + batch_size]
        ]

        result = dynamodb.batch_write_item(RequestItems={table_name: batch})

        # UnprocessedItems maps table name -> list of requests, so count the
        # requests themselves rather than the number of table keys.
        unprocessed_items = sum(
            len(requests) for requests in result.get("UnprocessedItems", {}).values()
        )

        previously_deleted = pointers_deleted
        # Use the real batch length: the final batch may hold fewer than 25 requests.
        pointers_deleted += len(batch) - unprocessed_items
        failed_to_delete += unprocessed_items

        # Print a dot for each 1000 boundary crossed; an exact-multiple check
        # would stop firing as soon as any request came back unprocessed.
        dots_due = pointers_deleted // 1000 - previously_deleted // 1000
        if dots_due > 0:
            print("." * dots_due, end="", flush=True)

    end_time = datetime.now(tz=timezone.utc)

    print(" Done")
    return {
        "pointers_to_delete": len(pointers_to_delete),
        "deleted_pointers": pointers_deleted,
        "failed_deletes": failed_to_delete,
        "deletes-took-secs": (end_time - start_time).total_seconds(),
    }
115+
116+
117+
def _find_and_delete_invalid_pointers(table_name: str) -> dict[str, float | int]:
    """
    Find and delete any pointers in the given table that are invalid based on the FHIR model and NRLF validators.

    The table is scanned first; if invalid pointers are found, the user is asked
    to confirm before anything is deleted. Only the exact answer "yes" triggers
    the delete.

    Parameters:
    - table_name: The name of the pointers table to find and delete pointers from.
    """
    scan_summary = _find_invalid_pointers(table_name)
    # Take the (id, error) list out of the summary so that what remains holds
    # only the scan statistics returned to the caller.
    invalid_entries = scan_summary.pop("invalid_pointers")

    if not invalid_entries:
        return {
            "invalid_pointers": 0,
            "scanned_count": scan_summary["scanned_count"],
            "find-took-secs": scan_summary["find-took-secs"],
        }

    answer = input("Would you like to delete all the invalid pointers? (yes/no): ")

    if answer == "yes":
        ids_to_delete = [pointer_id for pointer_id, _error in invalid_entries]
        delete_summary = _delete_pointers(table_name, ids_to_delete)
        return {**scan_summary, **delete_summary}

    print("Invalid pointers NOT deleted.")
    return scan_summary
147+
148+
149+
# Script entry point: expose _find_and_delete_invalid_pointers as a command-line
# interface via python-fire.
if __name__ == "__main__":
    fire.Fire(_find_and_delete_invalid_pointers)

0 commit comments

Comments
 (0)