diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 16c98eb..d4574fe 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -29,7 +29,9 @@ from src.handlers.handler_health import HandlerHealth from src.utils.constants import SSL_CA_BUNDLE_KEY from src.utils.utils import build_error_response -from src.writers import writer_eventbridge, writer_kafka, writer_postgres +from src.writers.writer_eventbridge import WriterEventBridge +from src.writers.writer_kafka import WriterKafka +from src.writers.writer_postgres import WriterPostgres from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV @@ -79,15 +81,17 @@ handler_token = HandlerToken(config).load_public_keys() # Initialize EventGate writers -writer_eventbridge.init(logger, config) -writer_kafka.init(logger, config) -writer_postgres.init(logger) +writers = { + "kafka": WriterKafka(config), + "eventbridge": WriterEventBridge(config), + "postgres": WriterPostgres(config), +} # Initialize topic handler and load topic schemas -handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas() +handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token, writers).load_topic_schemas() # Initialize health handler -handler_health = HandlerHealth() +handler_health = HandlerHealth(writers) def get_api() -> Dict[str, Any]: diff --git a/src/handlers/handler_health.py b/src/handlers/handler_health.py index 0db951a..6dff734 100644 --- a/src/handlers/handler_health.py +++ b/src/handlers/handler_health.py @@ -23,7 +23,7 @@ from datetime import datetime, timezone from typing import Dict, Any -from src.writers import writer_eventbridge, writer_kafka, writer_postgres +from src.writers.writer import Writer logger = logging.getLogger(__name__) log_level = os.environ.get("LOG_LEVEL", "INFO") @@ -35,8 +35,9 @@ class HandlerHealth: HandlerHealth manages service health checks and dependency status monitoring. 
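A rough sketch of how the refactored health check is driven (module path as introduced in this patch; the stub writer below is hypothetical and only illustrates the per-writer check_health() contract):

    from src.handlers.handler_health import HandlerHealth

    class _StubWriter:
        # Hypothetical stand-in: anything exposing check_health() -> (bool, str)
        # and write() -> (bool, Optional[str]) satisfies the new handler.
        def __init__(self, healthy, msg):
            self._result = (healthy, msg)

        def write(self, topic_name, message):
            return True, None

        def check_health(self):
            return self._result

    handler = HandlerHealth({
        "kafka": _StubWriter(True, "ok"),
        "eventbridge": _StubWriter(True, "not configured"),   # disabled writers still count as healthy
        "postgres": _StubWriter(False, "host not configured"),  # only failing writers reach the failures map
    })
    response = handler.get_health()  # statusCode 503 because one writer reports unhealthy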
""" - def __init__(self): + def __init__(self, writers: Dict[str, Writer]): self.start_time: datetime = datetime.now(timezone.utc) + self.writers = writers def get_health(self) -> Dict[str, Any]: """ @@ -51,28 +52,10 @@ def get_health(self) -> Dict[str, Any]: failures: Dict[str, str] = {} - # Check Kafka writer - if writer_kafka.STATE.get("producer") is None: - failures["kafka"] = "producer not initialized" - - # Check EventBridge writer - eventbus_arn = writer_eventbridge.STATE.get("event_bus_arn") - eventbridge_client = writer_eventbridge.STATE.get("client") - if eventbus_arn: - if eventbridge_client is None: - failures["eventbridge"] = "client not initialized" - - # Check PostgreSQL writer - postgres_config = writer_postgres.POSTGRES - if postgres_config.get("database"): - if not postgres_config.get("host"): - failures["postgres"] = "host not configured" - elif not postgres_config.get("user"): - failures["postgres"] = "user not configured" - elif not postgres_config.get("password"): - failures["postgres"] = "password not configured" - elif not postgres_config.get("port"): - failures["postgres"] = "port not configured" + for name, writer in self.writers.items(): + healthy, msg = writer.check_health() + if not healthy: + failures[name] = msg uptime_seconds = int((datetime.now(timezone.utc) - self.start_time).total_seconds()) diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py index 1434fa1..ab004b6 100644 --- a/src/handlers/handler_topic.py +++ b/src/handlers/handler_topic.py @@ -28,7 +28,7 @@ from src.handlers.handler_token import HandlerToken from src.utils.utils import build_error_response -from src.writers import writer_eventbridge, writer_kafka, writer_postgres +from src.writers.writer import Writer logger = logging.getLogger(__name__) log_level = os.environ.get("LOG_LEVEL", "INFO") @@ -40,10 +40,17 @@ class HandlerTopic: HandlerTopic manages topic schemas, access control, and message posting. 
""" - def __init__(self, conf_dir: str, access_config: Dict[str, list[str]], handler_token: HandlerToken): + def __init__( + self, + conf_dir: str, + access_config: Dict[str, list[str]], + handler_token: HandlerToken, + writers: Dict[str, Writer], + ): self.conf_dir = conf_dir self.access_config = access_config self.handler_token = handler_token + self.writers = writers self.topics: Dict[str, Dict[str, Any]] = {} def load_topic_schemas(self) -> "HandlerTopic": @@ -128,17 +135,11 @@ def post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], tok except ValidationError as exc: return build_error_response(400, "validation", exc.message) - kafka_ok, kafka_err = writer_kafka.write(topic_name, topic_message) - eventbridge_ok, eventbridge_err = writer_eventbridge.write(topic_name, topic_message) - postgres_ok, postgres_err = writer_postgres.write(topic_name, topic_message) - errors = [] - if not kafka_ok: - errors.append({"type": "kafka", "message": kafka_err}) - if not eventbridge_ok: - errors.append({"type": "eventbridge", "message": eventbridge_err}) - if not postgres_ok: - errors.append({"type": "postgres", "message": postgres_err}) + for writer_name, writer in self.writers.items(): + ok, err = writer.write(topic_name, topic_message) + if not ok: + errors.append({"type": writer_name, "message": err}) if errors: return { diff --git a/src/writers/writer.py b/src/writers/writer.py new file mode 100644 index 0000000..c98789f --- /dev/null +++ b/src/writers/writer.py @@ -0,0 +1,59 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This module provides abstract base class for all EventGate writers. +""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional, Tuple + + +class Writer(ABC): + """ + Abstract base class for EventGate writers. + All writers inherit from this class and implement the write() method. Writers use lazy initialization. + """ + + def __init__(self, config: Dict[str, Any]) -> None: + self.config = config + + @abstractmethod + def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Publish a message to the target system. + + Args: + topic_name: Target writer topic (destination) name. + message: JSON-serializable payload to publish. + + Returns: + Tuple of (success: bool, error_message: Optional[str]). + - (True, None) on success or when writer is disabled/skipped. + - (False, "error description") on failure. + """ + + @abstractmethod + def check_health(self) -> Tuple[bool, str]: + """ + Check writer health and connectivity. + + Returns: + Tuple of (is_healthy: bool, message: str). + - (True, "ok") - configured and working. + - (True, "not configured") - not configured, skipped. + - (False, "error message") - configured but failing. 
+ """ diff --git a/src/writers/writer_eventbridge.py b/src/writers/writer_eventbridge.py index 37e28a8..1bff60d 100644 --- a/src/writers/writer_eventbridge.py +++ b/src/writers/writer_eventbridge.py @@ -14,86 +14,109 @@ # limitations under the License. # -"""EventBridge writer module. - +""" +EventBridge writer module. Provides initialization and write functionality for publishing events to AWS EventBridge. """ import json import logging +import os from typing import Any, Dict, Optional, Tuple, List import boto3 from botocore.exceptions import BotoCoreError, ClientError from src.utils.trace_logging import log_payload_at_trace +from src.writers.writer import Writer -STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} - +logger = logging.getLogger(__name__) +log_level = os.environ.get("LOG_LEVEL", "INFO") +logger.setLevel(log_level) -def init(logger: logging.Logger, config: Dict[str, Any]) -> None: - """Initialize the EventBridge writer. - Args: - logger: Shared application logger. - config: Configuration dictionary (expects optional 'event_bus_arn'). +class WriterEventBridge(Writer): """ - STATE["logger"] = logger - STATE["client"] = boto3.client("events") - STATE["event_bus_arn"] = config.get("event_bus_arn", "") - STATE["logger"].debug("Initialized EVENTBRIDGE writer") - - -def _format_failed_entries(entries: List[Dict[str, Any]]) -> str: - failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e] - # Keep message concise but informative - return json.dumps(failed) if failed else "[]" - - -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """Publish a message to EventBridge. - - Args: - topic_name: Source topic name used as event Source. - message: JSON-serializable payload. - Returns: - Tuple of success flag and optional error message. + EventBridge writer for publishing events to AWS EventBridge. + The boto3 EventBridge client is created on the first write() call. """ - logger = STATE["logger"] - event_bus_arn = STATE["event_bus_arn"] - client = STATE["client"] - if not event_bus_arn: - logger.debug("No EventBus Arn - skipping") - return True, None - if client is None: # defensive - logger.debug("EventBridge client not initialized - skipping") + def __init__(self, config: Dict[str, Any]) -> None: + super().__init__(config) + self._client: Optional["boto3.client"] = None + self._entries: List[Dict[str, Any]] = [] + self.event_bus_arn: str = config.get("event_bus_arn", "") + logger.debug("Initialized EventBridge writer") + + def _format_failed_entries(self) -> str: + """ + Format failed EventBridge entries for error message. + + Returns: + JSON string of failed entries. + """ + failed = [e for e in self._entries if "ErrorCode" in e or "ErrorMessage" in e] + return json.dumps(failed) if failed else "[]" + + def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Publish a message to EventBridge. + + Args: + topic_name: Target EventBridge writer topic (destination) name. + message: JSON-serializable payload to publish. + Returns: + Tuple of (success: bool, error_message: Optional[str]). 
+ """ + if not self.event_bus_arn: + logger.debug("No EventBus Arn - skipping EventBridge writer") + return True, None + + if self._client is None: + self._client = boto3.client("events") + logger.debug("EventBridge client initialized") + + log_payload_at_trace(logger, "EventBridge", topic_name, message) + + try: + logger.debug("Sending to EventBridge %s", topic_name) + response = self._client.put_events( + Entries=[ + { + "Source": topic_name, + "DetailType": "JSON", + "Detail": json.dumps(message), + "EventBusName": self.event_bus_arn, + } + ] + ) + failed_count = response.get("FailedEntryCount", 0) + if failed_count > 0: + self._entries = response.get("Entries", []) + failed_repr = self._format_failed_entries() + msg = f"{failed_count} EventBridge entries failed: {failed_repr}" + logger.error(msg) + return False, msg + except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors + logger.exception("EventBridge put_events call failed") + return False, str(err) + return True, None - log_payload_at_trace(logger, "EventBridge", topic_name, message) - - try: - logger.debug("Sending to eventBridge %s", topic_name) - response = client.put_events( - Entries=[ - { - "Source": topic_name, - "DetailType": "JSON", - "Detail": json.dumps(message), - "EventBusName": event_bus_arn, - } - ] - ) - failed_count = response.get("FailedEntryCount", 0) - if failed_count > 0: - entries = response.get("Entries", []) - failed_repr = _format_failed_entries(entries) - msg = f"{failed_count} EventBridge entries failed: {failed_repr}" - logger.error(msg) - return False, msg - except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors - logger.exception("EventBridge put_events call failed") - return False, str(err) - - # Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400) - return True, None + def check_health(self) -> Tuple[bool, str]: + """ + Check EventBridge writer health. + + Returns: + Tuple of (is_healthy: bool, message: str). + """ + if not self.event_bus_arn: + return True, "not configured" + + try: + if self._client is None: + self._client = boto3.client("events") + logger.debug("EventBridge client initialized during health check") + return True, "ok" + except (BotoCoreError, ClientError) as err: + return False, str(err) diff --git a/src/writers/writer_kafka.py b/src/writers/writer_kafka.py index ad85935..452be7f 100644 --- a/src/writers/writer_kafka.py +++ b/src/writers/writer_kafka.py @@ -14,145 +14,174 @@ # limitations under the License. # -"""Kafka writer module. - -Initializes a Confluent Kafka Producer and publishes messages for a topic. +""" +Kafka writer module. +Provides functionality for publishing messages to Kafka topics. 
""" import json import logging import os import time -from typing import Any, Dict, Optional, Tuple -from confluent_kafka import Producer +from typing import Any, Dict, List, Optional, Tuple +from confluent_kafka import Producer, KafkaException from src.utils.trace_logging import log_payload_at_trace +from src.writers.writer import Writer -try: # KafkaException may not exist in stubbed test module - from confluent_kafka import KafkaException # type: ignore -except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub - - class KafkaException(Exception): # type: ignore - """Fallback KafkaException if confluent_kafka is not installed.""" +logger = logging.getLogger(__name__) +log_level = os.environ.get("LOG_LEVEL", "INFO") +logger.setLevel(log_level) - -STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None} # Configurable flush timeouts and retries via env variables to avoid hanging indefinitely _KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7")) _MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3")) _RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5")) -def init(logger: logging.Logger, config: Dict[str, Any]) -> None: - """Initialize Kafka producer. - - Args: - logger: Shared application logger. - config: Configuration dictionary (expects 'kafka_bootstrap_server' plus optional SASL/SSL fields). - Raises: - ValueError: if required 'kafka_bootstrap_server' is missing or empty. +class WriterKafka(Writer): """ - STATE["logger"] = logger - - if "kafka_bootstrap_server" not in config or not config.get("kafka_bootstrap_server"): - raise ValueError("Missing required config: kafka_bootstrap_server") - bootstrap = config["kafka_bootstrap_server"] - - producer_config: Dict[str, Any] = {"bootstrap.servers": bootstrap} - if "kafka_sasl_kerberos_principal" in config and "kafka_ssl_key_path" in config: - producer_config.update( - { - "security.protocol": "SASL_SSL", - "sasl.mechanism": "GSSAPI", - "sasl.kerberos.service.name": "kafka", - "sasl.kerberos.keytab": config["kafka_sasl_kerberos_keytab_path"], - "sasl.kerberos.principal": config["kafka_sasl_kerberos_principal"], - "ssl.ca.location": config["kafka_ssl_ca_path"], - "ssl.certificate.location": config["kafka_ssl_cert_path"], - "ssl.key.location": config["kafka_ssl_key_path"], - "ssl.key.password": config["kafka_ssl_key_password"], - } - ) - STATE["logger"].debug("Kafka producer will use SASL_SSL") - - STATE["producer"] = Producer(producer_config) - STATE["logger"].debug("Initialized KAFKA writer") - - -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """Publish a message to Kafka. - - Args: - topic_name: Kafka topic to publish to. - message: JSON-serializable payload. - Returns: - Tuple[success flag, optional error message]. + Kafka writer for publishing messages to Kafka topics. + The Kafka producer is created on the first write() call. 
""" - logger = STATE["logger"] - producer: Optional[Producer] = STATE.get("producer") # type: ignore[assignment] - if producer is None: - logger.debug("Kafka producer not initialized - skipping") - return True, None - log_payload_at_trace(logger, "Kafka", topic_name, message) - - errors: list[str] = [] - has_exception = False - - # Produce step - try: - logger.debug("Sending to kafka %s", topic_name) - producer.produce( - topic_name, - key="", - value=json.dumps(message).encode("utf-8"), - callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), - ) - except KafkaException as e: - errors.append(f"Produce exception: {e}") - has_exception = True - - # Flush step (always attempted) - remaining: Optional[int] = None - for attempt in range(1, _MAX_RETRIES + 1): + def __init__(self, config: Dict[str, Any]) -> None: + super().__init__(config) + self._producer: Optional["Producer"] = None + logger.debug("Initialized Kafka writer") + + def _create_producer(self) -> Optional[Producer]: + """ + Create Kafka producer from config. + + Returns: + None if bootstrap server not configured else Producer instance. + """ + if "kafka_bootstrap_server" not in self.config or not self.config.get("kafka_bootstrap_server"): + return None + + bootstrap = self.config["kafka_bootstrap_server"] + producer_config: Dict[str, Any] = {"bootstrap.servers": bootstrap} + + if "kafka_sasl_kerberos_principal" in self.config and "kafka_ssl_key_path" in self.config: + producer_config.update( + { + "security.protocol": "SASL_SSL", + "sasl.mechanism": "GSSAPI", + "sasl.kerberos.service.name": "kafka", + "sasl.kerberos.keytab": self.config["kafka_sasl_kerberos_keytab_path"], + "sasl.kerberos.principal": self.config["kafka_sasl_kerberos_principal"], + "ssl.ca.location": self.config["kafka_ssl_ca_path"], + "ssl.certificate.location": self.config["kafka_ssl_cert_path"], + "ssl.key.location": self.config["kafka_ssl_key_path"], + "ssl.key.password": self.config["kafka_ssl_key_password"], + } + ) + logger.debug("Kafka producer will use SASL_SSL") + + return Producer(producer_config) + + def _flush_with_timeout(self, timeout: float) -> Optional[int]: + """ + Flush the Kafka producer with a timeout. + + Args: + timeout: Timeout in seconds. + Returns: + Number of messages still pending after flush (0 = all delivered). + None if the producer stub doesn't provide a count. + """ + if self._producer is None: + return 0 try: - remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC) + return self._producer.flush(timeout) + except TypeError: + return self._producer.flush() + + def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Publish a message to Kafka. + + Args: + topic_name: Kafka topic to publish to. + message: JSON-serializable payload. + Returns: + Tuple of (success: bool, error_message: Optional[str]). 
+ """ + # Lazy initialization of Kafka producer + if self._producer is None: + self._producer = self._create_producer() + + # If no bootstrap server configured, skipping Kafka writer + if self._producer is None: + logger.debug("Kafka producer not initialized - skipping Kafka writer") + return True, None + + log_payload_at_trace(logger, "Kafka", topic_name, message) + + errors: List[str] = [] + has_exception = False + + # Produce step + try: + logger.debug("Sending to Kafka %s", topic_name) + self._producer.produce( + topic_name, + key="", + value=json.dumps(message).encode("utf-8"), + callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), + ) except KafkaException as e: - errors.append(f"Flush exception: {e}") + errors.append(f"Produce exception: {e}") has_exception = True - # Treat None (flush returns None in some stubs) as success equivalent to 0 pending - if (remaining is None or remaining == 0) and not errors: - break - if attempt < _MAX_RETRIES: - logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES) - time.sleep(_RETRY_BACKOFF_SEC) - - # Warn if messages still pending after retries - if isinstance(remaining, int) and remaining > 0: - logger.warning( - "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining - ) - - if errors: - failure_text = "Kafka writer failed: " + "; ".join(errors) - (logger.exception if has_exception else logger.error)(failure_text) - return False, failure_text + # Flush step (always attempted) + remaining: Optional[int] = None + for attempt in range(1, _MAX_RETRIES + 1): + try: + remaining = self._flush_with_timeout(_KAFKA_FLUSH_TIMEOUT_SEC) + except KafkaException as e: + errors.append(f"Flush exception: {e}") + has_exception = True + + # Treat None (flush returns None in some stubs) as success equivalent to 0 pending + if (remaining is None or remaining == 0) and not errors: + break + if attempt < _MAX_RETRIES: + logger.warning( + "Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES + ) + time.sleep(_RETRY_BACKOFF_SEC) + + # Warn if messages still pending after retries + if isinstance(remaining, int) and remaining > 0: + logger.warning( + "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining + ) + + if errors: + failure_text = "Kafka writer failed: " + "; ".join(errors) + (logger.exception if has_exception else logger.error)(failure_text) + return False, failure_text - return True, None + return True, None + def check_health(self) -> Tuple[bool, str]: + """ + Check Kafka writer health. -def flush_with_timeout(producer, timeout: float) -> Optional[int]: - """Flush the Kafka producer with a timeout, handling TypeError for stubs. + Returns: + Tuple of (is_healthy: bool, message: str). + """ + if not self.config.get("kafka_bootstrap_server"): + return True, "not configured" - Args: - producer: Kafka Producer instance. - timeout: Timeout in seconds. - Returns: - Number of messages still pending after the flush call (0 all messages delivered). - None is returned only if the underlying (stub/mock) producer.flush() does not provide a count. 
- """ - try: - return producer.flush(timeout) - except TypeError: # Fallback for stub producers without timeout parameter - return producer.flush() + try: + if self._producer is None: + self._producer = self._create_producer() + logger.debug("Kafka producer initialized during health check") + if self._producer is None: + return False, "producer initialization failed" + return True, "ok" + except KafkaException as err: + return False, str(err) diff --git a/src/writers/writer_postgres.py b/src/writers/writer_postgres.py index e7979c1..da0ad5e 100644 --- a/src/writers/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -14,296 +14,305 @@ # limitations under the License. # -"""Postgres writer module. - -Handles optional initialization via AWS Secrets Manager and topic-based inserts into Postgres. +""" +Postgres writer module. +Provides functionality for writing events to PostgreSQL database. """ import json -import os import logging -from typing import Any, Dict, Tuple, Optional +import os +from typing import Any, Dict, Optional, Tuple import boto3 -try: - import psycopg2 # noqa: F401 -except ImportError: # pragma: no cover - environment without psycopg2 - psycopg2 = None # type: ignore - from src.utils.trace_logging import log_payload_at_trace +from src.writers.writer import Writer -# Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing -if psycopg2 is not None: # type: ignore - try: # pragma: no cover - attribute presence depends on installed psycopg2 variant - PsycopgError = psycopg2.Error # type: ignore[attr-defined] - except AttributeError: # pragma: no cover - - class PsycopgError(Exception): # type: ignore - """Shim psycopg2 error base when psycopg2 provides no Error attribute.""" - -else: # fallback shim when psycopg2 absent +try: + import psycopg2 + from psycopg2 import Error as PsycopgError +except ImportError: + psycopg2 = None # type: ignore class PsycopgError(Exception): # type: ignore """Shim psycopg2 error base when psycopg2 is not installed.""" -# Module level globals for typing -logger: logging.Logger = logging.getLogger(__name__) -POSTGRES: Dict[str, Any] = {"database": ""} +logger = logging.getLogger(__name__) +log_level = os.environ.get("LOG_LEVEL", "INFO") +logger.setLevel(log_level) -def init(logger_instance: logging.Logger) -> None: - """Initialize Postgres credentials either from AWS Secrets Manager or fallback empty config. - - Args: - logger_instance: Shared application logger. +class WriterPostgres(Writer): """ - global logger # pylint: disable=global-statement - global POSTGRES # pylint: disable=global-statement - - logger = logger_instance - - secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") - secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") - - if secret_name and secret_region: - aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region) - postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] - POSTGRES = json.loads(postgres_secret) - else: - POSTGRES = {"database": ""} - - logger.debug("Initialized POSTGRES writer") - - -def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: - """Insert a dlchange style event row. - - Args: - cursor: Database cursor. - table: Target table name. - message: Event payload. + Postgres writer for storing events in PostgreSQL database. + Database credentials are loaded from AWS Secrets Manager at initialization. 
""" - logger.debug("Sending to Postgres - %s", table) - cursor.execute( - f""" - INSERT INTO {table} - ( - event_id, - tenant_id, - source_app, - source_app_version, - environment, - timestamp_event, - country, - catalog_id, - operation, - "location", - "format", - format_options, - additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - message["tenant_id"], - message["source_app"], - message["source_app_version"], - message["environment"], - message["timestamp_event"], - message.get("country", ""), - message["catalog_id"], - message["operation"], - message.get("location"), - message["format"], - (json.dumps(message.get("format_options")) if "format_options" in message else None), - (json.dumps(message.get("additional_info")) if "additional_info" in message else None), - ), - ) - -def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[str, Any]) -> None: - """Insert a run event row plus related job rows. - - Args: - cursor: Database cursor. - table_runs: Runs table name. - table_jobs: Jobs table name. - message: Event payload (includes jobs array). - """ - logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) - cursor.execute( - f""" - INSERT INTO {table_runs} - ( + def __init__(self, config: Dict[str, Any]) -> None: + super().__init__(config) + secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") + secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") + + if secret_name and secret_region: + aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region) + postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] + self._db_config: Dict[str, Any] = json.loads(postgres_secret) + else: + self._db_config = {"database": ""} + + logger.debug("Initialized PostgreSQL writer") + + def _postgres_edla_write(self, cursor: Any, table: str, message: Dict[str, Any]) -> None: + """ + Insert a dlchange style event row. + + Args: + cursor: Database cursor. + table: Target table name. + message: Event payload. 
+ """ + logger.debug("Sending to Postgres - %s", table) + cursor.execute( + f""" + INSERT INTO {table} + ( event_id, - job_ref, tenant_id, source_app, source_app_version, environment, - timestamp_start, - timestamp_end - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - message["job_ref"], - message["tenant_id"], - message["source_app"], - message["source_app_version"], - message["environment"], - message["timestamp_start"], - message["timestamp_end"], - ), - ) - - for job in message["jobs"]: - cursor.execute( - f""" - INSERT INTO {table_jobs} - ( - event_id, + timestamp_event, country, catalog_id, - status, - timestamp_start, - timestamp_end, - message, + operation, + "location", + "format", + format_options, additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", ( message["event_id"], - job.get("country", ""), - job["catalog_id"], - job["status"], - job["timestamp_start"], - job["timestamp_end"], - job.get("message"), - (json.dumps(job.get("additional_info")) if "additional_info" in job else None), + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_event"], + message.get("country", ""), + message["catalog_id"], + message["operation"], + message.get("location"), + message["format"], + (json.dumps(message.get("format_options")) if "format_options" in message else None), + (json.dumps(message.get("additional_info")) if "additional_info" in message else None), ), ) - -def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None: - """Insert a test topic row. - - Args: - cursor: Database cursor. - table: Target table name. - message: Event payload. - """ - logger.debug("Sending to Postgres - %s", table) - cursor.execute( - f""" - INSERT INTO {table} - ( - event_id, - tenant_id, - source_app, - environment, - timestamp_event, - additional_info + def _postgres_run_write(self, cursor: Any, table_runs: str, table_jobs: str, message: Dict[str, Any]) -> None: + """ + Insert a run event row plus related job rows. + + Args: + cursor: Database cursor. + table_runs: Runs table name. + table_jobs: Jobs table name. + message: Event payload (includes jobs array). + """ + logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) + cursor.execute( + f""" + INSERT INTO {table_runs} + ( + event_id, + job_ref, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_start, + timestamp_end + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + message["job_ref"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_start"], + message["timestamp_end"], + ), ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - message["tenant_id"], - message["source_app"], - message["environment"], - message["timestamp"], - (json.dumps(message.get("additional_info")) if "additional_info" in message else None), - ), - ) - - -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """Dispatch insertion for a topic into the correct Postgres table(s). - - Skips if Postgres not configured or psycopg2 unavailable. Returns success flag and optional error. - - Args: - topic_name: Incoming topic identifier. - message: Event payload. 
- """ - try: - if not POSTGRES.get("database"): - logger.debug("No Postgres - skipping") - return True, None - if psycopg2 is None: # type: ignore - logger.debug("psycopg2 not available - skipping actual Postgres write") - return True, None - - log_payload_at_trace(logger, "Postgres", topic_name, message) - with psycopg2.connect( # type: ignore[attr-defined] - database=POSTGRES["database"], - host=POSTGRES["host"], - user=POSTGRES["user"], - password=POSTGRES["password"], - port=POSTGRES["port"], - ) as connection: # type: ignore[call-arg] - with connection.cursor() as cursor: # type: ignore - if topic_name == "public.cps.za.dlchange": - postgres_edla_write(cursor, "public_cps_za_dlchange", message) - elif topic_name == "public.cps.za.runs": - postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) - elif topic_name == "public.cps.za.test": - postgres_test_write(cursor, "public_cps_za_test", message) - else: - msg = f"unknown topic for postgres {topic_name}" - logger.error(msg) - return False, msg - - connection.commit() # type: ignore - except (RuntimeError, PsycopgError) as e: # narrowed exception set - err_msg = f"The Postgres writer with failed unknown error: {str(e)}" - logger.exception(err_msg) - return False, err_msg + for job in message["jobs"]: + cursor.execute( + f""" + INSERT INTO {table_jobs} + ( + event_id, + country, + catalog_id, + status, + timestamp_start, + timestamp_end, + message, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + job.get("country", ""), + job["catalog_id"], + job["status"], + job["timestamp_start"], + job["timestamp_end"], + job.get("message"), + (json.dumps(job.get("additional_info")) if "additional_info" in job else None), + ), + ) + + def _postgres_test_write(self, cursor: Any, table: str, message: Dict[str, Any]) -> None: + """ + Insert a test topic row. + + Args: + cursor: Database cursor. + table: Target table name. + message: Event payload. + """ + logger.debug("Sending to Postgres - %s", table) + cursor.execute( + f""" + INSERT INTO {table} + ( + event_id, + tenant_id, + source_app, + environment, + timestamp_event, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + message["tenant_id"], + message["source_app"], + message["environment"], + message["timestamp"], + (json.dumps(message.get("additional_info")) if "additional_info" in message else None), + ), + ) - return True, None + def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Dispatch insertion for a topic into the correct Postgres table(s). + + Args: + topic_name: Incoming topic identifier. + message: JSON-serializable payload. + Returns: + Tuple of (success: bool, error_message: Optional[str]). 
+ """ + try: + if not self._db_config.get("database"): + logger.debug("No Postgres - skipping Postgres writer") + return True, None + if psycopg2 is None: + logger.debug("psycopg2 not available - skipping actual Postgres write") + return True, None + + log_payload_at_trace(logger, "Postgres", topic_name, message) + + with psycopg2.connect( # type: ignore[attr-defined] + database=self._db_config["database"], + host=self._db_config["host"], + user=self._db_config["user"], + password=self._db_config["password"], + port=self._db_config["port"], + ) as connection: + with connection.cursor() as cursor: + if topic_name == "public.cps.za.dlchange": + self._postgres_edla_write(cursor, "public_cps_za_dlchange", message) + elif topic_name == "public.cps.za.runs": + self._postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) + elif topic_name == "public.cps.za.test": + self._postgres_test_write(cursor, "public_cps_za_test", message) + else: + msg = f"unknown topic for postgres {topic_name}" + logger.error(msg) + return False, msg + + connection.commit() + except (RuntimeError, PsycopgError) as e: + err_msg = f"The Postgres writer with failed unknown error: {str(e)}" + logger.exception(err_msg) + return False, err_msg + + return True, None + + def check_health(self) -> Tuple[bool, str]: + """ + Check PostgreSQL writer health. + + Returns: + Tuple of (is_healthy: bool, message: str). + """ + if not self._db_config.get("database"): + return True, "not configured" + + if not self._db_config.get("host"): + return False, "host not configured" + if not self._db_config.get("user"): + return False, "user not configured" + if not self._db_config.get("password"): + return False, "password not configured" + if not self._db_config.get("port"): + return False, "port not configured" + + return True, "ok" diff --git a/tests/handlers/test_handler_health.py b/tests/handlers/test_handler_health.py index ca3bfbb..5ffad21 100644 --- a/tests/handlers/test_handler_health.py +++ b/tests/handlers/test_handler_health.py @@ -15,24 +15,32 @@ # import json -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock from src.handlers.handler_health import HandlerHealth + +def _create_mock_writer(check_health_return): + """Create a mock writer with check_health returning the specified value.""" + mock = MagicMock() + mock.check_health.return_value = check_health_return + return mock + + ### get_health() ## Minimal healthy state (just kafka) def test_get_health_minimal_kafka_healthy(): """Health check returns 200 when Kafka is initialized and optional writers are disabled.""" - handler = HandlerHealth() + writers = { + "kafka": _create_mock_writer((True, "ok")), + "eventbridge": _create_mock_writer((True, "not configured")), + "postgres": _create_mock_writer((True, "not configured")), + } + handler = HandlerHealth(writers) - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()}), - patch("src.handlers.handler_health.writer_eventbridge.STATE", {"client": None, "event_bus_arn": ""}), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", {"database": ""}), - ): - response = handler.get_health() + response = handler.get_health() assert response["statusCode"] == 200 body = json.loads(response["body"]) @@ -43,15 +51,14 @@ def test_get_health_minimal_kafka_healthy(): ## Healthy state with all writers enabled def test_get_health_all_writers_enabled_and_healthy(): """Health check returns 200 when all writers are enabled and properly 
configured.""" - handler = HandlerHealth() - postgres_config = {"database": "db", "host": "localhost", "user": "user", "password": "pass", "port": "5432"} + writers = { + "kafka": _create_mock_writer((True, "ok")), + "eventbridge": _create_mock_writer((True, "ok")), + "postgres": _create_mock_writer((True, "ok")), + } + handler = HandlerHealth(writers) - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()}), - patch("src.handlers.handler_health.writer_eventbridge.STATE", {"client": MagicMock(), "event_bus_arn": "arn"}), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_config), - ): - response = handler.get_health() + response = handler.get_health() assert response["statusCode"] == 200 body = json.loads(response["body"]) @@ -59,21 +66,17 @@ def test_get_health_all_writers_enabled_and_healthy(): assert "uptime_seconds" in body -## Degraded state with all writers enabled +## Degraded state with all writers failing def test_get_health_kafka_not_initialized(): """Health check returns 503 when Kafka writer is not initialized.""" - handler = HandlerHealth() - postgres_config = {"database": "db", "host": "", "user": "", "password": "", "port": ""} - - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": None}), - patch( - "src.handlers.handler_health.writer_eventbridge.STATE", - {"client": None, "event_bus_arn": "arn:aws:events:us-east-1:123:event-bus/bus"}, - ), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_config), - ): - response = handler.get_health() + writers = { + "kafka": _create_mock_writer((False, "producer initialization failed")), + "eventbridge": _create_mock_writer((False, "client initialization failed")), + "postgres": _create_mock_writer((False, "host not configured")), + } + handler = HandlerHealth(writers) + + response = handler.get_health() assert response["statusCode"] == 503 body = json.loads(response["body"]) @@ -86,15 +89,14 @@ def test_get_health_kafka_not_initialized(): ## Healthy when eventbridge is disabled def test_get_health_eventbridge_disabled(): """Health check returns 200 when EventBridge is disabled (empty event_bus_arn).""" - handler = HandlerHealth() - postgres_config = {"database": "db", "host": "localhost", "user": "user", "password": "pass", "port": "5432"} + writers = { + "kafka": _create_mock_writer((True, "ok")), + "eventbridge": _create_mock_writer((True, "not configured")), + "postgres": _create_mock_writer((True, "ok")), + } + handler = HandlerHealth(writers) - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()}), - patch("src.handlers.handler_health.writer_eventbridge.STATE", {"client": None, "event_bus_arn": ""}), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_config), - ): - response = handler.get_health() + response = handler.get_health() assert response["statusCode"] == 200 @@ -102,14 +104,14 @@ def test_get_health_eventbridge_disabled(): ## Healthy when postgres is disabled def test_get_health_postgres_disabled(): """Health check returns 200 when PostgreSQL is disabled (empty database).""" - handler = HandlerHealth() + writers = { + "kafka": _create_mock_writer((True, "ok")), + "eventbridge": _create_mock_writer((True, "ok")), + "postgres": _create_mock_writer((True, "not configured")), + } + handler = HandlerHealth(writers) - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()}), - 
patch("src.handlers.handler_health.writer_eventbridge.STATE", {"client": MagicMock(), "event_bus_arn": "arn"}), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", {"database": ""}), - ): - response = handler.get_health() + response = handler.get_health() assert response["statusCode"] == 200 @@ -117,15 +119,14 @@ def test_get_health_postgres_disabled(): ## Degraded state - postgres host not configured def test_get_health_postgres_host_not_configured(): """Health check returns 503 when PostgreSQL host is not configured.""" - handler = HandlerHealth() - postgres_config = {"database": "db", "host": "", "user": "user", "password": "pass", "port": "5432"} + writers = { + "kafka": _create_mock_writer((True, "ok")), + "eventbridge": _create_mock_writer((True, "ok")), + "postgres": _create_mock_writer((False, "host not configured")), + } + handler = HandlerHealth(writers) - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()}), - patch("src.handlers.handler_health.writer_eventbridge.STATE", {"client": MagicMock(), "event_bus_arn": "arn"}), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_config), - ): - response = handler.get_health() + response = handler.get_health() assert response["statusCode"] == 503 body = json.loads(response["body"]) @@ -135,15 +136,14 @@ def test_get_health_postgres_host_not_configured(): ## Uptime calculation def test_get_health_uptime_is_positive(): """Verify uptime_seconds is calculated and is a positive integer.""" - handler = HandlerHealth() - postgres_config = {"database": "db", "host": "localhost", "user": "user", "password": "pass", "port": "5432"} - - with ( - patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()}), - patch("src.handlers.handler_health.writer_eventbridge.STATE", {"client": MagicMock(), "event_bus_arn": "arn"}), - patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_config), - ): - response = handler.get_health() + writers = { + "kafka": _create_mock_writer((True, "ok")), + "eventbridge": _create_mock_writer((True, "ok")), + "postgres": _create_mock_writer((True, "ok")), + } + handler = HandlerHealth(writers) + + response = handler.get_health() body = json.loads(response["body"]) assert "uptime_seconds" in body diff --git a/tests/handlers/test_handler_topic.py b/tests/handlers/test_handler_topic.py index 3916dd6..9d8a3a3 100644 --- a/tests/handlers/test_handler_topic.py +++ b/tests/handlers/test_handler_topic.py @@ -25,8 +25,13 @@ ## load_topic_schemas() def test_load_topic_schemas_success(): mock_handler_token = MagicMock() + mock_writers = { + "kafka": MagicMock(), + "eventbridge": MagicMock(), + "postgres": MagicMock(), + } access_config = {"public.cps.za.test": ["TestUser"]} - handler = HandlerTopic("conf", access_config, mock_handler_token) + handler = HandlerTopic("conf", access_config, mock_handler_token, mock_writers) mock_schemas = { "topic_runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}}, @@ -135,12 +140,10 @@ def test_post_invalid_token_decode(event_gate_module, make_event, valid_payload) # --- POST success & failure aggregation --- def test_post_success_all_writers(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), - patch("src.handlers.handler_topic.writer_kafka.write", return_value=(True, None)), - patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(True, None)), - 
patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), - ): + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + for writer in event_gate_module.handler_topic.writers.values(): + writer.write = MagicMock(return_value=(True, None)) + event = make_event( "/topics/{topic_name}", method="POST", @@ -156,12 +159,11 @@ def test_post_success_all_writers(event_gate_module, make_event, valid_payload): def test_post_single_writer_failure(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), - patch("src.handlers.handler_topic.writer_kafka.write", return_value=(False, "Kafka boom")), - patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(True, None)), - patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), - ): + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event_gate_module.handler_topic.writers["kafka"].write = MagicMock(return_value=(False, "Kafka boom")) + event_gate_module.handler_topic.writers["eventbridge"].write = MagicMock(return_value=(True, None)) + event_gate_module.handler_topic.writers["postgres"].write = MagicMock(return_value=(True, None)) + event = make_event( "/topics/{topic_name}", method="POST", @@ -178,12 +180,11 @@ def test_post_single_writer_failure(event_gate_module, make_event, valid_payload def test_post_multiple_writer_failures(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), - patch("src.handlers.handler_topic.writer_kafka.write", return_value=(False, "Kafka A")), - patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(False, "EB B")), - patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), - ): + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event_gate_module.handler_topic.writers["kafka"].write = MagicMock(return_value=(False, "Kafka A")) + event_gate_module.handler_topic.writers["eventbridge"].write = MagicMock(return_value=(False, "EB B")) + event_gate_module.handler_topic.writers["postgres"].write = MagicMock(return_value=(True, None)) + event = make_event( "/topics/{topic_name}", method="POST", @@ -198,12 +199,10 @@ def test_post_multiple_writer_failures(event_gate_module, make_event, valid_payl def test_token_extraction_lowercase_bearer_header(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), - patch("src.handlers.handler_topic.writer_kafka.write", return_value=(True, None)), - patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(True, None)), - patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), - ): + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + for writer in event_gate_module.handler_topic.writers.values(): + writer.write = MagicMock(return_value=(True, None)) + event = make_event( "/topics/{topic_name}", method="POST", diff --git a/tests/utils/test_trace_logging.py b/tests/utils/test_trace_logging.py index a5cce96..df1cd29 100644 --- a/tests/utils/test_trace_logging.py +++ b/tests/utils/test_trace_logging.py @@ -1,22 +1,39 @@ -import logging +# +# 
Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from unittest.mock import MagicMock from src.utils.logging_levels import TRACE_LEVEL -import src.writers.writer_eventbridge as we -import src.writers.writer_kafka as wk -import src.writers.writer_postgres as wp +import src.writers.writer_eventbridge as writer_eventbridge +import src.writers.writer_kafka as writer_kafka +import src.writers.writer_postgres as writer_postgres def test_trace_eventbridge(caplog): - logger = logging.getLogger("trace.eventbridge") - logger.setLevel(TRACE_LEVEL) - we.STATE["logger"] = logger - we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/test" + # Set trace level on the module's logger + writer_eventbridge.logger.setLevel(TRACE_LEVEL) + caplog.set_level(TRACE_LEVEL) + + writer = writer_eventbridge.WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/test"}) mock_client = MagicMock() mock_client.put_events.return_value = {"FailedEntryCount": 0, "Entries": []} - we.STATE["client"] = mock_client - caplog.set_level(TRACE_LEVEL) - ok, err = we.write("topic.eb", {"k": 1}) + writer._client = mock_client + + ok, err = writer.write("topic.eb", {"k": 1}) + assert ok and err is None assert any("EventBridge payload" in rec.message for rec in caplog.records) @@ -28,21 +45,23 @@ def produce(self, *a, **kw): if cb: cb(None, object()) - def flush(self, *a, **kw): + def flush(self, timeout=None): return 0 - logger = logging.getLogger("trace.kafka") - logger.setLevel(TRACE_LEVEL) - wk.STATE["logger"] = logger - wk.STATE["producer"] = FakeProducer() + # Set trace level on the module's logger + writer_kafka.logger.setLevel(TRACE_LEVEL) caplog.set_level(TRACE_LEVEL) - ok, err = wk.write("topic.kf", {"k": 2}) + + writer = writer_kafka.WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = FakeProducer() + + ok, err = writer.write("topic.kf", {"k": 2}) + assert ok and err is None assert any("Kafka payload" in rec.message for rec in caplog.records) def test_trace_postgres(caplog, monkeypatch): - # Prepare dummy psycopg2 connection machinery store = [] class DummyCursor: @@ -72,15 +91,17 @@ class DummyPsycopg2: def connect(self, **kwargs): return DummyConnection() - monkeypatch.setattr(wp, "psycopg2", DummyPsycopg2()) - - logger = logging.getLogger("trace.postgres") - logger.setLevel(TRACE_LEVEL) - wp.logger = logger - wp.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + monkeypatch.setattr(writer_postgres, "psycopg2", DummyPsycopg2()) + # Set trace level on the module's logger + writer_postgres.logger.setLevel(TRACE_LEVEL) caplog.set_level(TRACE_LEVEL) + + writer = writer_postgres.WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + message = {"event_id": "e", "tenant_id": "t", "source_app": "a", "environment": "dev", "timestamp": 1} - ok, err = wp.write("public.cps.za.test", message) + ok, err = writer.write("public.cps.za.test", 
message) + assert ok and err is None assert any("Postgres payload" in rec.message for rec in caplog.records) diff --git a/tests/writers/test_writer_eventbridge.py b/tests/writers/test_writer_eventbridge.py index 737b4af..e35776d 100644 --- a/tests/writers/test_writer_eventbridge.py +++ b/tests/writers/test_writer_eventbridge.py @@ -1,30 +1,46 @@ -import logging -from unittest.mock import MagicMock +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# -import src.writers.writer_eventbridge as we +from unittest.mock import MagicMock, patch +from botocore.exceptions import BotoCoreError + +from src.writers.writer_eventbridge import WriterEventBridge + + +# --- write() --- def test_write_skips_when_no_event_bus(): - we.STATE["logger"] = logging.getLogger("test") - we.STATE["event_bus_arn"] = "" # no bus configured - ok, err = we.write("topic", {"k": 1}) + writer = WriterEventBridge({"event_bus_arn": ""}) + ok, err = writer.write("topic", {"k": 1}) assert ok and err is None def test_write_success(): - we.STATE["logger"] = logging.getLogger("test") - we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/bus" # fake + writer = WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/bus"}) mock_client = MagicMock() mock_client.put_events.return_value = {"FailedEntryCount": 0, "Entries": []} - we.STATE["client"] = mock_client - ok, err = we.write("topic", {"k": 2}) + writer._client = mock_client + ok, err = writer.write("topic", {"k": 2}) assert ok and err is None mock_client.put_events.assert_called_once() def test_write_failed_entries(): - we.STATE["logger"] = logging.getLogger("test") - we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/bus" + writer = WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/bus"}) mock_client = MagicMock() mock_client.put_events.return_value = { "FailedEntryCount": 1, @@ -33,8 +49,8 @@ def test_write_failed_entries(): {"EventId": "2"}, ], } - we.STATE["client"] = mock_client - ok, err = we.write("topic", {"k": 3}) + writer._client = mock_client + ok, err = writer.write("topic", {"k": 3}) assert not ok and "EventBridge" in err @@ -44,10 +60,37 @@ def test_write_client_error(): class DummyError(BotoCoreError): pass - we.STATE["logger"] = logging.getLogger("test") - we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/bus" + writer = WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/bus"}) mock_client = MagicMock() mock_client.put_events.side_effect = DummyError() - we.STATE["client"] = mock_client - ok, err = we.write("topic", {"k": 4}) + writer._client = mock_client + ok, err = writer.write("topic", {"k": 4}) assert not ok and err is not None + + +# --- check_health() --- + + +def test_check_health_not_configured(): + writer = WriterEventBridge({"event_bus_arn": ""}) + healthy, msg = writer.check_health() + assert healthy and msg == "not configured" + + +def test_check_health_success(): + writer = WriterEventBridge({"event_bus_arn": 
"arn:aws:events:region:acct:event-bus/bus"}) + with patch("boto3.client") as mock_client: + mock_client.return_value = MagicMock() + healthy, msg = writer.check_health() + assert healthy and msg == "ok" + assert writer._client is not None + + +def test_check_health_client_error(): + class DummyError(BotoCoreError): + fmt = "Dummy error" + + writer = WriterEventBridge({"event_bus_arn": "arn:aws:events:region:acct:event-bus/bus"}) + with patch("boto3.client", side_effect=DummyError()): + healthy, msg = writer.check_health() + assert not healthy and msg is not None diff --git a/tests/writers/test_writer_kafka.py b/tests/writers/test_writer_kafka.py index 7cad07b..2166d13 100644 --- a/tests/writers/test_writer_kafka.py +++ b/tests/writers/test_writer_kafka.py @@ -1,5 +1,23 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + import logging from types import SimpleNamespace + +from src.writers.writer_kafka import WriterKafka import src.writers.writer_kafka as wk @@ -12,8 +30,8 @@ def produce(self, topic, key, value, callback): # noqa: D401 # simulate success callback(None, SimpleNamespace()) - def flush(self): - return None + def flush(self, timeout=None): + return 0 class FakeProducerError(FakeProducerSuccess): @@ -28,7 +46,7 @@ def __init__(self, sequence): # sequence of remaining counts per flush call self.sequence = sequence self.flush_calls = 0 - def flush(self, *a, **kw): + def flush(self, timeout=None): # Simulate decreasing remaining messages if self.flush_calls < len(self.sequence): val = self.sequence[self.flush_calls] @@ -44,7 +62,7 @@ def __init__(self, remaining_value): self.remaining_value = remaining_value self.flush_calls = 0 - def flush(self, *a, **kw): # always returns same remaining >0 to force timeout warning + def flush(self, timeout=None): # always returns same remaining >0 to force timeout warning self.flush_calls += 1 return self.remaining_value @@ -60,24 +78,26 @@ def flush(self): # noqa: D401 return 0 -def test_write_skips_when_producer_none(monkeypatch): - wk.STATE["logger"] = logging.getLogger("test") - wk.STATE["producer"] = None - ok, err = wk.write("topic", {"a": 1}) +# --- write() --- + + +def test_write_skips_when_producer_none(): + writer = WriterKafka({}) # No kafka_bootstrap_server + ok, err = writer.write("topic", {"a": 1}) assert ok and err is None -def test_write_success(monkeypatch): - wk.STATE["logger"] = logging.getLogger("test") - wk.STATE["producer"] = FakeProducerSuccess() - ok, err = wk.write("topic", {"b": 2}) +def test_write_success(): + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = FakeProducerSuccess() + ok, err = writer.write("topic", {"b": 2}) assert ok and err is None -def test_write_async_error(monkeypatch): - wk.STATE["logger"] = logging.getLogger("test") - wk.STATE["producer"] = FakeProducerError() - ok, err = wk.write("topic", {"c": 3}) +def test_write_async_error(): + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = 
FakeProducerError() + ok, err = writer.write("topic", {"c": 3}) assert not ok and "ERR" in err @@ -86,27 +106,26 @@ class DummyKafkaException(Exception): def test_write_kafka_exception(monkeypatch): - wk.STATE["logger"] = logging.getLogger("test") - class RaisingProducer(FakeProducerSuccess): def produce(self, *a, **kw): # noqa: D401 raise DummyKafkaException("boom") # Monkeypatch KafkaException symbol used in except monkeypatch.setattr(wk, "KafkaException", DummyKafkaException, raising=False) - wk.STATE["producer"] = RaisingProducer() - ok, err = wk.write("topic", {"d": 4}) + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = RaisingProducer() + ok, err = writer.write("topic", {"d": 4}) assert not ok and "boom" in err def test_write_flush_retries_until_success(monkeypatch, caplog): - wk.STATE["logger"] = logging.getLogger("test") caplog.set_level(logging.WARNING) # Force smaller max retries for deterministic sequence length monkeypatch.setattr(wk, "_MAX_RETRIES", 5, raising=False) producer = FakeProducerFlushSequence([5, 4, 3, 1, 0]) - wk.STATE["producer"] = producer - ok, err = wk.write("topic", {"e": 5}) + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = producer + ok, err = writer.write("topic", {"e": 5}) assert ok and err is None # It should break as soon as remaining == 0 (after flush call returning 0) assert producer.flush_calls == 5 # sequence consumed until 0 @@ -116,12 +135,12 @@ def test_write_flush_retries_until_success(monkeypatch, caplog): def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog): - wk.STATE["logger"] = logging.getLogger("test") caplog.set_level(logging.WARNING) monkeypatch.setattr(wk, "_MAX_RETRIES", 3, raising=False) producer = FakeProducerTimeout(2) - wk.STATE["producer"] = producer - ok, err = wk.write("topic", {"f": 6}) + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = producer + ok, err = writer.write("topic", {"f": 6}) timeout_warnings = [ r.message for r in caplog.records if "timeout" in r.message ] # final warning should mention timeout @@ -131,11 +150,35 @@ def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog) def test_flush_with_timeout_typeerror_fallback(monkeypatch): - wk.STATE["logger"] = logging.getLogger("test") monkeypatch.setattr(wk, "_MAX_RETRIES", 4, raising=False) producer = FakeProducerTypeError() - wk.STATE["producer"] = producer - ok, err = wk.write("topic", {"g": 7}) + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = producer + ok, err = writer.write("topic", {"g": 7}) assert ok and err is None # Since flush returns 0 immediately, only one flush call should be needed assert producer.flush_calls == 1 + + +# --- check_health() --- + + +def test_check_health_not_configured(): + writer = WriterKafka({}) + healthy, msg = writer.check_health() + assert healthy and msg == "not configured" + + +def test_check_health_success(): + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + writer._producer = FakeProducerSuccess() + healthy, msg = writer.check_health() + assert healthy and msg == "ok" + + +def test_check_health_producer_init_failed(monkeypatch): + writer = WriterKafka({"kafka_bootstrap_server": "localhost:9092"}) + # Force _create_producer to return None + monkeypatch.setattr(writer, "_create_producer", lambda: None) + healthy, msg = writer.check_health() + assert not healthy and "initialization failed" in msg diff 
--git a/tests/writers/test_writer_postgres.py b/tests/writers/test_writer_postgres.py index bdb8d6d..30e846d 100644 --- a/tests/writers/test_writer_postgres.py +++ b/tests/writers/test_writer_postgres.py @@ -13,18 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # + import json -import logging import os import types -import pytest - -from src.writers import writer_postgres +import pytest -@pytest.fixture(scope="module", autouse=True) -def init_module_logger(): - writer_postgres.init(logging.getLogger("test")) +from src.writers.writer_postgres import WriterPostgres +import src.writers.writer_postgres as wp class MockCursor: @@ -39,6 +36,7 @@ def execute(self, sql, params): def test_postgres_edla_write_with_optional_fields(): + writer = WriterPostgres({}) cur = MockCursor() message = { "event_id": "e1", @@ -55,7 +53,7 @@ def test_postgres_edla_write_with_optional_fields(): "format_options": {"compression": "snappy"}, "additional_info": {"foo": "bar"}, } - writer_postgres.postgres_edla_write(cur, "table_a", message) + writer._postgres_edla_write(cur, "table_a", message) assert len(cur.executions) == 1 _sql, params = cur.executions[0] assert len(params) == 13 @@ -68,6 +66,7 @@ def test_postgres_edla_write_with_optional_fields(): def test_postgres_edla_write_missing_optional(): + writer = WriterPostgres({}) cur = MockCursor() message = { "event_id": "e2", @@ -80,7 +79,7 @@ def test_postgres_edla_write_missing_optional(): "operation": "overwrite", "format": "delta", } - writer_postgres.postgres_edla_write(cur, "table_a", message) + writer._postgres_edla_write(cur, "table_a", message) _sql, params = cur.executions[0] assert params[6] == "" assert params[9] is None @@ -90,6 +89,7 @@ def test_postgres_edla_write_missing_optional(): def test_postgres_run_write(): + writer = WriterPostgres({}) cur = MockCursor() message = { "event_id": "r1", @@ -113,7 +113,7 @@ def test_postgres_run_write(): }, ], } - writer_postgres.postgres_run_write(cur, "runs_table", "jobs_table", message) + writer._postgres_run_write(cur, "runs_table", "jobs_table", message) assert len(cur.executions) == 3 # Check run insert @@ -135,6 +135,7 @@ def test_postgres_run_write(): def test_postgres_test_write(): + writer = WriterPostgres({}) cur = MockCursor() message = { "event_id": "t1", @@ -144,7 +145,7 @@ def test_postgres_test_write(): "timestamp": 999, "additional_info": {"a": 1}, } - writer_postgres.postgres_test_write(cur, "table_test", message) + writer._postgres_test_write(cur, "table_test", message) assert len(cur.executions) == 1 _sql, params = cur.executions[0] assert params[0] == "t1" @@ -156,13 +157,8 @@ def test_postgres_test_write(): @pytest.fixture -def reset_state(monkeypatch): - # Preserve psycopg2 ref - original_psycopg2 = writer_postgres.psycopg2 - writer_postgres.POSTGRES = {"database": ""} +def reset_env(): yield - writer_postgres.psycopg2 = original_psycopg2 - writer_postgres.POSTGRES = {"database": ""} os.environ.pop("POSTGRES_SECRET_NAME", None) os.environ.pop("POSTGRES_SECRET_REGION", None) @@ -207,48 +203,56 @@ def connect(self, **kwargs): return DummyConnection(self.store) -def test_write_skips_when_no_database(reset_state): - writer_postgres.POSTGRES = {"database": ""} - ok, err = writer_postgres.write("public.cps.za.test", {}) +# --- write() --- + + +def test_write_skips_when_no_database(reset_env): + writer = WriterPostgres({}) + writer._db_config = {"database": ""} + ok, err = writer.write("public.cps.za.test", {}) assert ok and err is 
None -def test_write_skips_when_psycopg2_missing(reset_state, monkeypatch): - writer_postgres.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} - monkeypatch.setattr(writer_postgres, "psycopg2", None) - ok, err = writer_postgres.write("public.cps.za.test", {}) +def test_write_skips_when_psycopg2_missing(reset_env, monkeypatch): + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + monkeypatch.setattr(wp, "psycopg2", None) + ok, err = writer.write("public.cps.za.test", {}) assert ok and err is None -def test_write_unknown_topic_returns_false(reset_state, monkeypatch): +def test_write_unknown_topic_returns_false(reset_env, monkeypatch): store = [] - monkeypatch.setattr(writer_postgres, "psycopg2", DummyPsycopg(store)) - writer_postgres.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} - ok, err = writer_postgres.write("public.cps.za.unknown", {}) + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg(store)) + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + ok, err = writer.write("public.cps.za.unknown", {}) assert not ok and "unknown topic" in err -def test_write_success_known_topic(reset_state, monkeypatch): +def test_write_success_known_topic(reset_env, monkeypatch): store = [] - monkeypatch.setattr(writer_postgres, "psycopg2", DummyPsycopg(store)) - writer_postgres.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg(store)) + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} message = {"event_id": "id", "tenant_id": "ten", "source_app": "app", "environment": "dev", "timestamp": 123} - ok, err = writer_postgres.write("public.cps.za.test", message) + ok, err = writer.write("public.cps.za.test", message) assert ok and err is None and len(store) == 1 -def test_write_exception_returns_false(reset_state, monkeypatch): +def test_write_exception_returns_false(reset_env, monkeypatch): class FailingPsycopg: def connect(self, **kwargs): raise RuntimeError("boom") - monkeypatch.setattr(writer_postgres, "psycopg2", FailingPsycopg()) - writer_postgres.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} - ok, err = writer_postgres.write("public.cps.za.test", {}) + monkeypatch.setattr(wp, "psycopg2", FailingPsycopg()) + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + ok, err = writer.write("public.cps.za.test", {}) assert not ok and "failed unknown error" in err -def test_init_with_secret(monkeypatch, reset_state): +def test_init_with_secret(monkeypatch, reset_env): secret_dict = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} os.environ["POSTGRES_SECRET_NAME"] = "mysecret" os.environ["POSTGRES_SECRET_REGION"] = "eu-west-1" @@ -258,15 +262,16 @@ class MockSession: def client(self, service_name, region_name): return mock_client - monkeypatch.setattr(writer_postgres.boto3, "Session", lambda: MockSession()) - writer_postgres.init(logging.getLogger("test")) - assert writer_postgres.POSTGRES == secret_dict + monkeypatch.setattr(wp.boto3, "Session", lambda: MockSession()) + writer = WriterPostgres({}) + assert writer._db_config == secret_dict -def test_write_dlchange_success(reset_state, 
monkeypatch): +def test_write_dlchange_success(reset_env, monkeypatch): store = [] - monkeypatch.setattr(writer_postgres, "psycopg2", DummyPsycopg(store)) - writer_postgres.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg(store)) + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} message = { "event_id": "e1", "tenant_id": "t", @@ -278,14 +283,15 @@ def test_write_dlchange_success(reset_state, monkeypatch): "operation": "append", "format": "parquet", } - ok, err = writer_postgres.write("public.cps.za.dlchange", message) + ok, err = writer.write("public.cps.za.dlchange", message) assert ok and err is None and len(store) == 1 -def test_write_runs_success(reset_state, monkeypatch): +def test_write_runs_success(reset_env, monkeypatch): store = [] - monkeypatch.setattr(writer_postgres, "psycopg2", DummyPsycopg(store)) - writer_postgres.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg(store)) + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} message = { "event_id": "r1", "job_ref": "job", @@ -297,5 +303,29 @@ def test_write_runs_success(reset_state, monkeypatch): "timestamp_end": 2, "jobs": [{"catalog_id": "c", "status": "ok", "timestamp_start": 1, "timestamp_end": 2}], } - ok, err = writer_postgres.write("public.cps.za.runs", message) + ok, err = writer.write("public.cps.za.runs", message) assert ok and err is None and len(store) == 2 # run + job insert + + +# --- check_health() --- + + +def test_check_health_not_configured(): + writer = WriterPostgres({}) + writer._db_config = {"database": ""} + healthy, msg = writer.check_health() + assert healthy and msg == "not configured" + + +def test_check_health_success(): + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + healthy, msg = writer.check_health() + assert healthy and msg == "ok" + + +def test_check_health_missing_host(): + writer = WriterPostgres({}) + writer._db_config = {"database": "db", "host": "", "user": "u", "password": "p", "port": 5432} + healthy, msg = writer.check_health() + assert not healthy and "host not configured" in msg
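
For reviewers, a minimal sketch of the writer contract the refactored tests above rely on; the base-class shape and exact signatures are assumptions inferred from the assertions in these tests, not a copy of src/writers/writer.py.

# Hypothetical sketch of the Writer contract exercised by the tests above.
# Assumptions: concrete writers take a config dict and build their clients
# lazily; only the return-value conventions below are asserted directly by
# the tests (write -> (ok, err), check_health -> (healthy, msg)).
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple


class Writer(ABC):
    """Common interface for EventGate writers (illustrative only)."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    @abstractmethod
    def write(self, topic: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Return (True, None) on success or when the backend is not configured,
        (False, reason) when delivery fails."""

    @abstractmethod
    def check_health(self) -> Tuple[bool, str]:
        """Return (True, "ok") when usable, (True, "not configured") when the
        backend is disabled, or (False, reason) when the dependency is unhealthy."""


# Usage implied by the tests: a writer without configuration is treated as
# healthy-but-inactive, e.g.
#   WriterEventBridge({"event_bus_arn": ""}).check_health() == (True, "not configured")

Treating "not configured" as healthy mirrors the skip-style write tests (empty event bus ARN, missing bootstrap server, empty database), so an optional backend neither blocks message posting nor degrades the reported service health.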