Skip to content

Commit 4878785

Browse files
committed
log_firehose
1 parent cb27850 commit 4878785

File tree

5 files changed

+44
-164
lines changed

5 files changed

+44
-164
lines changed

lambdas/delta_backend/src/delta.py

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import decimal
22
import json
3-
import logging
43
import os
54
import time
65
from datetime import UTC, datetime, timedelta
@@ -9,19 +8,16 @@
98
from boto3.dynamodb.conditions import Attr
109
from botocore.exceptions import ClientError
1110

11+
from common.clients import STREAM_NAME, logger
12+
from common.log_firehose import send_log_to_firehose
1213
from common.mappings import ActionFlag, EventName, Operation
1314
from converter import Converter
14-
from log_firehose import FirehoseLogger
1515

1616
failure_queue_url = os.environ["AWS_SQS_QUEUE_URL"]
1717
delta_table_name = os.environ["DELTA_TABLE_NAME"]
1818
delta_source = os.environ["SOURCE"]
1919
delta_ttl_days = os.environ["DELTA_TTL_DAYS"]
2020
region_name = "eu-west-2"
21-
logging.basicConfig()
22-
logger = logging.getLogger()
23-
logger.setLevel("INFO")
24-
firehose_logger = FirehoseLogger()
2521

2622
delta_table = None
2723

@@ -87,14 +83,6 @@ def get_creation_and_expiry_times(creation_timestamp: float) -> (str, int):
8783
return creation_datetime.isoformat(), expiry_timestamp
8884

8985

90-
def send_firehose(log_data):
91-
try:
92-
firehose_log = {"event": log_data}
93-
firehose_logger.send_log(firehose_log)
94-
except Exception:
95-
logger.exception("Error sending log to Firehose")
96-
97-
9886
def handle_dynamodb_response(response, error_records):
9987
match response:
10088
case {"ResponseMetadata": {"HTTPStatusCode": 200}} if error_records:
@@ -248,14 +236,13 @@ def handler(event, _context):
248236
success, operation_outcome = process_record(record)
249237
overall_success = overall_success and success
250238
end = time.time()
251-
send_firehose(
252-
{
253-
"function_name": "delta_sync",
254-
"operation_outcome": operation_outcome,
255-
"date_time": datetime_str,
256-
"time_taken": f"{round(end - start, 5)}s",
257-
}
258-
)
239+
log_data = {
240+
"function_name": "delta_sync",
241+
"operation_outcome": operation_outcome,
242+
"date_time": datetime_str,
243+
"time_taken": f"{round(end - start, 5)}s",
244+
}
245+
send_log_to_firehose(STREAM_NAME, log_data)
259246
except Exception:
260247
overall_success = False
261248
operation_outcome = {
@@ -265,7 +252,8 @@ def handler(event, _context):
265252
}
266253
logger.exception(operation_outcome["diagnostics"])
267254
send_message(event) # Send failed records to DLQ
268-
send_firehose({"function_name": "delta_sync", "operation_outcome": operation_outcome})
255+
log_data = {"function_name": "delta_sync", "operation_outcome": operation_outcome}
256+
send_log_to_firehose(STREAM_NAME, log_data)
269257

270258
if not overall_success:
271259
send_message(event)

lambdas/delta_backend/src/log_firehose.py

Lines changed: 0 additions & 36 deletions
This file was deleted.

lambdas/delta_backend/tests/test_convert.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ def setUp(self):
7575
self.logger_exception_patcher = patch("logging.Logger.exception")
7676
self.mock_logger_exception = self.logger_exception_patcher.start()
7777

78-
self.firehose_logger_patcher = patch("delta.firehose_logger")
79-
self.mock_firehose_logger = self.firehose_logger_patcher.start()
78+
self.send_log_to_firehose_patcher = patch("delta.send_log_to_firehose")
79+
self.mock_send_log_to_firehose = self.send_log_to_firehose_patcher.start()
8080

8181
def tearDown(self):
8282
self.logger_exception_patcher.stop()
8383
self.logger_info_patcher.stop()
84-
self.mock_firehose_logger.stop()
84+
self.mock_send_log_to_firehose.stop()
8585

8686
self.mock.stop()
8787

lambdas/delta_backend/tests/test_delta.py

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ def setUp(self):
4242
self.logger_error_patcher = patch("logging.Logger.error")
4343
self.mock_logger_error = self.logger_error_patcher.start()
4444

45-
self.firehose_logger_patcher = patch("delta.firehose_logger")
46-
self.mock_firehose_logger = self.firehose_logger_patcher.start()
45+
self.send_log_to_firehose_patcher = patch("delta.send_log_to_firehose")
46+
self.mock_send_log_to_firehose = self.send_log_to_firehose_patcher.start()
4747

