Skip to content

Commit 2ec9887

Browse files
NRL-1798 Add delete pointers script
1 parent 3ef1892 commit 2ec9887

File tree

2 files changed

+691
-0
lines changed

2 files changed

+691
-0
lines changed

scripts/delete_pointers_by_id.py

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
#!/usr/bin/env python
2+
import json
3+
from datetime import datetime, timedelta, timezone
4+
from typing import Any, Dict, List
5+
6+
import boto3
7+
import fire
8+
9+
# Module-level DynamoDB client used by the batch get/delete helpers in this
# script. It is created when the module is imported; no region or credentials
# are passed here, so whatever boto3 resolves by default is used.
dynamodb = boto3.client("dynamodb")
10+
11+
12+
def _load_pointers_from_file(pointers_file: str) -> list[str]:
    """
    Read pointers from a file. Supports:
    - JSON array of objects with an "id" field
    - line-delimited plain text (one id per line)

    Content whose first non-whitespace character is "[" or "{" is parsed as
    JSON; anything else is read as plain text. The file is decoded as UTF-8
    and a leading byte-order mark is ignored.

    Returns a list of pointer id strings with surrounding whitespace removed
    (an empty list for an empty or whitespace-only file). Prints one warning
    with the number of JSON entries skipped because they are not objects with
    a non-blank string "id".

    Raises ValueError if the JSON cannot be parsed or is not an array.
    """
    # "utf-8-sig" reads plain UTF-8 and also drops a leading BOM. Without
    # this, a BOM hides the "[" that selects the JSON path (str.strip() does
    # not remove it) and ends up glued to the first id of a text file.
    with open(pointers_file, "r", encoding="utf-8-sig") as fh:
        content = fh.read().strip()

    if not content:
        return []

    # JSON path
    if content.startswith(("[", "{")):
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse JSON file {pointers_file}: {e}") from e

        # A top-level object takes the JSON path only so that it is rejected
        # here with a clear message instead of being read as plain text.
        if not isinstance(data, list):
            raise ValueError("JSON file must contain an array of objects")

        parsed_ids: list[str] = []
        skipped_count = 0
        for item in data:
            pointer_id = item.get("id") if isinstance(item, dict) else None
            if isinstance(pointer_id, str) and pointer_id.strip():
                parsed_ids.append(pointer_id.strip())
            else:
                skipped_count += 1

        if skipped_count:
            print(
                f"Warning: skipped {skipped_count} malformed entries in JSON file {pointers_file}"
            )

        return parsed_ids

    # Plain text fallback: one id per non-blank line
    return [line.strip() for line in content.splitlines() if line.strip()]
58+
59+
60+
def _check_pointers_match_ods_code(
    ods_code: str, pointer_ids: List[str]
) -> tuple[List[str], List[str]]:
    """
    Validate that pointer IDs are in line with the provided ODS code.

    A pointer ID matches when it starts with "<ods_code>-" (case-sensitive).
    Both returned lists keep the order of the input.

    Returns (matched_ids, mismatched_ids)
    """
    # Built once: the prefix does not change between iterations.
    prefix = f"{ods_code}-"

    matched: List[str] = []
    mismatched: List[str] = []

    for pointer_id in pointer_ids:
        if pointer_id.startswith(prefix):
            matched.append(pointer_id)
        else:
            mismatched.append(pointer_id)

    return matched, mismatched
77+
78+
79+
def _batch_get_existing_pointers(
    table_name: str,
    pointer_ids: List[str],
    client: Any = None,
    max_attempts: int = 5,
    retry_delay_secs: float = 0.1,
) -> tuple[List[str], List[str]]:
    """
    Check which pointers exist using BatchGetItem (max 100 items per request).

    Parameters:
    - table_name: DynamoDB table name
    - pointer_ids: pointer ids to look up; duplicates are looked up once
    - client: DynamoDB client to use; defaults to the module-level `dynamodb`
    - max_attempts: maximum BatchGetItem calls per batch of 100 keys
    - retry_delay_secs: base delay before a retry, doubled on each retry

    Returns (existing_ids, not_found_ids), each id appearing at most once and
    in first-seen input order.

    Raises RuntimeError if DynamoDB still reports unprocessed keys after
    max_attempts calls. Those keys were never read, so reporting them as
    "not found" would be wrong.
    """
    import time  # local import: only needed for the retry back-off

    if client is None:
        # Resolved at call time so the module-level client can be swapped out.
        client = dynamodb

    # BatchGetItem rejects a request that contains the same key twice.
    unique_ids = list(dict.fromkeys(pointer_ids))

    existing: List[str] = []
    not_found: List[str] = []

    for batch_start in range(0, len(unique_ids), 100):
        batch_ids = unique_ids[batch_start : batch_start + 100]

        pending: Dict[str, Any] = {
            table_name: {
                "Keys": [
                    {
                        "pk": {"S": f"D#{pointer_id}"},
                        "sk": {"S": f"D#{pointer_id}"},
                    }
                    for pointer_id in batch_ids
                ]
            }
        }

        found_ids: set[str] = set()
        attempt = 0
        while pending:
            attempt += 1
            response = client.batch_get_item(RequestItems=pending)

            found_ids.update(
                item["pk"]["S"][2:]  # Remove "D#"
                for item in response.get("Responses", {}).get(table_name, [])
            )

            # UnprocessedKeys has the same shape as RequestItems, so it can be
            # sent back unchanged to fetch the keys that were not read.
            pending = response.get("UnprocessedKeys") or {}
            if not pending:
                break
            if attempt >= max_attempts:
                raise RuntimeError(
                    f"BatchGetItem left unprocessed keys for {table_name} "
                    f"after {attempt} attempts"
                )
            time.sleep(retry_delay_secs * 2 ** (attempt - 1))

        for pointer_id in batch_ids:
            if pointer_id in found_ids:
                existing.append(pointer_id)
            else:
                not_found.append(pointer_id)

    return existing, not_found
