11import os
2+ import boto3
23from typing import Callable , Iterable
4+ from boto3 .dynamodb .conditions import Key , Attr
35
46from scripts .MigrationBase import MigrationBase
57from services .base .dynamo_service import DynamoDBService
@@ -46,6 +48,9 @@ def main(
4648 message = "Entries missing for segment worker" , segment_id = None
4749 )
4850
51+ # Build lookup once using the entries we're migrating
52+ self .bulk_upload_lookup = self .build_bulk_upload_lookup (entries )
53+
4954 return [("Author" , self .get_update_author_items )]
5055
5156 def get_update_author_items (self , entry : dict ) -> dict | None :
@@ -66,9 +71,6 @@ def get_update_author_items(self, entry: dict) -> dict | None:
6671 )
6772 return None
6873
69- if self .bulk_upload_lookup is None :
70- self .bulk_upload_lookup = self .build_bulk_upload_lookup ()
71-
7274 bulk_upload_row = self .bulk_upload_lookup .get (nhs_number )
7375 if not bulk_upload_row :
7476 self .logger .warning (
@@ -89,27 +91,56 @@ def get_update_author_items(self, entry: dict) -> dict | None:
8991
9092 return {"Author" : new_author }
9193
def build_bulk_upload_lookup(self, entries: Iterable[dict]) -> dict[str, dict]:
    """
    Build a lookup of the most recent completed bulk upload report per NHS number.

    Queries the BulkUploadReport table through its NhsNumberIndex GSI once per
    unique NHS number found in *entries*, instead of scanning the whole table.

    Args:
        entries: Migration entries; each may carry an "NhsNumber" key.

    Returns:
        Mapping of NHS number -> the report row with UploadStatus == "complete"
        and the highest Timestamp. NHS numbers with no completed upload are
        absent from the result.
    """
    self.logger.info("Building bulk upload lookup from BulkUploadReport table...")

    # Deduplicate up front so each NHS number is queried at most once.
    unique_nhs_numbers = {nhs for entry in entries if (nhs := entry.get("NhsNumber"))}

    self.logger.info(f"Found {len(unique_nhs_numbers)} unique NHS numbers to query")

    if not unique_nhs_numbers:
        self.logger.warning("No NHS numbers found in entries")
        return {}

    lookup: dict[str, dict] = {}

    # Query BulkUploadReport table for each NHS number using the GSI.
    for nhs_number in unique_nhs_numbers:
        # Only the query itself sits inside the try: a bug in the reduction
        # logic below must surface, not be mis-reported as a query failure.
        # Broad Exception is deliberate best-effort — one bad query must not
        # abort the whole migration.
        try:
            items = self.dynamo_service.query_table(
                table_name=self.bulk_upload_report_table,
                search_key="NhsNumber",
                search_condition=nhs_number,
                index_name="NhsNumberIndex",
            )
        except Exception as e:
            self.logger.warning(
                f"Failed to query bulk uploads for NHS number {nhs_number}: {str(e)}"
            )
            continue

        # Keep the most recent completed upload for this NHS number.
        for row in items:
            if row.get("UploadStatus") != "complete" or not row.get("Timestamp"):
                continue
            stored = lookup.get(nhs_number)
            if not stored or int(row["Timestamp"]) > int(stored.get("Timestamp", 0)):
                lookup[nhs_number] = row

    self.logger.info(f"Loaded {len(lookup)} completed bulk upload entries.")
    return lookup
0 commit comments