4848
self.sqs_client_patcher = patch("delta.sqs_client")
4949
self.mock_sqs_client = self.sqs_client_patcher.start()
@@ -56,7 +56,7 @@ def tearDown(self):
5656
self.logger_warning_patcher.stop()
5757
self.logger_error_patcher.stop()
5858
self.logger_info_patcher.stop()
59-
self.mock_firehose_logger.stop()
59+
self.mock_send_log_to_firehose.stop()
6060
self.sqs_client_patcher.stop()
6161
self.delta_table_patcher.stop()
6262

@@ -105,7 +105,7 @@ def test_handler_success_insert(self):
105105
# Assert
106106
self.assertTrue(result)
107107
self.mock_delta_table.put_item.assert_called()
108-
self.mock_firehose_logger.send_log.assert_called() # check logged
108+
self.mock_send_log_to_firehose.assert_called() # check logged
109109
put_item_call_args = self.mock_delta_table.put_item.call_args # check data written to DynamoDB
110110
put_item_data = put_item_call_args.kwargs["Item"]
111111
self.assertIn("Imms", put_item_data)
@@ -149,7 +149,7 @@ def test_handler_success_update(self):
149149
# Assert
150150
self.assertTrue(result)
151151
self.mock_delta_table.put_item.assert_called()
152-
self.mock_firehose_logger.send_log.assert_called() # check logged
152+
self.mock_send_log_to_firehose.assert_called() # check logged
153153
put_item_call_args = self.mock_delta_table.put_item.call_args # check data written to DynamoDB
154154
put_item_data = put_item_call_args.kwargs["Item"]
155155
self.assertIn("Imms", put_item_data)
@@ -174,7 +174,7 @@ def test_handler_success_delete_physical(self):
174174
# Assert
175175
self.assertTrue(result)
176176
self.mock_delta_table.put_item.assert_called()
177-
self.mock_firehose_logger.send_log.assert_called() # check logged
177+
self.mock_send_log_to_firehose.assert_called() # check logged
178178
put_item_call_args = self.mock_delta_table.put_item.call_args # check data written to DynamoDB
179179
put_item_data = put_item_call_args.kwargs["Item"]
180180
self.assertIn("Imms", put_item_data)
@@ -198,7 +198,7 @@ def test_handler_success_delete_logical(self):
198198
# Assert
199199
self.assertTrue(result)
200200
self.mock_delta_table.put_item.assert_called()
201-
self.mock_firehose_logger.send_log.assert_called() # check logged
201+
self.mock_send_log_to_firehose.assert_called() # check logged
202202
put_item_call_args = self.mock_delta_table.put_item.call_args # check data written to DynamoDB
203203
put_item_data = put_item_call_args.kwargs["Item"]
204204
self.assertIn("Imms", put_item_data)
@@ -217,7 +217,7 @@ def test_dps_record_skipped(self, mock_logger_info):
217217

218218
# Check logging and Firehose were called
219219
mock_logger_info.assert_called_with("Record from DPS skipped")
220-
self.mock_firehose_logger.send_log.assert_called()
220+
self.mock_send_log_to_firehose.assert_called()
221221
self.mock_sqs_client.send_message.assert_not_called()
222222

223223
@patch("delta.Converter")
@@ -237,15 +237,15 @@ def test_partial_success_with_errors(self, mock_converter):
237237
self.assertTrue(response)
238238
# Check logging and Firehose were called
239239
self.mock_logger_info.assert_called()
240-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, 1)
241-
self.mock_firehose_logger.send_log.assert_called_once()
240+
self.assertEqual(self.mock_send_log_to_firehose.call_count, 1)
241+
self.mock_send_log_to_firehose.assert_called_once()
242242

243-
# Get the actual argument passed to send_log
244-
args, kwargs = self.mock_firehose_logger.send_log.call_args
245-
sent_payload = args[0] # First positional arg
243+
# Get the actual argument passed to send_log_to_firehose
244+
args, kwargs = self.mock_send_log_to_firehose.call_args
245+
sent_payload = args[1] # Second positional arg
246246

247247
# Navigate to the specific message
248-
status_desc = sent_payload["event"]["operation_outcome"]["statusDesc"]
248+
status_desc = sent_payload["operation_outcome"]["statusDesc"]
249249

250250
# Assert the expected message is present
251251
self.assertIn(
@@ -275,7 +275,7 @@ def test_send_message_multi_records_diverse(self):
275275
# Assert
276276
self.assertTrue(result)
277277
self.assertEqual(self.mock_delta_table.put_item.call_count, len(records_config))
278-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config))
278+
self.assertEqual(self.mock_send_log_to_firehose.call_count, len(records_config))
279279

280280
def test_send_message_skipped_records_diverse(self):
281281
"""Check skipped records sent to firehose but not to DynamoDB"""
@@ -301,7 +301,7 @@ def test_send_message_skipped_records_diverse(self):
301301
# Assert
302302
self.assertTrue(result)
303303
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
304-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config))
304+
self.assertEqual(self.mock_send_log_to_firehose.call_count, len(records_config))
305305

