Skip to content

Commit f96614f

Browse files
committed
[NRL-1231] Add script that can cleanup invalid pointers
1 parent fa1baef commit f96614f

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
from datetime import datetime, timedelta, timezone
2+
from typing import Any
3+
4+
import boto3
5+
import fire
6+
7+
from nrlf.consumer.fhir.r4.model import DocumentReference
8+
from nrlf.core.logger import logger
9+
from nrlf.core.validators import DocumentReferenceValidator
10+
11+
dynamodb = boto3.client("dynamodb")
12+
paginator = dynamodb.get_paginator("scan")
13+
14+
logger.setLevel("ERROR")
15+
16+
17+
def _validate_document(document: str):
18+
docref = DocumentReference.model_validate_json(document)
19+
20+
validator = DocumentReferenceValidator()
21+
result = validator.validate(data=docref)
22+
23+
if not result.is_valid:
24+
raise RuntimeError("Failed to validate document: " + str(result.issues))
25+
26+
27+
def _find_invalid_pointers(table_name: str) -> dict[str, float | int]:
28+
"""
29+
Find and delete pointers in the given table that are invalid based on the FHIR model and NRLF validators.
30+
Parameters:
31+
- table_name: The name of the pointers table to find and delete pointer from.
32+
"""
33+
34+
print(f"Finding invalid pointers to delete in table {table_name}....")
35+
36+
params: dict[str, Any] = {
37+
"TableName": table_name,
38+
"PaginationConfig": {"PageSize": 50},
39+
}
40+
41+
invalid_pointers = []
42+
total_scanned_count = 0
43+
44+
start_time = datetime.now(tz=timezone.utc)
45+
46+
for page in paginator.paginate(**params):
47+
for item in page["Items"]:
48+
pointer_id = item.get("id", {}).get("S")
49+
document = item.get("document", {}).get("S", "")
50+
try:
51+
_validate_document(document)
52+
except Exception as exc:
53+
invalid_pointers.append((pointer_id, exc))
54+
55+
total_scanned_count += page["ScannedCount"]
56+
57+
if total_scanned_count % 1000 == 0:
58+
print(".", end="", flush=True)
59+
60+
if total_scanned_count % 100000 == 0:
61+
print(f"scanned={total_scanned_count} invalid={len(invalid_pointers)}")
62+
63+
end_time = datetime.now(tz=timezone.utc)
64+
65+
print(f" Done. Found {len(invalid_pointers)} invalid pointers")
66+
67+
if len(invalid_pointers) == 0:
68+
return {
69+
"invalid_pointers": 0,
70+
"scanned_count": total_scanned_count,
71+
"took-secs": timedelta.total_seconds(end_time - start_time),
72+
}
73+
74+
print("Writing invalid pointers IDs to file ./invalid_pointers.txt ...")
75+
with open("invalid_pointers.txt", "w") as f:
76+
for _id, err in invalid_pointers:
77+
f.write(f"{_id}: {err}\n")
78+
79+
confirmation_input = input(
80+
"Would you like to delete all the invalid pointers? (yes/no): "
81+
)
82+
if confirmation_input != "yes":
83+
print("Invalid pointers NOT deleted.")
84+
return {
85+
"invalid_pointers": len(invalid_pointers),
86+
"scanned_count": total_scanned_count,
87+
"took-secs": timedelta.total_seconds(end_time - start_time),
88+
}
89+
90+
print("Deleting invalid pointers...")
91+
pointers_deleted = 0
92+
for _id, _ in invalid_pointers:
93+
try:
94+
item_key = {"S": f"D#{_id}"}
95+
dynamodb.delete_item(
96+
TableName=table_name,
97+
Key={"pk": item_key, "sk": item_key},
98+
ReturnValues="NONE",
99+
)
100+
101+
pointers_deleted += 1
102+
103+
if pointers_deleted % 1000 == 0:
104+
print(".", end="", flush=True)
105+
except Exception as exc:
106+
print(f"Failed to delete pointer {_id}: {exc}")
107+
108+
end_time = datetime.now(tz=timezone.utc)
109+
110+
print(" Done")
111+
return {
112+
"invalid_pointers_total": len(invalid_pointers),
113+
"invalid_pointers_deleted": pointers_deleted,
114+
"scanned_count": total_scanned_count,
115+
"took-secs": timedelta.total_seconds(end_time - start_time),
116+
}
117+
118+
119+
if __name__ == "__main__":
120+
fire.Fire(_find_invalid_pointers)

0 commit comments

Comments
 (0)