diff --git a/lambdas/redis_sync/src/record_processor.py b/lambdas/redis_sync/src/record_processor.py index 40232714b..82554bd26 100644 --- a/lambdas/redis_sync/src/record_processor.py +++ b/lambdas/redis_sync/src/record_processor.py @@ -1,6 +1,7 @@ from redis_cacher import RedisCacher from common.clients import logger from common.s3_event import S3EventRecord +from common.service_return import ServiceReturn ''' Record Processor This module processes individual S3 records from an event. @@ -8,7 +9,7 @@ ''' -def process_record(record: S3EventRecord) -> dict: +def process_record(record: S3EventRecord) -> ServiceReturn: try: logger.info("Processing S3 r bucket: %s, key: %s", record.get_bucket_name(), record.get_object_key()) @@ -20,17 +21,22 @@ def process_record(record: S3EventRecord) -> dict: } try: - result = RedisCacher.upload(bucket_name, file_key) - result.update(base_log_data) - return result + service_result = RedisCacher.upload(bucket_name, file_key) + if service_result.is_success: + result = service_result.value + result.update(base_log_data) + return ServiceReturn(value=result) + else: + return ServiceReturn( + status=500, + message=f"Failed to upload to cache for filename '{file_key}': {service_result.message}", value=base_log_data) except Exception as error: # pylint: disable=broad-except logger.exception("Error uploading to cache for filename '%s'", file_key) error_data = {"status": "error", "message": str(error)} error_data.update(base_log_data) - return error_data + return ServiceReturn(status=500, message=str(error), value=error_data) except Exception: # pylint: disable=broad-except - msg = "Error processing record" - logger.exception(msg) - return {"status": "error", "message": msg} + logger.exception("Error processing record") + return ServiceReturn(status=500, message="Error processing record", value={"status": "error", "message": "Error processing record"}) diff --git a/lambdas/redis_sync/src/redis_cacher.py b/lambdas/redis_sync/src/redis_cacher.py index 28f9c8622..0320deb03 100644 --- a/lambdas/redis_sync/src/redis_cacher.py +++ b/lambdas/redis_sync/src/redis_cacher.py @@ -5,22 +5,28 @@
from common.clients import logger from common.redis_client import get_redis_client from common.s3_reader import S3Reader +from common.service_return import ServiceReturn class RedisCacher: """Class to handle interactions with ElastiCache (Redis) for configuration files.""" @staticmethod - def upload(bucket_name: str, file_key: str) -> dict: + def upload(bucket_name: str, file_key: str) -> ServiceReturn: try: logger.info("Upload from s3 to Redis cache. file '%s'. bucket '%s'", file_key, bucket_name) # get from s3 - config_file_content = S3Reader.read(bucket_name, file_key) - if isinstance(config_file_content, str): - config_file_content = json.loads(config_file_content) + result = S3Reader.read(bucket_name, file_key) + if result.is_success: + config_file_content = result.value + if isinstance(config_file_content, str): + config_file_content = json.loads(config_file_content) - logger.info("Config file content for '%s': %s", file_key, config_file_content) + logger.info("Config file content for '%s': %s", file_key, config_file_content) + else: + return ServiceReturn(status=500, + message=f"Failed to read S3 file '{file_key}': {result.message}") # Transform redis_mappings = transform_map(config_file_content, file_key) @@ -40,8 +46,8 @@ def upload(bucket_name: str, file_key: str) -> dict: redis_client.hdel(key, *fields_to_delete) logger.info("Deleted mapping fields for %s: %s", key, fields_to_delete) - return {"status": "success", "message": f"File {file_key} uploaded to Redis cache."} + # return success - not certain of the "what", as ServiceReturn already manages status + return ServiceReturn(value={"status": "success", "message": f"File {file_key} uploaded to Redis cache."}) except Exception: - msg = f"Error uploading file '{file_key}' to Redis cache" - logger.exception(msg) - return {"status": "error", "message": msg} + logger.exception("Error uploading file '%s' to Redis cache", file_key) + return ServiceReturn(status=500, message=f"Error uploading file '{file_key}' to Redis cache") diff --git a/lambdas/redis_sync/src/redis_sync.py
b/lambdas/redis_sync/src/redis_sync.py index 8b3362668..77db318fa 100644 --- a/lambdas/redis_sync/src/redis_sync.py +++ b/lambdas/redis_sync/src/redis_sync.py @@ -4,6 +4,7 @@ from common.log_decorator import logging_decorator from common.redis_client import get_redis_client from common.s3_event import S3Event +from common.service_return import ServiceReturn ''' Event Processor The Business Logic for the Redis Sync Lambda Function. @@ -15,18 +16,21 @@ def _process_all_records(s3_records: list) -> dict: error_count = 0 file_keys = [] for record in s3_records: - record_result = process_record(record) - file_keys.append(record_result["file_key"]) - if record_result["status"] == "error": + service_result = process_record(record) + file_keys.append((service_result.value or {}).get("file_key")) + if service_result.status == 500: error_count += 1 if error_count > 0: logger.error("Processed %d records with %d errors", record_count, error_count) - return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors", - "file_keys": file_keys} + return ServiceReturn(value={"status": "error", + "message": f"Processed {record_count} records with {error_count} errors", + "file_keys": file_keys}) else: logger.info("Successfully processed all %d records", record_count) - return {"status": "success", "message": f"Successfully processed {record_count} records", - "file_keys": file_keys} + return ServiceReturn( + value={"status": "success", + "message": f"Successfully processed {record_count} records", + "file_keys": file_keys}) @logging_decorator(prefix="redis_sync", stream_name=STREAM_NAME) @@ -44,7 +48,12 @@ def handler(event, _): logger.info(no_records) return {"status": "success", "message": no_records} else: - return _process_all_records(s3_records) + service_result = _process_all_records(s3_records) + if service_result.is_success: + return service_result.value + else: + return {"status": "error", "message": service_result.value.get("message"), +
"file_keys": service_result.value.get("file_keys", [])} else: logger.info(no_records) return {"status": "success", "message": no_records} diff --git a/lambdas/shared/src/common/s3_reader.py b/lambdas/shared/src/common/s3_reader.py index 2f740956a..b1b3c41bb 100644 --- a/lambdas/shared/src/common/s3_reader.py +++ b/lambdas/shared/src/common/s3_reader.py @@ -1,4 +1,5 @@ from common.clients import s3_client, logger +from common.service_return import ServiceReturn class S3Reader: @@ -11,11 +12,11 @@ class S3Reader: """ @staticmethod - def read(bucket_name, file_key): + def read(bucket_name, file_key) -> ServiceReturn: try: s3_file = s3_client.get_object(Bucket=bucket_name, Key=file_key) - return s3_file["Body"].read().decode("utf-8") + return ServiceReturn(value=s3_file["Body"].read().decode("utf-8")) except Exception as error: # pylint: disable=broad-except logger.exception("Error reading S3 file '%s' from bucket '%s'", file_key, bucket_name) - raise error + return ServiceReturn(status=500, message="Error reading S3 file", exception=error) diff --git a/lambdas/shared/src/common/service_return.py b/lambdas/shared/src/common/service_return.py new file mode 100644 index 000000000..fec6fa258 --- /dev/null +++ b/lambdas/shared/src/common/service_return.py @@ -0,0 +1,79 @@ +import inspect +from typing import Any +from common.clients import logger + + +class ServiceReturn: + def __init__( + self, + status: int = 200, + message: str = "OK", + value: Any = None, + exception: Exception = None + ): + self.status = status + self.message = message + self.value = value + self.exception = exception + self.call_location = self._get_call_location() + + if self.status != 200 and (self.exception or self.message): + msg = { + "status": self.status, + "Message": self.message, + "Exception": self.exception, + "Location": self.call_location + } + logger.warning("ServiceReturn : %s", msg) + + def _get_call_location(self): + # Get the name of the function 2 frames up + frame = inspect.currentframe() + outer_frames =
inspect.getouterframes(frame) + if len(outer_frames) >= 3: + caller_frame = outer_frames[2] + return f"{caller_frame.function} ({caller_frame.filename}:{caller_frame.lineno})" + return "Unknown" + + def to_string(self): + # bit of a mess but hopefully, you get the point + exception_msg = f"{type(self.exception).__name__}: {self.exception}" if self.exception else "No exception" + error_msg = f"{self.message}" if self.message else "" + return f"{self.call_location}." \ + f"{error_msg} " \ + f"{exception_msg}" + + def __bool__(self): + return self.exception is None and self.status < 400 + + @property + def is_success(self) -> bool: + return self.exception is None and self.status < 400 + + +# an example function that uses ServiceReturn +def my_divide(a: int, b: int) -> ServiceReturn: + try: + result = a / b + return ServiceReturn(value=result) + except Exception as e: + return ServiceReturn( + status=500, + message="Division failed", + exception=e + ) + + +# example of usage +if __name__ == "__main__": # demo only; avoid side effects on import + result = my_divide(10, 0) + + + if result: # or result.is_success as below + print(f"Result: {result.value}") + else: + print(f"Error: {result.to_string()}") + + # Optional explicit check + if result.is_success: + print("Division succeeded.")