306306
def test_send_message_multi_create(self):
307307
# Arrange
@@ -319,7 +319,7 @@ def test_send_message_multi_create(self):
319319
# Assert
320320
self.assertTrue(result)
321321
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
322-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3)
322+
self.assertEqual(self.mock_send_log_to_firehose.call_count, 3)
323323

324324
def test_send_message_multi_update(self):
325325
# Arrange
@@ -337,7 +337,7 @@ def test_send_message_multi_update(self):
337337
# Assert
338338
self.assertTrue(result)
339339
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
340-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3)
340+
self.assertEqual(self.mock_send_log_to_firehose.call_count, 3)
341341

342342
def test_send_message_multi_logical_delete(self):
343343
# Arrange
@@ -371,7 +371,7 @@ def test_send_message_multi_logical_delete(self):
371371
# Assert
372372
self.assertTrue(result)
373373
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
374-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3)
374+
self.assertEqual(self.mock_send_log_to_firehose.call_count, 3)
375375

376376
def test_send_message_multi_physical_delete(self):
377377
# Arrange
@@ -389,7 +389,7 @@ def test_send_message_multi_physical_delete(self):
389389
# Assert
390390
self.assertTrue(result)
391391
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
392-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3)
392+
self.assertEqual(self.mock_send_log_to_firehose.call_count, 3)
393393

394394
def test_single_error_in_multi(self):
395395
# Arrange
@@ -412,7 +412,7 @@ def test_single_error_in_multi(self):
412412
# Assert
413413
self.assertFalse(result)
414414
self.assertEqual(self.mock_delta_table.put_item.call_count, 3)
415-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, 3)
415+
self.assertEqual(self.mock_send_log_to_firehose.call_count, 3)
416416
self.assertEqual(self.mock_logger_error.call_count, 1)
417417

418418
def test_single_exception_in_multi(self):
@@ -437,7 +437,7 @@ def test_single_exception_in_multi(self):
437437
# Assert
438438
self.assertFalse(result)
439439
self.assertEqual(self.mock_delta_table.put_item.call_count, len(records_config))
440-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config))
440+
self.assertEqual(self.mock_send_log_to_firehose.call_count, len(records_config))
441441

442442
def test_single_duplicate_in_multi(self):
443443
# Arrange
@@ -460,16 +460,16 @@ def test_single_duplicate_in_multi(self):
460460
# Assert
461461
self.assertTrue(result)
462462
self.assertEqual(self.mock_delta_table.put_item.call_count, len(records_config))
463-
self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config))
463+
self.assertEqual(self.mock_send_log_to_firehose.call_count, len(records_config))
464464

465465
@patch("delta.process_record")
466-
@patch("delta.send_firehose")
467-
def test_handler_calls_process_record_for_each_event(self, mock_send_firehose, mock_process_record):
466+
@patch("delta.send_log_to_firehose")
467+
def test_handler_calls_process_record_for_each_event(self, mock_send_log_to_firehose, mock_process_record):
468468
# Arrange
469469
event = {"Records": [{"a": "record1"}, {"a": "record2"}, {"a": "record3"}]}
470470
# Mock process_record to always return True
471471
mock_process_record.return_value = True, {}
472-
mock_send_firehose.return_value = None
472+
mock_send_log_to_firehose.return_value = None
473473

474474
# Act
475475
result = handler(event, {})
@@ -480,15 +480,15 @@ def test_handler_calls_process_record_for_each_event(self, mock_send_firehose, m
480480

481481
# TODO depedency injection needed here
482482
@patch("delta.process_record")
483-
@patch("delta.send_firehose")
484-
def test_handler_sends_all_to_firehose(self, mock_send_firehose, mock_process_record):
483+
@patch("delta.send_log_to_firehose")
484+
def test_handler_sends_all_to_firehose(self, mock_send_log_to_firehose, mock_process_record):
485485
# Arrange
486486

487487
# event with 3 records
488488
event = {"Records": [{"a": "record1"}, {"a": "record2"}, {"a": "record3"}]}
489489
return_ok = (True, {})
490490
return_fail = (False, {})
491-
mock_send_firehose.return_value = None
491+
mock_send_log_to_firehose.return_value = None
492492
mock_process_record.side_effect = [return_ok, return_fail, return_ok]
493493

494494
# Act
@@ -498,7 +498,7 @@ def test_handler_sends_all_to_firehose(self, mock_send_firehose, mock_process_re
498498
self.assertFalse(result)
499499
self.assertEqual(mock_process_record.call_count, len(event["Records"]))
500500
# check that all records were sent to firehose
501-
self.assertEqual(mock_send_firehose.call_count, len(event["Records"]))
501+
self.assertEqual(mock_send_log_to_firehose.call_count, len(event["Records"]))
502502

503503

504504
class DeltaRecordProcessorTestCase(unittest.TestCase):

0 commit comments

Comments
 (0)