Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,24 @@
import sys
from typing import Any, Dict

import urllib3

import boto3
import jwt
import requests
import urllib3
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import serialization
from jsonschema import validate
from jsonschema.exceptions import ValidationError

# Added explicit import for serialization-related exceptions
try: # pragma: no cover - import guard
from cryptography.exceptions import UnsupportedAlgorithm # type: ignore
except Exception: # pragma: no cover - very defensive
UnsupportedAlgorithm = Exception # type: ignore

# Import writer modules with explicit ImportError fallback
try:
from . import writer_eventbridge, writer_kafka, writer_postgres
from . import writer_eventbridge
from . import writer_kafka
from . import writer_postgres
except ImportError: # fallback when executed outside package context
import writer_eventbridge, writer_kafka, writer_postgres # type: ignore[no-redef]
import writer_eventbridge # type: ignore[no-redef]
import writer_kafka # type: ignore[no-redef]
import writer_postgres # type: ignore[no-redef]

# Import configuration directory symbols with explicit ImportError fallback
try:
Expand Down
29 changes: 29 additions & 0 deletions src/logging_levels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Custom logging levels.

Adds a TRACE level below DEBUG for very verbose payload logging.
"""

from __future__ import annotations
import logging

TRACE_LEVEL = 5

# Register the level name and expose it as logging.TRACE.  Note that
# logging.addLevelName() never sets a module attribute, so the original
# hasattr(logging, "TRACE") guard was always true and guarded nothing;
# assigning logging.TRACE inside the guard makes registration genuinely
# idempotent across repeated executions of this code.
if not hasattr(logging, "TRACE"):
    logging.addLevelName(TRACE_LEVEL, "TRACE")
    logging.TRACE = TRACE_LEVEL  # type: ignore[attr-defined]

def trace(self: logging.Logger, message: str, *args, **kws):  # type: ignore[override]
    """Log a message with TRACE level.

    Args:
        self: Logger instance.
        message: Log message format string.
        *args: Positional arguments for message formatting.
        **kws: Keyword arguments passed to _log (e.g. exc_info, extra).
    """
    # Mirror the level check done by Logger.debug/info/etc. so disabled
    # loggers pay no formatting cost.
    if self.isEnabledFor(TRACE_LEVEL):
        self._log(TRACE_LEVEL, message, args, **kws)  # pylint: disable=protected-access

logging.Logger.trace = trace  # type: ignore[attr-defined]

__all__ = ["TRACE_LEVEL"]
87 changes: 87 additions & 0 deletions src/safe_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Safe serialization utilities for logging.

Provides PII-safe, size-bounded JSON serialization for TRACE logging.
"""

import json
import os
from typing import Any, List, Set


def _redact_sensitive_keys(obj: Any, redact_keys: Set[str]) -> Any:
"""Recursively redact sensitive keys from nested structures.

Args:
obj: Object to redact (dict, list, or scalar).
redact_keys: Set of key names to redact (case-insensitive).

Returns:
Copy of obj with sensitive values replaced by "***REDACTED***".
"""
if isinstance(obj, dict):
return {
k: "***REDACTED***" if k.lower() in redact_keys else _redact_sensitive_keys(v, redact_keys)
for k, v in obj.items()
}
if isinstance(obj, list):
return [_redact_sensitive_keys(item, redact_keys) for item in obj]
return obj


def safe_serialize_for_log(message: Any, redact_keys: List[str] | None = None, max_bytes: int | None = None) -> str:
"""Safely serialize a message for logging with redaction and size capping.

Args:
message: Object to serialize (typically a dict).
redact_keys: List of key names to redact (case-insensitive). If None, uses env TRACE_REDACT_KEYS.
max_bytes: Maximum serialized output size in bytes. If None, uses env TRACE_MAX_BYTES (default 10000).

Returns:
JSON string (redacted and truncated if needed), or empty string on serialization error.
"""
# Apply configuration defaults
if redact_keys is None:
redact_keys_str = os.environ.get("TRACE_REDACT_KEYS", "password,secret,token,key,apikey,api_key")
redact_keys = [k.strip() for k in redact_keys_str.split(",") if k.strip()]
if max_bytes is None:
max_bytes = int(os.environ.get("TRACE_MAX_BYTES", "10000"))

# Normalize to case-insensitive set
redact_set = {k.lower() for k in redact_keys}

try:
# Redact sensitive keys
redacted = _redact_sensitive_keys(message, redact_set)
# Serialize with minimal whitespace
serialized = json.dumps(redacted, separators=(",", ":"))
# Truncate if needed
if len(serialized.encode("utf-8")) > max_bytes:
# Binary truncate to max_bytes and append marker
truncated_bytes = serialized.encode("utf-8")[:max_bytes]
# Ensure we don't break mid-multibyte character
try:
return truncated_bytes.decode("utf-8", errors="ignore") + "..."
except UnicodeDecodeError: # pragma: no cover - defensive
return ""
return serialized
except (TypeError, ValueError, OverflowError): # pragma: no cover - catch serialization errors
return ""


__all__ = ["safe_serialize_for_log"]
51 changes: 51 additions & 0 deletions src/trace_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Trace-level logging utilities.