114+
115+
116+
def _batch_delete_pointers(
    table_name: str,
    pointer_ids: List[str],
    client: Any = None,
    max_attempts: int = 5,
    retry_delay_secs: float = 0.1,
) -> tuple[List[str], List[str]]:
    """
    Delete pointers using BatchWriteItem (max 25 items per request).

    Parameters:
    - table_name: DynamoDB table name
    - pointer_ids: pointer ids to delete; duplicates are deleted once
    - client: DynamoDB client to use; defaults to the module-level `dynamodb`
    - max_attempts: maximum BatchWriteItem calls per batch of 25 ids
    - retry_delay_secs: base delay before a retry, doubled on each retry

    Items DynamoDB returns as unprocessed are re-sent until they go through
    or max_attempts is reached; whatever is still unprocessed then is
    reported as failed. Prints a "." for every 1000 pointers deleted.

    Returns (deleted_ids, failed_ids); failed_ids is sorted.
    """
    import time  # local import: only needed for the retry back-off

    if client is None:
        # Resolved at call time so the module-level client can be swapped out.
        client = dynamodb

    # BatchWriteItem rejects a request that contains the same key twice.
    unique_ids = list(dict.fromkeys(pointer_ids))
    attempts_allowed = max(1, max_attempts)

    pointers_deleted: List[str] = []
    failed_deletes_set: set[str] = set()
    progress_dots = 0

    for batch_start in range(0, len(unique_ids), 25):
        batch_ptrs = unique_ids[batch_start : batch_start + 25]

        pending = [
            {
                "DeleteRequest": {
                    "Key": {
                        "pk": {"S": f"D#{pointer_id}"},
                        "sk": {"S": f"D#{pointer_id}"},
                    }
                }
            }
            for pointer_id in batch_ptrs
        ]

        for attempt in range(1, attempts_allowed + 1):
            result = client.batch_write_item(RequestItems={table_name: pending})
            # Unprocessed items come back in request form and can be re-sent.
            pending = result.get("UnprocessedItems", {}).get(table_name, [])
            if not pending:
                break
            if attempt < attempts_allowed:
                time.sleep(retry_delay_secs * 2 ** (attempt - 1))

        # Anything still pending after the last attempt counts as failed.
        batch_failed = {
            item["DeleteRequest"]["Key"]["pk"]["S"][2:]  # Remove "D#"
            for item in pending
        }
        failed_deletes_set |= batch_failed

        pointers_deleted.extend(p for p in batch_ptrs if p not in batch_failed)

        # One dot per 1000 deletions, even when failures leave the running
        # total off an exact multiple of 1000.
        while len(pointers_deleted) // 1000 > progress_dots:
            print(".", end="", flush=True)
            progress_dots += 1

    return pointers_deleted, sorted(failed_deletes_set)
