Skip to content

Commit 79458e8

Browse files
authored
[PRMP-570] Step Function Error Handling (#863)
1 parent 5cbcce6 commit 79458e8

11 files changed

+347
-69
lines changed

lambdas/handlers/migration_dynamodb_handler.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def extract_table_info(event):
4545

4646

4747
def validate_event_input(event):
48-
required_fields = ["segment", "totalSegments", "migrationScript"]
48+
required_fields = ["segment", "totalSegments", "migrationScript", "executionId"]
4949
for field in required_fields:
5050
if field not in event:
5151
raise ValueError(f"Missing required field: '{field}' in event")
@@ -67,18 +67,23 @@ def validate_event_input(event):
6767
if not migration_script:
6868
raise ValueError("'migrationScript' cannot be empty")
6969

70-
run_migration = bool(event.get("run_migration", False))
70+
run_migration = bool(event.get("runMigration", False))
7171

7272
table_name, environment, region = extract_table_info(event)
7373

74+
execution_id = event.get("executionId")
75+
if not execution_id:
76+
raise ValueError("'executionId' cannot be empty")
77+
7478
return (
7579
segment,
7680
total_segments,
7781
table_name,
7882
environment,
7983
region,
80-
run_migration,
84+
run_migration,
8185
migration_script,
86+
execution_id
8287
)
8388

8489

@@ -87,11 +92,12 @@ def lambda_handler(event, context):
8792
(
8893
segment,
8994
total_segments,
90-
table_name,
95+
table_name,
9196
environment,
9297
region,
9398
run_migration,
94-
migration_script,
99+
migration_script,
100+
execution_id
95101
) = validate_event_input(event)
96102

97103
logger.info(
@@ -105,13 +111,14 @@ def lambda_handler(event, context):
105111
environment=environment,
106112
run_migration=run_migration,
107113
migration_script=migration_script,
114+
execution_id=execution_id
108115
)
109116

110117
result = service.execute_migration()
111118
logger.info(
112119
f"Migration completed: status={result.get('status')}, "
113-
f"scanned={result.get('scannedCount')}, updated={result.get('updatedCount')}, "
114-
f"errors={result.get('errorCount')}"
120+
f"scanned={result.get('scannedCount')}, processed={result.get('processedCount')}, "
121+
f"errors={result.get('errorCount')}, skipped={result.get('skippedCount')}"
115122
)
116123
return result
117124

lambdas/scripts/MigrationBase.py

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
from abc import ABC, abstractmethod
22
from typing import Iterable, Callable
3+
import boto3
4+
import json
5+
import os
36

47
from services.base.dynamo_service import DynamoDBService
58
from utils.audit_logging_setup import LoggingService
9+
from utils.exceptions import MigrationUnrecoverableException
610

711

812
class MigrationBase(ABC):
@@ -15,6 +19,10 @@ def __init__(self, environment: str, table_name: str, run_migration: bool = Fals
1519
self.table_name = table_name
1620
self.run_migration = run_migration
1721
self.target_table = f"{self.table_name}"
22+
self.s3_client = boto3.client("s3")
23+
self.failed_items_bucket = os.environ.get("MIGRATION_FAILED_ITEMS_STORE_BUCKET_NAME")
24+
if not self.failed_items_bucket:
25+
raise ValueError("MIGRATION_FAILED_ITEMS_STORE_BUCKET_NAME environment variable is required")
1826

1927
self.logger = LoggingService(self.__class__.__name__)
2028
self.dynamo_service = DynamoDBService()
@@ -28,36 +36,79 @@ def process_entries(
2836
label: str,
2937
entries: Iterable[dict],
3038
update_fn: Callable[[dict], dict | None],
39+
segment: int,
40+
execution_id: str
3141
):
3242
"""
3343
Processes a batch of DynamoDB entries for a given migration step.
34-
35-
:param label: Descriptive name of the step (for logging)
36-
:param entries: List of DynamoDB items to process
37-
:param update_fn: Function that returns a dict of updated fields or None
3844
"""
3945
self.logger.info(f"Running '{label}' migration step on {len(entries := list(entries))} items")
46+
successful_item_runs = 0
47+
skipped_items = 0
48+
failed_items = []
4049

4150
for index, entry in enumerate(entries, start=1):
42-
item_id = entry.get("ID")
43-
self.logger.info(f"[{label}] Processing item {index} (ID: {item_id})")
51+
result = self._process_single_entry(label, entry, update_fn, segment)
52+
if result == "skipped":
53+
skipped_items += 1
54+
elif result == "success":
55+
successful_item_runs += 1
56+
elif isinstance(result, dict): # failed item
57+
failed_items.append(result)
58+
59+
if failed_items:
60+
self._handle_failed_items(label, segment, execution_id, failed_items)
61+
62+
self.logger.info(f"'{label}' migration step completed.\n")
63+
return {
64+
"successful_item_runs": successful_item_runs,
65+
"failed_items_count": len(failed_items),
66+
"skipped_items_count": skipped_items
67+
}
4468

69+
def _process_single_entry(self, label, entry, update_fn, segment):
70+
item_id = entry.get("ID")
71+
self.logger.info(f"[{label}] Processing item (ID: {item_id})")
72+
try:
4573
updated_fields = update_fn(entry)
4674
if not updated_fields:
47-
self.logger.debug(f"[{label}] Item {item_id} does not require update, skipping.")
48-
continue
75+
self.logger.info(f"[{label}] Item {item_id} does not require update, skipping.")
76+
return "skipped"
77+
78+
self.logger.info(f"[{label}] Item {item_id} update fields: {updated_fields}")
4979

5080
if self.run_migration:
51-
try:
52-
self.dynamo_service.update_item(
53-
table_name=self.target_table,
54-
key_pair={"ID": item_id},
55-
updated_fields=updated_fields,
56-
)
57-
self.logger.info(f"[{label}] Updated item {item_id}: {updated_fields}")
58-
except Exception as e:
59-
self.logger.error(f"[{label}] Failed to update item {item_id}: {str(e)}")
81+
self.dynamo_service.update_item(
82+
table_name=self.target_table,
83+
key_pair={"ID": item_id},
84+
updated_fields=updated_fields,
85+
)
86+
self.logger.info(f"[{label}] Updated item {item_id}: {updated_fields}")
87+
return "success"
6088
else:
6189
self.logger.info(f"[Dry Run] Would update item {item_id} with {updated_fields}")
90+
return "success"
6291

63-
self.logger.info(f"'{label}' migration step completed.\n")
92+
except MigrationUnrecoverableException as e:
93+
self.logger.error(f"[{label}] Unrecoverable error for item {item_id} - segment {segment}: {e.message}")
94+
return {"item_id": item_id, "error": e.message}
95+
96+
except Exception as e:
97+
self.logger.error(f"[{label}] Error processing item {item_id}: {str(e)}")
98+
raise
99+
100+
def _handle_failed_items(self, label, segment, execution_id, failed_items):
101+
self.logger.error(f"'{label}' migration segment: {segment} completed with {len(failed_items)} errors.")
102+
error_report_key = f"{execution_id}/{segment}/{label}_errors.json"
103+
try:
104+
self.s3_client.put_object(
105+
Bucket=self.failed_items_bucket,
106+
Key=error_report_key,
107+
Body=json.dumps(failed_items).encode("utf-8"),
108+
ContentType='application/json'
109+
)
110+
self.logger.error(f"Error report saved to s3://{self.failed_items_bucket}/{error_report_key}")
111+
except Exception as s3_error:
112+
self.logger.error(f"Failed to save error report to S3: {str(s3_error)}")
113+
self.logger.error({"Unlogged failed items": failed_items})
114+
raise

lambdas/scripts/dynamodb_migration_prmp_179.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Callable, Iterable
22

3+
from utils.exceptions import MigrationUnrecoverableException, MigrationRetryableException
34
from scripts.MigrationBase import MigrationBase
45
from services.base.dynamo_service import DynamoDBService
56
from services.base.s3_service import S3Service
@@ -41,8 +42,10 @@ def main(
4142
self.logger.info(f"Dry run mode: {not self.run_migration}")
4243

4344
if entries is None:
44-
self.logger.error("No entries provided after scanning entire table.")
45-
raise ValueError("Entries must be provided to main().")
45+
self.logger.error("No entry provided.")
46+
raise MigrationRetryableException(
47+
message="Entries missing for segment worker", segment_id=None
48+
)
4649

4750
return [("S3Metadata", self.get_updated_items)]
4851

@@ -51,6 +54,7 @@ def get_updated_items(self, entry: dict) -> dict | None:
5154
Aggregates updates from all S3 update methods for a single entry.
5255
Returns a dict of fields to update, or None if no update is needed.
5356
"""
57+
5458
update_items = {}
5559

5660
if s3_metadata_items := self.get_update_s3_metadata_items(entry):
@@ -66,18 +70,18 @@ def get_update_s3_metadata_items(self, entry: dict) -> dict | None:
6670
file_location = entry.get("FileLocation")
6771
if not file_location:
6872
self.logger.warning(f"Missing FileLocation for entry: {entry.get('ID')}")
69-
return None
73+
raise MigrationUnrecoverableException(message="Missing FileLocation", item_id=entry.get("ID"))
7074

7175
parsed = self.parse_s3_path(file_location)
7276
if not parsed:
7377
self.logger.warning(f"Invalid S3 path format: {file_location}")
74-
return None
78+
raise MigrationUnrecoverableException(message="Invalid S3 path format", item_id=entry.get("ID"))
7579

7680
s3_bucket, s3_key = parsed
7781
metadata = self.get_s3_metadata(s3_bucket, s3_key)
7882
if not metadata:
7983
self.logger.warning(f"Could not retrieve S3 metadata for key: {s3_key}")
80-
return None
84+
raise MigrationUnrecoverableException(message="Could not retrieve S3 metadata", item_id=entry.get("ID"))
8185

8286
content_length, version_id = metadata
8387
updated_fields = {}
@@ -113,4 +117,5 @@ def get_s3_metadata(self, bucket: str, key: str) -> tuple[int, str] | None:
113117
return s3_head.get("ContentLength"), s3_head.get("VersionId")
114118
except Exception as e:
115119
self.logger.error(f"Failed to retrieve S3 metadata for {key}: {e}")
120+
raise MigrationUnrecoverableException(message="Failed to retrieve S3 metadata", item_id=key)
116121
return None

lambdas/scripts/dynamodb_migration_prmp_198.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from scripts.MigrationBase import MigrationBase
55
from services.base.dynamo_service import DynamoDBService
66
from utils.audit_logging_setup import LoggingService
7+
from utils.exceptions import MigrationUnrecoverableException, MigrationRetryableException
78

89

910
class AuthorMigration(MigrationBase):
@@ -41,7 +42,9 @@ def main(
4142

4243
if entries is None:
4344
self.logger.error("No entries provided — expected a list of table items.")
44-
raise ValueError("Entries must be provided to main().")
45+
raise MigrationRetryableException(
46+
message="Entries missing for segment worker", segment_id=None
47+
)
4548

4649
return [("Author", self.get_update_author_items)]
4750

@@ -71,14 +74,18 @@ def get_update_author_items(self, entry: dict) -> dict | None:
7174
self.logger.warning(
7275
f"No completed bulk upload found for NHS number: {nhs_number}"
7376
)
74-
return None
77+
raise MigrationUnrecoverableException(
78+
message=f"No completed bulk upload found for NHS number: {nhs_number}", item_id=entry.get("ID")
79+
)
7580

7681
new_author = bulk_upload_row.get("UploaderOdsCode")
7782
if not new_author:
7883
self.logger.warning(
7984
f"No uploader ODS code found for NHS number: {nhs_number}"
8085
)
81-
return None
86+
raise MigrationUnrecoverableException(
87+
message=f"No uploader ODS code found for NHS number: {nhs_number}", item_id=entry.get("ID")
88+
)
8289

8390
return {"Author": new_author}
8491

lambdas/scripts/dynamodb_migration_prmp_202.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from scripts.MigrationBase import MigrationBase
66
from services.base.dynamo_service import DynamoDBService
77
from utils.audit_logging_setup import LoggingService
8+
from utils.exceptions import MigrationUnrecoverableException, MigrationRetryableException
89

910

1011
class VersionMigration(MigrationBase):
@@ -42,7 +43,9 @@ def main(
4243

4344
if entries is None:
4445
self.logger.error("No entries provided after scanning entire table.")
45-
raise ValueError("Entries must be provided to main().")
46+
raise MigrationRetryableException(
47+
message="Entries missing for segment worker", segment_id=None
48+
)
4649

4750
return [("LGTableValues", self.get_updated_items)]
4851

@@ -84,7 +87,9 @@ def get_update_custodian_items(self, entry: dict) -> dict | None:
8487
self.logger.warning(
8588
f"[Custodian] CurrentGpOds is missing for item {entry.get('ID')}"
8689
)
87-
return None
90+
raise MigrationUnrecoverableException(
91+
message="CurrentGpOds is missing", item_id=entry.get("ID")
92+
)
8893
if current_gp_ods is None or current_gp_ods != custodian:
8994
return {"Custodian": current_gp_ods}
9095

@@ -138,20 +143,23 @@ def get_update_doc_status_items(self, entry: dict) -> dict | None:
138143

139144
inferred_status = document.infer_doc_status()
140145

141-
if entry.get("uploaded") and entry.get("uploading"):
146+
if entry.get("Uploaded") and entry.get("Uploading"):
142147
self.logger.warning(
143148
f"{entry.get('ID')}: Document has a status of uploading and uploaded."
144149
)
150+
raise MigrationUnrecoverableException(
151+
message="Document has a status of uploading and uploaded", item_id=entry.get("ID")
152+
)
145153

146154
if entry.get("DocStatus", "") == inferred_status:
147155
return None
148156

149-
self.logger.warning(f"{entry.get('ID')}: {inferred_status}")
150-
151157
if inferred_status:
152158
return {"DocStatus": inferred_status}
153159

154160
self.logger.warning(
155161
f"[DocStatus] Cannot determine status for item {entry.get('ID')}"
156162
)
157-
return None
163+
raise MigrationUnrecoverableException(
164+
message="Cannot determine document status", item_id=entry.get("ID")
165+
)

0 commit comments

Comments
 (0)