
Commit 9fe60d3

VED 369 Redis Sync firehose logging (#610)
* Decorator tests
* exclusion
* handler decorator tests
1 parent 0542995 commit 9fe60d3

File tree

8 files changed: +383 −82 lines changed


redis_sync/src/log_decorator.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
"""This module contains the logging decorator for sending the appropriate logs to Cloudwatch and Firehose.

The decorator log pattern is shared by the filenameprocessor, recordprocessor, ack_backend and redis_sync modules,
and therefore could be moved to a common module in the future.
TODO: refactor to a common module.
TODO: Duplication check has been suppressed in sonar-project.properties. Remove once refactored.
"""

import json
import time
from datetime import datetime
from functools import wraps
from clients import firehose_client, logger, STREAM_NAME


def send_log_to_firehose(log_data: dict) -> None:
    """Sends the log_data to Firehose"""
    try:
        record = {"Data": json.dumps({"event": log_data}).encode("utf-8")}
        response = firehose_client.put_record(DeliveryStreamName=STREAM_NAME, Record=record)
        logger.info("Log sent to Firehose: %s", response)
    except Exception as error:  # pylint: disable=broad-exception-caught
        logger.exception("Error sending log to Firehose: %s", error)


def generate_and_send_logs(
    start_time, base_log_data: dict, additional_log_data: dict, is_error_log: bool = False
) -> None:
    """Generates log data which includes the base_log_data, additional_log_data, and time taken (calculated from the
    given start_time and the current time) and sends it to Cloudwatch and Firehose."""
    log_data = {**base_log_data, "time_taken": f"{round(time.time() - start_time, 5)}s", **additional_log_data}
    log_function = logger.error if is_error_log else logger.info
    log_function(json.dumps(log_data))
    send_log_to_firehose(log_data)


def logging_decorator(prefix="redis_sync"):
    """Decorator which logs the wrapped function's returned dict (on success) or a statusCode-500 error entry
    (on exception) to Cloudwatch and Firehose, re-raising any exception."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            base_log_data = {
                "function_name": f"{prefix}_{func.__name__}",
                "date_time": str(datetime.now()),
            }
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                generate_and_send_logs(start_time, base_log_data, additional_log_data=result)
                return result
            except Exception as e:  # pylint: disable=broad-exception-caught
                additional_log_data = {"statusCode": 500, "error": str(e)}
                generate_and_send_logs(start_time, base_log_data, additional_log_data, is_error_log=True)
                raise

        return wrapper

    return decorator
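
The decorator's contract is worth spelling out: the wrapped function must return a dict, because that return value is merged straight into the log entry as additional_log_data. A minimal usage sketch; the function below is hypothetical, the real wiring is the @logging_decorator on handler in redis_sync.py further down:

    from log_decorator import logging_decorator

    @logging_decorator(prefix="redis_sync")
    def example_handler(event, _):  # hypothetical illustration, not part of the commit
        # The returned dict is merged into the Cloudwatch/Firehose log entry.
        return {"status": "success", "message": "Successfully processed 1 records", "file_keys": ["example.csv"]}

    # A successful call emits a JSON log entry shaped roughly like:
    # {"function_name": "redis_sync_example_handler", "date_time": "...", "time_taken": "0.00123s",
    #  "status": "success", "message": "Successfully processed 1 records", "file_keys": ["example.csv"]}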

redis_sync/src/record_processor.py

Lines changed: 13 additions & 4 deletions
@@ -15,13 +15,22 @@ def process_record(record: S3EventRecord) -> dict:
         bucket_name = record.get_bucket_name()
         file_key = record.get_object_key()
 
+        base_log_data = {
+            "file_key": file_key
+        }
+
         try:
-            return RedisCacher.upload(bucket_name, file_key)
+            result = RedisCacher.upload(bucket_name, file_key)
+            result.update(base_log_data)
+            return result
 
         except Exception as error:  # pylint: disable=broad-except
             logger.exception("Error uploading to cache for filename '%s'", file_key)
-            return {"status": "error", "message": str(error)}
+            error_data = {"status": "error", "message": str(error)}
+            error_data.update(base_log_data)
+            return error_data
 
     except Exception:  # pylint: disable=broad-except
-        logger.exception("Error obtaining file_key")
-        return {"status": "error", "message": "Error obtaining file_key"}
+        msg = "Error processing record"
+        logger.exception(msg)
+        return {"status": "error", "message": msg}

redis_sync/src/redis_sync.py

Lines changed: 20 additions & 10 deletions
@@ -2,13 +2,15 @@
 from s3_event import S3Event
 from record_processor import process_record
 from event_read import read_event
+from log_decorator import logging_decorator
 from clients import redis_client
 '''
 Event Processor
 The Business Logic for the Redis Sync Lambda Function.
 This module processes S3 events and iterates through each record to process them individually.'''
 
 
+@logging_decorator(prefix="redis_sync")
 def handler(event, _):
 
     try:
@@ -19,17 +21,25 @@ def handler(event, _):
             logger.info("Processing S3 event with %d records", len(event.get('Records', [])))
             s3_event = S3Event(event)
             record_count = len(s3_event.get_s3_records())
-            error_count = 0
-            for record in s3_event.get_s3_records():
-                record_result = process_record(record)
-                if record_result["status"] == "error":
-                    error_count += 1
-            if error_count > 0:
-                logger.error("Processed %d records with %d errors", record_count, error_count)
-                return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors"}
+            if record_count == 0:
+                logger.info("No records found in event")
+                return {"status": "success", "message": "No records found in event"}
             else:
-                logger.info("Successfully processed all %d records", record_count)
-                return {"status": "success", "message": f"Successfully processed {record_count} records"}
+                error_count = 0
+                file_keys = []
+                for record in s3_event.get_s3_records():
+                    record_result = process_record(record)
+                    file_keys.append(record_result["file_key"])
+                    if record_result["status"] == "error":
+                        error_count += 1
+                if error_count > 0:
+                    logger.error("Processed %d records with %d errors", record_count, error_count)
+                    return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors",
+                            "file_keys": file_keys}
+                else:
+                    logger.info("Successfully processed all %d records", record_count)
+                    return {"status": "success", "message": f"Successfully processed {record_count} records",
+                            "file_keys": file_keys}
         else:
             logger.info("No records found in event")
             return {"status": "success", "message": "No records found in event"}
redis_sync/tests/test_clients.py

Lines changed: 1 addition & 5 deletions
@@ -35,11 +35,7 @@ def setUp(self):
         self.mock_boto3_client.return_value.send_message = {}
 
     def tearDown(self):
-        self.getenv_patch.stop()
-        self.boto3_client_patch.stop()
-        self.logging_patch.stop()
-        self.redis_patch.stop()
-        self.logger_info_patcher.stop()
+        patch.stopall()
 
     def test_os_environ(self):
         # Test if environment variables are set correctly
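
The tearDown simplification works because unittest.mock registers every patcher started via .start(), and patch.stopall() stops them all in one call; a minimal sketch of the idiom:

    from unittest.mock import patch

    patcher = patch("os.getenv", return_value="test")
    mock_getenv = patcher.start()   # started patchers are registered for stopall
    # ... test body ...
    patch.stopall()                 # stops every active patcher started with .start()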

redis_sync/tests/test_handler.py

Lines changed: 68 additions & 58 deletions
@@ -1,9 +1,9 @@
 ''' unit tests for redis_sync.py '''
 import unittest
+import importlib
 from unittest.mock import patch
-from redis_sync import handler
-from s3_event import S3EventRecord
 from constants import RedisCacheKey
+import redis_sync
 
 
 class TestHandler(unittest.TestCase):
@@ -40,74 +40,84 @@ def tearDown(self):
         self.logger_exception_patcher.stop()
 
     def test_handler_success(self):
-        mock_event = {'Records': [self.s3_vaccine]}
-        self.mock_get_s3_records.return_value = [self.s3_vaccine]
-
-        result = handler(mock_event, None)
-
-        self.assertTrue(result)
-        self.mock_logger_info.assert_called_with("Successfully processed all %d records", 1)
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
+            mock_event = {'Records': [self.s3_vaccine]}
+            self.mock_get_s3_records.return_value = [self.s3_vaccine]
+            with patch("redis_sync.process_record") as mock_record_processor:
+                mock_record_processor.return_value = {'status': 'success', 'message': 'Processed successfully',
+                                                      'file_key': 'test-key'}
+                result = redis_sync.handler(mock_event, None)
+                self.assertEqual(result["status"], "success")
+                self.assertEqual(result["message"], "Successfully processed 1 records")
+                self.assertEqual(result["file_keys"], ['test-key'])
 
     def test_handler_failure(self):
-        mock_event = {'Records': [self.s3_vaccine]}
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
 
-        self.mock_get_s3_records.return_value = [self.s3_vaccine]
-        self.mock_record_processor.side_effect = Exception("Processing error")
+            mock_event = {'Records': [self.s3_vaccine]}
+            with patch("redis_sync.process_record") as mock_record_processor:
+                self.mock_get_s3_records.return_value = [self.s3_vaccine]
+                mock_record_processor.side_effect = Exception("Processing error 1")
 
-        result = handler(mock_event, None)
+                result = redis_sync.handler(mock_event, None)
 
-        self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})
-        self.mock_logger_info.assert_called_with("Processing S3 event with %d records", 1)
+                self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})
 
     def test_handler_no_records(self):
-        mock_event = {'Records': []}
-
-        self.mock_get_s3_records.return_value = []
-
-        result = handler(mock_event, None)
-
-        self.assertTrue(result)
-        self.mock_logger_info.assert_called_with("Successfully processed all %d records", 0)
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
+            mock_event = {'Records': []}
+            self.mock_get_s3_records.return_value = []
+            result = redis_sync.handler(mock_event, None)
+            self.assertEqual(result, {'status': 'success', 'message': 'No records found in event'})
 
     def test_handler_exception(self):
-        mock_event = {'Records': [self.s3_vaccine]}
-        self.mock_get_s3_records.return_value = [self.s3_vaccine]
-        self.mock_record_processor.side_effect = Exception("Processing error")
-
-        result = handler(mock_event, None)
-
-        self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})
-        self.mock_logger_info.assert_called_with("Processing S3 event with %d records", 1)
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
+            mock_event = {'Records': [self.s3_vaccine]}
+            self.mock_get_s3_records.return_value = [self.s3_vaccine]
+            with patch("redis_sync.process_record") as mock_record_processor:
+                mock_record_processor.side_effect = Exception("Processing error 2")
+                result = redis_sync.handler(mock_event, None)
+                self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})
 
     def test_handler_with_empty_event(self):
-        self.mock_get_s3_records.return_value = []
-
-        result = handler({}, None)
-
-        self.assertEqual(result, {'status': 'success', 'message': 'No records found in event'})
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
+            self.mock_get_s3_records.return_value = []
+            result = redis_sync.handler({}, None)
+            self.assertEqual(result, {'status': 'success', 'message': 'No records found in event'})
 
     def test_handler_multi_record(self):
-        mock_event = {'Records': [self.s3_vaccine, self.s3_supplier]}
-
-        self.mock_get_s3_records.return_value = [
-            S3EventRecord(self.s3_vaccine),
-            S3EventRecord(self.s3_supplier)
-        ]
-        self.mock_record_processor.return_value = True
-
-        result = handler(mock_event, None)
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
+            mock_event = {'Records': [self.s3_vaccine, self.s3_supplier]}
+            # If you need S3EventRecord, uncomment the import and use it here
+            # self.mock_get_s3_records.return_value = [
+            #     S3EventRecord(self.s3_vaccine),
+            #     S3EventRecord(self.s3_supplier)
+            # ]
+            self.mock_get_s3_records.return_value = [self.s3_vaccine, self.s3_supplier]
+            with patch("redis_sync.process_record") as mock_record_processor:
+                mock_record_processor.side_effect = [{'status': 'success', 'message': 'Processed successfully',
                                                       'file_key': 'test-key1'},
+                                                     {'status': 'success', 'message': 'Processed successfully',
                                                       'file_key': 'test-key2'}]
+                result = redis_sync.handler(mock_event, None)
+                self.assertEqual(result['status'], 'success')
+                self.assertEqual(result['message'], 'Successfully processed 2 records')
+                self.assertEqual(result['file_keys'][0], 'test-key1')
+                self.assertEqual(result['file_keys'][1], 'test-key2')
 
-        self.assertTrue(result)
-        self.mock_logger_info.assert_called_with("Processing S3 event with %d records", 2)
-
-    # test to check that event_read is called when "read" key is passed in the event
     def test_handler_read_event(self):
-        mock_event = {'read': 'myhash'}
-        mock_read_event_response = {'field1': 'value1'}
-
-        with patch('redis_sync.read_event') as mock_read_event:
-            mock_read_event.return_value = mock_read_event_response
-            result = handler(mock_event, None)
-
-            mock_read_event.assert_called_once()
-            self.assertEqual(result, mock_read_event_response)
+        with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
+            importlib.reload(redis_sync)
+            mock_event = {'read': 'myhash'}
+            mock_read_event_response = {'field1': 'value1'}
+            with patch('redis_sync.read_event') as mock_read_event:
+                mock_read_event.return_value = mock_read_event_response
+                result = redis_sync.handler(mock_event, None)
                mock_read_event.assert_called_once()
                self.assertEqual(result, mock_read_event_response)