53 changes: 53 additions & 0 deletions redis_sync/src/log_decorator.py
@@ -0,0 +1,53 @@
"""This module contains the logging decorator for sending the appropriate logs to Cloudwatch and Firehose.
The decorator log pattern is shared by the filenameprocessor, recordprocessor, ack_backend and redis_sync modules
and could therefore be moved to a common module in the future.
TODO: refactor to a common module.
TODO: Duplication check has been suppressed in sonar-project.properties. Remove once refactored.
"""
import json
import time
from datetime import datetime
from functools import wraps
from clients import firehose_client, logger, STREAM_NAME


def send_log_to_firehose(log_data: dict) -> None:
"""Sends the log_message to Firehose"""
try:
record = {"Data": json.dumps({"event": log_data}).encode("utf-8")}
response = firehose_client.put_record(DeliveryStreamName=STREAM_NAME, Record=record)
logger.info("Log sent to Firehose: %s", response)
except Exception as error: # pylint:disable = broad-exception-caught
logger.exception("Error sending log to Firehose: %s", error)


def generate_and_send_logs(
start_time, base_log_data: dict, additional_log_data: dict, is_error_log: bool = False
) -> None:
"""Generates log data which includes the base_log_data, additional_log_data, and time taken (calculated using the
current time and given start_time) and sends them to Cloudwatch and Firehose."""
log_data = {**base_log_data, "time_taken": f"{round(time.time() - start_time, 5)}s", **additional_log_data}
log_function = logger.error if is_error_log else logger.info
log_function(json.dumps(log_data))
send_log_to_firehose(log_data)


def logging_decorator(prefix="redis_sync"):
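    """Decorator factory: wraps a function so that its returned dict (or any raised error) is logged
    to Cloudwatch and Firehose along with the function name, timestamp and time taken."""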
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
base_log_data = {
"function_name": f"{prefix}_{func.__name__}",
"date_time": str(datetime.now())
}
start_time = time.time()
try:
result = func(*args, **kwargs)
generate_and_send_logs(start_time, base_log_data, additional_log_data=result)
return result
except Exception as e:
additional_log_data = {"statusCode": 500, "error": str(e)}
generate_and_send_logs(start_time, base_log_data, additional_log_data, is_error_log=True)
raise
return wrapper
return decorator
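Usage note: a wrapped function must return a dict, because the decorator unpacks the result into the log entry as additional_log_data. A minimal usage sketch (cache_file and its return keys are illustrative, not part of this PR):

from log_decorator import logging_decorator


@logging_decorator(prefix="redis_sync")
def cache_file(bucket_name: str, file_key: str) -> dict:
    # The returned dict is merged into the log entry as additional_log_data,
    # alongside function_name, date_time and time_taken, then sent to
    # Cloudwatch and Firehose.
    return {"statusCode": 200, "file_key": file_key}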
17 changes: 13 additions & 4 deletions redis_sync/src/record_processor.py
@@ -15,13 +15,22 @@ def process_record(record: S3EventRecord) -> dict:
bucket_name = record.get_bucket_name()
file_key = record.get_object_key()

base_log_data = {
"file_key": file_key
}

try:
return RedisCacher.upload(bucket_name, file_key)
result = RedisCacher.upload(bucket_name, file_key)
result.update(base_log_data)
return result

except Exception as error: # pylint: disable=broad-except
logger.exception("Error uploading to cache for filename '%s'", file_key)
return {"status": "error", "message": str(error)}
error_data = {"status": "error", "message": str(error)}
error_data.update(base_log_data)
return error_data

except Exception: # pylint: disable=broad-except
logger.exception("Error obtaining file_key")
return {"status": "error", "message": "Error obtaining file_key"}
msg = "Error processing record"
logger.exception(msg)
return {"status": "error", "message": msg}
30 changes: 20 additions & 10 deletions redis_sync/src/redis_sync.py
@@ -2,13 +2,15 @@
from s3_event import S3Event
from record_processor import process_record
from event_read import read_event
from log_decorator import logging_decorator
from clients import redis_client
'''
Event Processor
Business logic for the Redis Sync Lambda function.
This module processes S3 events and iterates through each record to process them individually.'''


@logging_decorator(prefix="redis_sync")
def handler(event, _):

try:
@@ -19,17 +21,25 @@ def handler(event, _):
logger.info("Processing S3 event with %d records", len(event.get('Records', [])))
s3_event = S3Event(event)
record_count = len(s3_event.get_s3_records())
error_count = 0
for record in s3_event.get_s3_records():
record_result = process_record(record)
if record_result["status"] == "error":
error_count += 1
if error_count > 0:
logger.error("Processed %d records with %d errors", record_count, error_count)
return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors"}
if record_count == 0:
logger.info("No records found in event")
return {"status": "success", "message": "No records found in event"}
else:
logger.info("Successfully processed all %d records", record_count)
return {"status": "success", "message": f"Successfully processed {record_count} records"}
error_count = 0
file_keys = []
for record in s3_event.get_s3_records():
record_result = process_record(record)
file_keys.append(record_result["file_key"])
if record_result["status"] == "error":
error_count += 1
if error_count > 0:
logger.error("Processed %d records with %d errors", record_count, error_count)
return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors",
"file_keys": file_keys}
else:
logger.info("Successfully processed all %d records", record_count)
return {"status": "success", "message": f"Successfully processed {record_count} records",
"file_keys": file_keys}
else:
logger.info("No records found in event")
return {"status": "success", "message": "No records found in event"}
6 changes: 1 addition & 5 deletions redis_sync/tests/test_clients.py
@@ -35,11 +35,7 @@ def setUp(self):
self.mock_boto3_client.return_value.send_message = {}

def tearDown(self):
self.getenv_patch.stop()
self.boto3_client_patch.stop()
self.logging_patch.stop()
self.redis_patch.stop()
self.logger_info_patcher.stop()
patch.stopall()

def test_os_environ(self):
# Test if environment variables are set correctly
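The simplified tearDown relies on unittest.mock.patch.stopall(), which stops every patcher that was activated with .start(), making the individual stop() calls redundant. A minimal sketch of the pattern:

import unittest
from unittest.mock import patch


class ExampleTearDown(unittest.TestCase):
    def setUp(self):
        # Each patcher started with .start() is registered for stopall().
        self.getenv_patch = patch("os.getenv", return_value="test-value")
        self.mock_getenv = self.getenv_patch.start()

    def tearDown(self):
        # One call replaces the previous per-patcher stop() calls.
        patch.stopall()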
126 changes: 68 additions & 58 deletions redis_sync/tests/test_handler.py
@@ -1,9 +1,9 @@
''' unit tests for redis_sync.py '''
import unittest
import importlib
from unittest.mock import patch
from redis_sync import handler
from s3_event import S3EventRecord
from constants import RedisCacheKey
import redis_sync


class TestHandler(unittest.TestCase):
@@ -40,74 +40,84 @@ def tearDown(self):
self.logger_exception_patcher.stop()

def test_handler_success(self):
mock_event = {'Records': [self.s3_vaccine]}
self.mock_get_s3_records.return_value = [self.s3_vaccine]

result = handler(mock_event, None)

self.assertTrue(result)
self.mock_logger_info.assert_called_with("Successfully processed all %d records", 1)
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)
mock_event = {'Records': [self.s3_vaccine]}
self.mock_get_s3_records.return_value = [self.s3_vaccine]
with patch("redis_sync.process_record") as mock_record_processor:
mock_record_processor.return_value = {'status': 'success', 'message': 'Processed successfully',
'file_key': 'test-key'}
result = redis_sync.handler(mock_event, None)
self.assertEqual(result["status"], "success")
self.assertEqual(result["message"], "Successfully processed 1 records")
self.assertEqual(result["file_keys"], ['test-key'])

def test_handler_failure(self):
mock_event = {'Records': [self.s3_vaccine]}
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)

self.mock_get_s3_records.return_value = [self.s3_vaccine]
self.mock_record_processor.side_effect = Exception("Processing error")
mock_event = {'Records': [self.s3_vaccine]}
with patch("redis_sync.process_record") as mock_record_processor:
self.mock_get_s3_records.return_value = [self.s3_vaccine]
mock_record_processor.side_effect = Exception("Processing error 1")

result = handler(mock_event, None)
result = redis_sync.handler(mock_event, None)

self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})
self.mock_logger_info.assert_called_with("Processing S3 event with %d records", 1)
self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})

def test_handler_no_records(self):
mock_event = {'Records': []}

self.mock_get_s3_records.return_value = []

result = handler(mock_event, None)

self.assertTrue(result)
self.mock_logger_info.assert_called_with("Successfully processed all %d records", 0)
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)
mock_event = {'Records': []}
self.mock_get_s3_records.return_value = []
result = redis_sync.handler(mock_event, None)
self.assertEqual(result, {'status': 'success', 'message': 'No records found in event'})

def test_handler_exception(self):
mock_event = {'Records': [self.s3_vaccine]}
self.mock_get_s3_records.return_value = [self.s3_vaccine]
self.mock_record_processor.side_effect = Exception("Processing error")

result = handler(mock_event, None)

self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})
self.mock_logger_info.assert_called_with("Processing S3 event with %d records", 1)
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)
mock_event = {'Records': [self.s3_vaccine]}
self.mock_get_s3_records.return_value = [self.s3_vaccine]
with patch("redis_sync.process_record") as mock_record_processor:
mock_record_processor.side_effect = Exception("Processing error 2")
result = redis_sync.handler(mock_event, None)
self.assertEqual(result, {'status': 'error', 'message': 'Error processing S3 event'})

def test_handler_with_empty_event(self):
self.mock_get_s3_records.return_value = []

result = handler({}, None)

self.assertEqual(result, {'status': 'success', 'message': 'No records found in event'})
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)
self.mock_get_s3_records.return_value = []
result = redis_sync.handler({}, None)
self.assertEqual(result, {'status': 'success', 'message': 'No records found in event'})

def test_handler_multi_record(self):
mock_event = {'Records': [self.s3_vaccine, self.s3_supplier]}

self.mock_get_s3_records.return_value = [
S3EventRecord(self.s3_vaccine),
S3EventRecord(self.s3_supplier)
]
self.mock_record_processor.return_value = True

result = handler(mock_event, None)
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)
mock_event = {'Records': [self.s3_vaccine, self.s3_supplier]}
# If you need S3EventRecord, uncomment the import and use it here
# self.mock_get_s3_records.return_value = [
# S3EventRecord(self.s3_vaccine),
# S3EventRecord(self.s3_supplier)
# ]
self.mock_get_s3_records.return_value = [self.s3_vaccine, self.s3_supplier]
with patch("redis_sync.process_record") as mock_record_processor:
mock_record_processor.side_effect = [{'status': 'success', 'message': 'Processed successfully',
'file_key': 'test-key1'},
{'status': 'success', 'message': 'Processed successfully',
'file_key': 'test-key2'}]
result = redis_sync.handler(mock_event, None)
self.assertEqual(result['status'], 'success')
self.assertEqual(result['message'], 'Successfully processed 2 records')
self.assertEqual(result['file_keys'][0], 'test-key1')
self.assertEqual(result['file_keys'][1], 'test-key2')

self.assertTrue(result)
self.mock_logger_info.assert_called_with("Processing S3 event with %d records", 2)

# test to check that event_read is called when "read" key is passed in the event
def test_handler_read_event(self):
mock_event = {'read': 'myhash'}
mock_read_event_response = {'field1': 'value1'}

with patch('redis_sync.read_event') as mock_read_event:
mock_read_event.return_value = mock_read_event_response
result = handler(mock_event, None)

mock_read_event.assert_called_once()
self.assertEqual(result, mock_read_event_response)
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
importlib.reload(redis_sync)
mock_event = {'read': 'myhash'}
mock_read_event_response = {'field1': 'value1'}
with patch('redis_sync.read_event') as mock_read_event:
mock_read_event.return_value = mock_read_event_response
result = redis_sync.handler(mock_event, None)
mock_read_event.assert_called_once()
self.assertEqual(result, mock_read_event_response)
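Each test above patches the logging_decorator factory with a no-op and reloads redis_sync, so that handler is rebound without the Firehose logging wrapper and its return value can be asserted against directly. The pattern in isolation:

import importlib
from unittest.mock import patch

import redis_sync

# While the factory is patched to a no-op, reloading redis_sync re-executes
# `@logging_decorator(prefix="redis_sync")` and rebinds `handler` unwrapped.
with patch("log_decorator.logging_decorator", lambda prefix=None: (lambda f: f)):
    importlib.reload(redis_sync)
    result = redis_sync.handler({"Records": []}, None)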