157+
158+
159+
def _delete_pointers_by_id(
160+
table_name: str,
161+
ods_code: str,
162+
pointers_to_delete: List[str] | None = None,
163+
pointers_file: str | None = None,
164+
) -> Dict[str, Any]:
165+
"""
166+
Delete pointers from DynamoDB table.
167+
168+
Can accept pointers as:
169+
- list of strings: pointers_to_delete=["id1", "id2"]
170+
- JSON file: pointers_file=/path/to/pointers.json (array of objects with "id" field)
171+
- text file: pointers_file=/path/to/ids.txt (one id per line)
172+
173+
Parameters:
174+
- table_name: DynamoDB table name
175+
- ods_code: ODS code of the organisation that the pointers belong to
176+
- pointers_to_delete: list of pointer ids to delete
177+
- pointers_file: path to JSON file (array of objects with "id" field) or text file (one id per line)
178+
"""
179+
if pointers_to_delete is None and pointers_file is None:
180+
raise ValueError("Provide either pointers_to_delete or pointers_file")
181+
182+
if pointers_to_delete is not None and pointers_file is not None:
183+
raise ValueError("Provide either pointers_to_delete or pointers_file, not both")
184+
185+
# Load pointers from file if provided
186+
if pointers_file:
187+
pointers_to_delete = _load_pointers_from_file(pointers_file)
188+
189+
if len(pointers_to_delete) == 0:
190+
return {
191+
"pointers_to_delete": 0,
192+
"ods_code": ods_code,
193+
"ods_code_matched": [],
194+
"ods_code_mismatched": [],
195+
"pointer_not_found": [],
196+
"deleted_pointers": [],
197+
"failed_deletes": [],
198+
"deletes-took-secs": 0,
199+
}
200+
201+
start_time = datetime.now(tz=timezone.utc)
202+
203+
print(
204+
f"Validating {len(pointers_to_delete)} pointers against ODS code {ods_code}..."
205+
)
206+
matched_pointers, mismatched_pointers = _check_pointers_match_ods_code(
207+
ods_code, pointers_to_delete
208+
)
209+
210+
print(
211+
f"Validate pointer's ODS code: {len(matched_pointers)} matched, {len(mismatched_pointers)} mismatched"
212+
)
213+
214+
if len(matched_pointers) == 0:
215+
print(f"None of the pointer IDs are a match for ODS code {ods_code}. Exiting.")
216+
end_time = datetime.now(tz=timezone.utc)
217+
return {
218+
"pointers_to_delete": len(pointers_to_delete),
219+
"ods_code": ods_code,
220+
"ods_code_matched": {
221+
"count": len(matched_pointers),
222+
"ids": matched_pointers,
223+
},
224+
"ods_code_mismatched": {
225+
"count": len(mismatched_pointers),
226+
"ids": mismatched_pointers,
227+
},
228+
"pointer_not_found": {
229+
"count": 0,
230+
"ids": [],
231+
},
232+
"deleted_pointers": {
233+
"count": 0,
234+
"ids": [],
235+
},
236+
"failed_deletes": {
237+
"count": 0,
238+
"ids": [],
239+
},
240+
"deletes-took-secs": timedelta.total_seconds(end_time - start_time),
241+
}
242+
243+
print(f"Checking existence of {len(matched_pointers)} pointers in {table_name}...")
244+
existing_pointers, not_found_pointers = _batch_get_existing_pointers(
245+
table_name, matched_pointers
246+
)
247+
248+
print(
249+
f"Found {len(existing_pointers)} existing pointers to delete, {len(not_found_pointers)} not found."
250+
)
251+
252+
if len(existing_pointers) == 0:
253+
print("No pointers found to delete. Exiting.")
254+
end_time = datetime.now(tz=timezone.utc)
255+
return {
256+
"pointers_to_delete": len(pointers_to_delete),
257+
"ods_code": ods_code,
258+
"ods_code_matched": {
259+
"count": len(matched_pointers),
260+
"ids": matched_pointers,
261+
},
262+
"ods_code_mismatched": {
263+
"count": len(mismatched_pointers),
264+
"ids": mismatched_pointers,
265+
},
266+
"pointer_not_found": {
267+
"count": len(not_found_pointers),
268+
"ids": not_found_pointers,
269+
},
270+
"deleted_pointers": {
271+
"count": 0,
272+
"ids": [],
273+
},
274+
"failed_deletes": {
275+
"count": 0,
276+
"ids": [],
277+
},
278+
"deletes-took-secs": timedelta.total_seconds(end_time - start_time),
279+
}
280+
281+
# Proceed with deletion using BatchWriteItem
282+
pointers_deleted, failed_deletes = _batch_delete_pointers(
283+
table_name, existing_pointers
284+
)
285+
286+
end_time = datetime.now(tz=timezone.utc)
287+
288+
print(" Done")
289+
return {
290+
"pointers_to_delete": len(pointers_to_delete),
291+
"ods_code": ods_code,
292+
"ods_code_matched": {
293+
"count": len(matched_pointers),
294+
"ids": matched_pointers,
295+
},
296+
"ods_code_mismatched": {
297+
"count": len(mismatched_pointers),
298+
"ids": mismatched_pointers,
299+
},
300+
"pointer_not_found": {
301+
"count": len(not_found_pointers),
302+
"ids": not_found_pointers,
303+
},
304+
"deleted_pointers": {
305+
"count": len(pointers_deleted),
306+
"ids": pointers_deleted,
307+
},
308+
"failed_deletes": {
309+
"count": len(failed_deletes),
310+
"ids": failed_deletes,
311+
},
312+
"deletes-took-secs": timedelta.total_seconds(end_time - start_time),
313+
}
314+
315+
316+
if __name__ == "__main__":
    # Run _delete_pointers_by_id as a command-line tool: python-fire turns the
    # function's parameters into CLI arguments/flags.
    fire.Fire(_delete_pointers_by_id)

0 commit comments

Comments
 (0)