Skip to content

Commit f6f2c5d

Browse files
[PRMP-198] Created author migration (#789)
Co-authored-by: SWhyteAnswer <[email protected]> Co-authored-by: Sam Whyte <[email protected]>
1 parent deeed4a commit f6f2c5d

File tree

2 files changed

+164
-6
lines changed

2 files changed

+164
-6
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from typing import Callable, Iterable
2+
3+
from services.base.dynamo_service import DynamoDBService
4+
from utils.audit_logging_setup import LoggingService
5+
6+
7+
class VersionMigration:
8+
def __init__(self, environment: str, table_name: str, dry_run: bool = False):
9+
self.bulk_upload_lookup: dict[str, dict] | None = None
10+
self.environment = environment
11+
self.table_name = table_name
12+
self.dynamo_service = DynamoDBService()
13+
self.dry_run = dry_run
14+
self.logger = LoggingService("AuthorMigration")
15+
16+
self.target_table = f"{self.environment}_{self.table_name}"
17+
self.bulk_upload_report_table = f"{self.environment}_BulkUploadReport"
18+
19+
def main(
20+
self, entries: Iterable[dict]
21+
) -> list[tuple[str, Callable[[dict], dict | None]]]:
22+
"""
23+
Main entry point for the migration.
24+
Returns a list of (label, update function) tuples.
25+
"""
26+
self.logger.info("Starting Author field migration")
27+
self.logger.info(f"Target table: {self.target_table}")
28+
self.logger.info(f"Dry run mode: {self.dry_run}")
29+
30+
if entries is None:
31+
self.logger.error("No entries provided — expected a list of table items.")
32+
raise ValueError("Entries must be provided to main().")
33+
34+
return [("Author", self.get_update_author_items)]
35+
36+
def get_update_author_items(self, entry: dict) -> dict | None:
37+
"""
38+
Determines whether the 'Author' field should be updated.
39+
Returns a dict with the update or None if no update is needed.
40+
"""
41+
current_author = entry.get("Author")
42+
deleted_value = entry.get("Deleted")
43+
nhs_number = entry.get("NhsNumber")
44+
45+
if current_author:
46+
return None
47+
48+
if deleted_value not in (None, ""):
49+
self.logger.debug(
50+
f"[Author] Skipping {nhs_number}: Deleted field not empty ({deleted_value})."
51+
)
52+
return None
53+
54+
if self.bulk_upload_lookup is None:
55+
self.bulk_upload_lookup = self.build_bulk_upload_lookup()
56+
57+
bulk_upload_row = self.bulk_upload_lookup.get(nhs_number)
58+
if not bulk_upload_row:
59+
self.logger.warning(f"No completed bulk upload found for NHS number: {nhs_number}")
60+
return None
61+
62+
new_author = bulk_upload_row.get("UploaderOdsCode")
63+
if not new_author:
64+
self.logger.warning(f"No uploader ODS code found for NHS number: {nhs_number}")
65+
return None
66+
67+
return {"Author": new_author}
68+
69+
def build_bulk_upload_lookup(self) -> dict[str, dict]:
70+
"""
71+
Creates a lookup of the most recent completed bulk upload reports by NHS number.
72+
"""
73+
self.logger.info("Building bulk upload lookup from BulkUploadReport table...")
74+
bulk_reports = self.dynamo_service.scan_whole_table(self.bulk_upload_report_table)
75+
lookup: dict[str, dict] = {}
76+
77+
for row in bulk_reports:
78+
nhs = row.get("NhsNumber")
79+
status = row.get("UploadStatus")
80+
timestamp = row.get("Timestamp")
81+
82+
if not nhs or status != "complete" or not timestamp:
83+
continue
84+
85+
stored = lookup.get(nhs)
86+
if not stored or int(timestamp) > int(stored.get("Timestamp", 0)):
87+
lookup[nhs] = row
88+
89+
self.logger.info(f"Loaded {len(lookup)} completed bulk upload entries.")
90+
return lookup
91+
92+
def process_entries(
93+
self,
94+
label: str,
95+
entries: Iterable[dict],
96+
update_fn: Callable[[dict], dict | None],
97+
):
98+
"""Process a list of entries and apply an update function to each one."""
99+
self.logger.info(f"Running {label} migration")
100+
101+
for index, entry in enumerate(entries, start=1):
102+
item_id = entry.get("ID")
103+
self.logger.info(f"[{label}] Processing item {index} (ID: {item_id})")
104+
105+
updated_fields = update_fn(entry)
106+
if not updated_fields:
107+
self.logger.debug(
108+
f"[{label}] Item {item_id} does not require update, skipping."
109+
)
110+
continue
111+
112+
if self.dry_run:
113+
self.logger.info(
114+
f"[Dry Run] Would update item {item_id} with {updated_fields}"
115+
)
116+
else:
117+
self.logger.info(f"Updating item {item_id} with {updated_fields}")
118+
try:
119+
self.dynamo_service.update_item(
120+
table_name=self.target_table,
121+
key_pair={"ID": item_id},
122+
updated_fields=updated_fields,
123+
)
124+
except Exception as e:
125+
self.logger.error(f"Failed to update {item_id}: {e}")
126+
127+
self.logger.info(f"{label} migration completed.")
128+
129+
130+
if __name__ == "__main__":
    import argparse

    # CLI: positional environment prefix + table name, optional --dry-run flag.
    arg_parser = argparse.ArgumentParser(
        prog="dynamodb_migration_20250731.py",
        description="Migrate DynamoDB table columns",
    )
    arg_parser.add_argument("environment", help="Environment prefix for DynamoDB table")
    arg_parser.add_argument("table_name", help="DynamoDB table name to migrate")
    arg_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run migration in dry-run mode (no writes)",
    )
    cli_args = arg_parser.parse_args()

    migration = VersionMigration(
        environment=cli_args.environment,
        table_name=cli_args.table_name,
        dry_run=cli_args.dry_run,
    )

    # Materialise the table stream once so every update pass sees the same items.
    table_items = list(
        migration.dynamo_service.stream_whole_table(migration.target_table)
    )

    for migration_label, field_update_fn in migration.main(entries=table_items):
        migration.process_entries(
            label=migration_label, entries=table_items, update_fn=field_update_fn
        )

lambdas/services/base/dynamo_service.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import time
2-
from typing import Iterator
3-
from typing import Optional
2+
from typing import Iterator, Optional
43

54
import boto3
65
from boto3.dynamodb.conditions import Attr, ConditionBase, Key
@@ -287,10 +286,10 @@ def get_item(self, table_name: str, key: dict):
287286
raise e
288287

289288
def stream_whole_table(
290-
self,
291-
table_name: str,
292-
filter_expression: Optional[str] = None,
293-
projection_expression: Optional[str] = None,
289+
self,
290+
table_name: str,
291+
filter_expression: Optional[str] = None,
292+
projection_expression: Optional[str] = None,
294293
) -> Iterator[dict]:
295294
"""
296295
Streams all items from a DynamoDB table using pagination.

0 commit comments

Comments
 (0)