Skip to content

Commit 78dbbce

Browse files
authored
VED-1006 Minor improvement to delta error handling (#1099)
1 parent 1243403 commit 78dbbce

File tree

4 files changed

+52
-113
lines changed

4 files changed

+52
-113
lines changed

infrastructure/instance/delta.tf

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -93,9 +93,6 @@ data "aws_iam_policy_document" "delta_policy_document" {
9393
templatefile("${local.policy_path}/dynamo_key_access.json", {
9494
"dynamo_encryption_key" : data.aws_kms_key.existing_dynamo_encryption_key.arn
9595
}),
96-
templatefile("${local.policy_path}/aws_sns_topic.json", {
97-
"aws_sns_topic_name" : aws_sns_topic.delta_sns.name
98-
}),
9996
templatefile("${local.policy_path}/log_kinesis.json", {
10097
"kinesis_stream_name" : module.splunk.firehose_stream_name
10198
}),
@@ -159,7 +156,7 @@ resource "aws_lambda_event_source_mapping" "delta_trigger" {
159156
starting_position = "TRIM_HORIZON"
160157
destination_config {
161158
on_failure {
162-
destination_arn = aws_sns_topic.delta_sns.arn
159+
destination_arn = aws_sqs_queue.dlq.arn
163160
}
164161
}
165162
maximum_retry_attempts = 0
@@ -170,10 +167,6 @@ resource "aws_sqs_queue" "dlq" {
170167
name = "${local.short_prefix}-${local.dlq_name}"
171168
}
172169

173-
resource "aws_sns_topic" "delta_sns" {
174-
name = "${local.short_prefix}-${local.sns_name}"
175-
}
176-
177170
resource "aws_cloudwatch_log_group" "delta_lambda" {
178171
name = "/aws/lambda/${local.short_prefix}-${local.function_name}"
179172
retention_in_days = 30

infrastructure/instance/outputs.tf

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,7 @@ output "aws_sqs_queue_name" {
1414
value = aws_sqs_queue.dlq.name
1515
}
1616

17-
output "aws_sns_topic_name" {
18-
value = aws_sns_topic.delta_sns.name
19-
}
20-
2117
output "id_sync_queue_arn" {
22-
description = "The ARN of the created SQS queue"
18+
description = "The ARN of the ID Sync (MNS NHS Number change) SQS queue"
2319
value = aws_sqs_queue.id_sync_queue.arn
2420
}

lambdas/delta_backend/src/delta.py

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,9 @@ def get_delta_table():
3737
return delta_table
3838

3939

40-
def send_message(record, queue_url=failure_queue_url):
41-
# Create a message
42-
message_body = record
40+
def send_record_to_dlq(record: dict) -> None:
4341
try:
44-
# Send the record to the queue
45-
get_sqs_client().send_message(QueueUrl=queue_url, MessageBody=json.dumps(message_body))
42+
get_sqs_client().send_message(QueueUrl=failure_queue_url, MessageBody=json.dumps(record))
4643
logger.info("Record saved successfully to the DLQ")
4744
except Exception:
4845
logger.exception("Error sending record to DLQ")
@@ -207,35 +204,23 @@ def process_record(record):
207204
return False, {"statusCode": "500", "statusDesc": "Exception", "diagnostics": e}
208205

209206

210-
def handler(event, _context):
211-
overall_success = True
207+
def handler(event, _context) -> bool:
212208
logger.info("Starting Delta Handler")
213-
try:
214-
for record in event["Records"]:
215-
datetime_str = datetime.now().isoformat()
216-
start = time.time()
217-
success, operation_outcome = process_record(record)
218-
overall_success = overall_success and success
219-
end = time.time()
220-
log_data = {
221-
"function_name": "delta_sync",
222-
"operation_outcome": operation_outcome,
223-
"date_time": datetime_str,
224-
"time_taken": f"{round(end - start, 5)}s",
225-
}
226-
send_log_to_firehose(STREAM_NAME, log_data)
227-
except Exception:
228-
overall_success = False
229-
operation_outcome = {
230-
"statusCode": "500",
231-
"statusDesc": "Exception",
232-
"diagnostics": "Delta Lambda failure: Incorrect invocation of Lambda",
209+
210+
for record in event["Records"]:
211+
record_ingestion_datetime = datetime.now().isoformat()
212+
record_processing_start = time.time()
213+
success, operation_outcome = process_record(record)
214+
record_processing_end = time.time()
215+
log_data = {
216+
"function_name": "delta_sync",
217+
"operation_outcome": operation_outcome,
218+
"date_time": record_ingestion_datetime,
219+
"time_taken": f"{round(record_processing_end - record_processing_start, 5)}s",
233220
}
234-
logger.exception(operation_outcome["diagnostics"])
235-
log_data = {"function_name": "delta_sync", "operation_outcome": operation_outcome}
236221
send_log_to_firehose(STREAM_NAME, log_data)
237222

238-
if not overall_success:
239-
send_message(event)
223+
if not success:
224+
send_record_to_dlq(record)
240225

241-
return overall_success
226+
return True

lambdas/delta_backend/tests/test_delta.py

Lines changed: 33 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
from delta import (
1111
handler,
1212
process_record,
13-
send_message,
13+
send_record_to_dlq,
1414
)
1515
from mappings import ActionFlag, EventName, Operation
1616
from utils_for_converter_tests import RecordConfig, ValuesForTests
1717

18-
TEST_QUEUE_URL = "https://sqs.eu-west-2.amazonaws.com/123456789012/test-queue"
19-
os.environ["AWS_SQS_QUEUE_URL"] = TEST_QUEUE_URL
18+
TEST_DEAD_LETTER_QUEUE_URL = "https://sqs.eu-west-2.amazonaws.com/123456789012/test-queue"
19+
os.environ["AWS_SQS_QUEUE_URL"] = TEST_DEAD_LETTER_QUEUE_URL
2020
os.environ["DELTA_TABLE_NAME"] = "my_delta_table"
2121
os.environ["DELTA_TTL_DAYS"] = "14"
2222
os.environ["SOURCE"] = "my_source"
@@ -52,27 +52,23 @@ def setUp(self):
5252
self.mock_delta_table = self.delta_table_patcher.start()
5353

5454
def tearDown(self):
55-
self.logger_exception_patcher.stop()
56-
self.logger_warning_patcher.stop()
57-
self.logger_error_patcher.stop()
58-
self.logger_info_patcher.stop()
59-
self.mock_send_log_to_firehose.stop()
60-
self.sqs_client_patcher.stop()
61-
self.delta_table_patcher.stop()
55+
patch.stopall()
6256

63-
def test_send_message_success(self):
57+
def test_send_record_to_dlq_success(self):
6458
# Arrange
6559
self.mock_sqs_client.send_message.return_value = {"MessageId": "123"}
6660
record = {"key": "value"}
67-
sqs_queue_url = "test-queue-url"
6861

6962
# Act
70-
send_message(record, sqs_queue_url)
63+
send_record_to_dlq(record)
7164

7265
# Assert
73-
self.mock_sqs_client.send_message.assert_called_once_with(QueueUrl=sqs_queue_url, MessageBody=json.dumps(record))
66+
self.mock_sqs_client.send_message.assert_called_once_with(
67+
QueueUrl=TEST_DEAD_LETTER_QUEUE_URL, MessageBody=json.dumps(record)
68+
)
69+
self.mock_logger_info.assert_called_with("Record saved successfully to the DLQ")
7470

75-
def test_send_message_client_error(self):
71+
def test_send_record_to_dlq_client_error(self):
7672
# Arrange
7773
record = {"key": "value"}
7874

@@ -81,7 +77,7 @@ def test_send_message_client_error(self):
8177
self.mock_sqs_client.send_message.side_effect = ClientError(error_response, "SendMessage")
8278

8379
# Act
84-
send_message(record, "test-queue-url")
80+
send_record_to_dlq(record)
8581

8682
# Assert
8783
self.mock_logger_exception.assert_called_once_with("Error sending record to DLQ")
@@ -116,33 +112,35 @@ def test_handler_success_insert(self):
116112

117113
def test_handler_exception(self):
118114
"""Ensure that sqs_client exceptions do not cause the lambda handler itself to raise an exception"""
119-
120115
# Arrange
121116
self.mock_sqs_client.send_message.side_effect = Exception("SQS error")
122-
event = {"invalid_format": True}
117+
self.mock_delta_table.put_item.return_value = FAIL_RESPONSE
118+
event = ValuesForTests.get_event()
123119

124120
# Act
125121
result = handler(event, None)
126122

127123
# Assert
128-
self.assertFalse(result)
124+
self.assertTrue(result)
129125
self.mock_logger_exception.assert_has_calls(
130126
[
131-
call("Delta Lambda failure: Incorrect invocation of Lambda"),
132127
call("Error sending record to DLQ"),
133128
]
134129
)
135130

136-
def test_handler_overall_failure(self):
131+
def test_handler_raises_exception_if_called_with_unexpected_event(self):
132+
"""Tests that when the Lambda is invoked with an unexpected event format i.e. no "Records" key, then an
133+
exception will be raised. The DDB Stream configuration will then ensure that the event is forwarded to the DLQ.
134+
Note: this would only ever happen if we misconfigured the Lambda or tested manually with a bad event."""
137135
# Arrange
138136
event = {"invalid_format": True}
139137

140138
# Act
141-
result = handler(event, None)
139+
with self.assertRaises(KeyError):
140+
handler(event, None)
142141

143142
# Assert
144-
self.assertFalse(result)
145-
self.mock_sqs_client.send_message.assert_called_with(QueueUrl=TEST_QUEUE_URL, MessageBody=json.dumps(event))
143+
self.mock_sqs_client.send_message.assert_not_called()
146144

147145
def test_handler_processing_failure(self):
148146
# Arrange
@@ -153,8 +151,10 @@ def test_handler_processing_failure(self):
153151
result = handler(event, None)
154152

155153
# Assert
156-
self.assertFalse(result)
157-
self.mock_sqs_client.send_message.assert_called_with(QueueUrl=TEST_QUEUE_URL, MessageBody=json.dumps(event))
154+
self.assertTrue(result)
155+
self.mock_sqs_client.send_message.assert_called_with(
156+
QueueUrl=TEST_DEAD_LETTER_QUEUE_URL, MessageBody=json.dumps(event["Records"][0])
157+
)
158158

159159
def test_handler_success_update(self):
160160
# Arrange
@@ -429,10 +429,11 @@ def test_single_error_in_multi(self):
429429
result = handler(event, None)
430430

431431
# Assert
432-
self.assertFalse(result)
432+
self.assertTrue(result)
433433
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
434434
self.assertEqual(self.mock_send_log_to_firehose.call_count, 3)
435435
self.assertEqual(self.mock_logger_error.call_count, 1)
436+
self.assertEqual(self.mock_sqs_client.send_message.call_count, 1)
436437

437438
def test_single_exception_in_multi(self):
438439
# Arrange
@@ -454,7 +455,8 @@ def test_single_exception_in_multi(self):
454455
result = handler(event, None)
455456

456457
# Assert
457-
self.assertFalse(result)
458+
self.assertTrue(result)
459+
self.assertEqual(self.mock_sqs_client.send_message.call_count, 1)
458460
self.assertEqual(self.mock_delta_table.put_item.call_count, len(records_config))
459461
self.assertEqual(self.mock_send_log_to_firehose.call_count, len(records_config))
460462

@@ -497,12 +499,9 @@ def test_handler_calls_process_record_for_each_event(self, mock_send_log_to_fire
497499
self.assertTrue(result)
498500
self.assertEqual(mock_process_record.call_count, len(event["Records"]))
499501

500-
# TODO depedency injection needed here
501502
@patch("delta.process_record")
502503
@patch("delta.send_log_to_firehose")
503504
def test_handler_sends_all_to_firehose(self, mock_send_log_to_firehose, mock_process_record):
504-
# Arrange
505-
506505
# event with 3 records
507506
event = {"Records": [{"a": "record1"}, {"a": "record2"}, {"a": "record3"}]}
508507
return_ok = (True, {})
@@ -514,10 +513,12 @@ def test_handler_sends_all_to_firehose(self, mock_send_log_to_firehose, mock_pro
514513
result = handler(event, {})
515514

516515
# Assert
517-
self.assertFalse(result)
516+
self.assertTrue(result)
518517
self.assertEqual(mock_process_record.call_count, len(event["Records"]))
519518
# check that all records were sent to firehose
520519
self.assertEqual(mock_send_log_to_firehose.call_count, len(event["Records"]))
520+
# Only send the failed record to SQS DLQ
521+
self.assertEqual(self.mock_sqs_client.send_message.call_count, 1)
521522

522523

523524
class DeltaRecordProcessorTestCase(unittest.TestCase):
@@ -677,39 +678,3 @@ def test_returns_none_on_exception(self, mock_get_dynamodb_table):
677678
table = delta.get_delta_table()
678679
self.assertIsNone(table)
679680
self.mock_logger_error.assert_called()
680-
681-
682-
class TestSendMessage(unittest.TestCase):
683-
def setUp(self):
684-
self.get_sqs_client_patcher = patch("delta.get_sqs_client")
685-
self.mock_get_sqs_client = self.get_sqs_client_patcher.start()
686-
self.mock_sqs_client = MagicMock()
687-
self.mock_get_sqs_client.return_value = self.mock_sqs_client
688-
689-
self.logger_info_patcher = patch("logging.Logger.info")
690-
self.mock_logger_info = self.logger_info_patcher.start()
691-
self.logger_error_patcher = patch("logging.Logger.error")
692-
self.mock_logger_error = self.logger_error_patcher.start()
693-
694-
def tearDown(self):
695-
self.get_sqs_client_patcher.stop()
696-
self.logger_info_patcher.stop()
697-
self.logger_error_patcher.stop()
698-
699-
def test_send_message_success(self):
700-
record = {"a": "bbb"}
701-
self.mock_sqs_client.send_message.return_value = {"MessageId": "123"}
702-
703-
delta.send_message(record)
704-
705-
self.mock_sqs_client.send_message.assert_called_once()
706-
self.mock_logger_info.assert_any_call("Record saved successfully to the DLQ")
707-
self.mock_logger_error.assert_not_called()
708-
709-
def test_send_message_client_error(self):
710-
record = {"a": "bbb"}
711-
self.mock_sqs_client.send_message.side_effect = Exception("SQS error")
712-
713-
delta.send_message(record, "test-queue-url")
714-
715-
self.mock_logger_error.assert_called()

0 commit comments

Comments
 (0)