Provides reusable TRACE-level payload logging for writer modules.
"""

import logging
from typing import Any, Dict

from .logging_levels import TRACE_LEVEL
from .safe_serialization import safe_serialize_for_log


def log_payload_at_trace(logger: logging.Logger, writer_name: str, topic_name: str, message: Dict[str, Any]) -> None:
    """Log message payload at TRACE level with safe serialization.

    Does nothing unless TRACE is enabled on the logger, so the
    (potentially expensive) serialization is skipped in normal operation.

    Args:
        logger: Logger instance to use for logging.
        writer_name: Name of the writer (e.g., "EventBridge", "Kafka", "Postgres").
        topic_name: Topic name being written to.
        message: Message payload to log.
    """
    if not logger.isEnabledFor(TRACE_LEVEL):
        return

    try:
        serialized = safe_serialize_for_log(message)
        if not serialized:
            return
        logger.trace("%s payload topic=%s payload=%s", writer_name, topic_name, serialized)  # type: ignore[attr-defined]
    except (TypeError, ValueError):  # pragma: no cover - defensive serialization guard
        logger.trace("%s payload topic=%s <unserializable>", writer_name, topic_name)  # type: ignore[attr-defined]


__all__ = ["log_payload_at_trace"]
4 changes: 4 additions & 0 deletions src/writer_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import boto3
from botocore.exceptions import BotoCoreError, ClientError

from .trace_logging import log_payload_at_trace

STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None}


Expand Down Expand Up @@ -68,6 +70,8 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
logger.debug("EventBridge client not initialized - skipping")
return True, None

log_payload_at_trace(logger, "EventBridge", topic_name, message)

try:
logger.debug("Sending to eventBridge %s", topic_name)
response = client.put_events(
Expand Down
4 changes: 4 additions & 0 deletions src/writer_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from typing import Any, Dict, Optional, Tuple
from confluent_kafka import Producer

from .trace_logging import log_payload_at_trace

try: # KafkaException may not exist in stubbed test module
from confluent_kafka import KafkaException # type: ignore
except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub
Expand Down Expand Up @@ -92,6 +94,8 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
logger.debug("Kafka producer not initialized - skipping")
return True, None

log_payload_at_trace(logger, "Kafka", topic_name, message)

errors: list[str] = []
has_exception = False

Expand Down
30 changes: 17 additions & 13 deletions src/writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
except ImportError: # pragma: no cover - environment without psycopg2
psycopg2 = None # type: ignore

from .trace_logging import log_payload_at_trace

# Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing
if psycopg2 is not None: # type: ignore
try: # pragma: no cover - attribute presence depends on installed psycopg2 variant
Expand All @@ -47,20 +49,20 @@ class PsycopgError(Exception): # type: ignore


# Module level globals for typing
_logger: logging.Logger = logging.getLogger(__name__)
logger: logging.Logger = logging.getLogger(__name__)
POSTGRES: Dict[str, Any] = {"database": ""}


def init(logger: logging.Logger) -> None:
def init(logger_instance: logging.Logger) -> None:
"""Initialize Postgres credentials either from AWS Secrets Manager or fallback empty config.

Args:
logger: Shared application logger.
logger_instance: Shared application logger.
"""
global _logger # pylint: disable=global-statement
global logger # pylint: disable=global-statement
global POSTGRES # pylint: disable=global-statement

_logger = logger
logger = logger_instance

secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
Expand All @@ -72,7 +74,7 @@ def init(logger: logging.Logger) -> None:
else:
POSTGRES = {"database": ""}

_logger.debug("Initialized POSTGRES writer")
logger.debug("Initialized POSTGRES writer")


def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
Expand All @@ -83,7 +85,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
table: Target table name.
message: Event payload.
"""
_logger.debug("Sending to Postgres - %s", table)
logger.debug("Sending to Postgres - %s", table)
cursor.execute(
f"""
INSERT INTO {table}
Expand Down Expand Up @@ -142,7 +144,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
table_jobs: Jobs table name.
message: Event payload (includes jobs array).
"""
_logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs)
logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs)
cursor.execute(
f"""
INSERT INTO {table_runs}
Expand Down Expand Up @@ -222,7 +224,7 @@ def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None:
table: Target table name.
message: Event payload.
"""
_logger.debug("Sending to Postgres - %s", table)
logger.debug("Sending to Postgres - %s", table)
cursor.execute(
f"""
INSERT INTO {table}
Expand Down Expand Up @@ -265,12 +267,14 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
"""
try:
if not POSTGRES.get("database"):
_logger.debug("No Postgres - skipping")
logger.debug("No Postgres - skipping")
return True, None
if psycopg2 is None: # type: ignore
_logger.debug("psycopg2 not available - skipping actual Postgres write")
logger.debug("psycopg2 not available - skipping actual Postgres write")
return True, None

log_payload_at_trace(logger, "Postgres", topic_name, message)

with psycopg2.connect( # type: ignore[attr-defined]
database=POSTGRES["database"],
host=POSTGRES["host"],
Expand All @@ -287,13 +291,13 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
postgres_test_write(cursor, "public_cps_za_test", message)
else:
msg = f"unknown topic for postgres {topic_name}"
_logger.error(msg)
logger.error(msg)
return False, msg

connection.commit() # type: ignore
except (RuntimeError, PsycopgError) as e: # narrowed exception set
err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
_logger.error(err_msg)
logger.exception(err_msg)
return False, err_msg

return True, None
Loading