diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 36f1664..6f19ef5 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -22,26 +22,24 @@ import sys from typing import Any, Dict -import urllib3 - import boto3 import jwt import requests +import urllib3 +from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives import serialization from jsonschema import validate from jsonschema.exceptions import ValidationError -# Added explicit import for serialization-related exceptions -try: # pragma: no cover - import guard - from cryptography.exceptions import UnsupportedAlgorithm # type: ignore -except Exception: # pragma: no cover - very defensive - UnsupportedAlgorithm = Exception # type: ignore - # Import writer modules with explicit ImportError fallback try: - from . import writer_eventbridge, writer_kafka, writer_postgres + from . import writer_eventbridge + from . import writer_kafka + from . import writer_postgres except ImportError: # fallback when executed outside package context - import writer_eventbridge, writer_kafka, writer_postgres # type: ignore[no-redef] + import writer_eventbridge # type: ignore[no-redef] + import writer_kafka # type: ignore[no-redef] + import writer_postgres # type: ignore[no-redef] # Import configuration directory symbols with explicit ImportError fallback try: diff --git a/src/logging_levels.py b/src/logging_levels.py new file mode 100644 index 0000000..7e31c13 --- /dev/null +++ b/src/logging_levels.py @@ -0,0 +1,29 @@ +"""Custom logging levels. + +Adds a TRACE level below DEBUG for very verbose payload logging. +""" + +from __future__ import annotations +import logging + +TRACE_LEVEL = 5 + +# Register level name and trace method only once (idempotent; addLevelName never sets logging.TRACE, so guard on Logger.trace) +if not hasattr(logging.Logger, "trace"): + logging.addLevelName(TRACE_LEVEL, "TRACE") + + def trace(self: logging.Logger, message: str, *args, **kws): # type: ignore[override] + """Log a message with TRACE level. 
+ + Args: + self: Logger instance. + message: Log message format string. + *args: Positional arguments for message formatting. + **kws: Keyword arguments passed to _log. + """ + if self.isEnabledFor(TRACE_LEVEL): + self._log(TRACE_LEVEL, message, args, **kws) # pylint: disable=protected-access + + logging.Logger.trace = trace # type: ignore[attr-defined] + +__all__ = ["TRACE_LEVEL"] diff --git a/src/safe_serialization.py b/src/safe_serialization.py new file mode 100644 index 0000000..28cc472 --- /dev/null +++ b/src/safe_serialization.py @@ -0,0 +1,87 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Safe serialization utilities for logging. + +Provides PII-safe, size-bounded JSON serialization for TRACE logging. +""" + +import json +import os +from typing import Any, List, Set + + +def _redact_sensitive_keys(obj: Any, redact_keys: Set[str]) -> Any: + """Recursively redact sensitive keys from nested structures. + + Args: + obj: Object to redact (dict, list, or scalar). + redact_keys: Set of key names to redact (case-insensitive). + + Returns: + Copy of obj with sensitive values replaced by "***REDACTED***". 
+ """ + if isinstance(obj, dict): + return { + k: "***REDACTED***" if k.lower() in redact_keys else _redact_sensitive_keys(v, redact_keys) + for k, v in obj.items() + } + if isinstance(obj, list): + return [_redact_sensitive_keys(item, redact_keys) for item in obj] + return obj + + +def safe_serialize_for_log(message: Any, redact_keys: List[str] | None = None, max_bytes: int | None = None) -> str: + """Safely serialize a message for logging with redaction and size capping. + + Args: + message: Object to serialize (typically a dict). + redact_keys: List of key names to redact (case-insensitive). If None, uses env TRACE_REDACT_KEYS. + max_bytes: Maximum serialized output size in bytes. If None, uses env TRACE_MAX_BYTES (default 10000). + + Returns: + JSON string (redacted and truncated if needed), or empty string on serialization error. + """ + # Apply configuration defaults + if redact_keys is None: + redact_keys_str = os.environ.get("TRACE_REDACT_KEYS", "password,secret,token,key,apikey,api_key") + redact_keys = [k.strip() for k in redact_keys_str.split(",") if k.strip()] + if max_bytes is None: + max_bytes = int(os.environ.get("TRACE_MAX_BYTES", "10000")) + + # Normalize to case-insensitive set + redact_set = {k.lower() for k in redact_keys} + + try: + # Redact sensitive keys + redacted = _redact_sensitive_keys(message, redact_set) + # Serialize with minimal whitespace + serialized = json.dumps(redacted, separators=(",", ":")) + # Truncate if needed + if len(serialized.encode("utf-8")) > max_bytes: + # Binary truncate to max_bytes and append marker + truncated_bytes = serialized.encode("utf-8")[:max_bytes] + # Ensure we don't break mid-multibyte character + try: + return truncated_bytes.decode("utf-8", errors="ignore") + "..." 
+ except UnicodeDecodeError: # pragma: no cover - defensive + return "" + return serialized + except (TypeError, ValueError, OverflowError): # pragma: no cover - catch serialization errors + return "" + + +__all__ = ["safe_serialize_for_log"] diff --git a/src/trace_logging.py b/src/trace_logging.py new file mode 100644 index 0000000..4fd2a1e --- /dev/null +++ b/src/trace_logging.py @@ -0,0 +1,51 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Trace-level logging utilities. + +Provides reusable TRACE-level payload logging for writer modules. +""" + +import logging +from typing import Any, Dict + +from .logging_levels import TRACE_LEVEL +from .safe_serialization import safe_serialize_for_log + + +def log_payload_at_trace(logger: logging.Logger, writer_name: str, topic_name: str, message: Dict[str, Any]) -> None: + """Log message payload at TRACE level with safe serialization. + + Args: + logger: Logger instance to use for logging. + writer_name: Name of the writer (e.g., "EventBridge", "Kafka", "Postgres"). + topic_name: Topic name being written to. + message: Message payload to log. 
+ """ + if not logger.isEnabledFor(TRACE_LEVEL): + return + + try: + safe_payload = safe_serialize_for_log(message) + if safe_payload: + logger.trace( # type: ignore[attr-defined] + "%s payload topic=%s payload=%s", writer_name, topic_name, safe_payload + ) + except (TypeError, ValueError): # pragma: no cover - defensive serialization guard + logger.trace("%s payload topic=%s ", writer_name, topic_name) # type: ignore[attr-defined] + + +__all__ = ["log_payload_at_trace"] diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py index eae6332..66da65c 100644 --- a/src/writer_eventbridge.py +++ b/src/writer_eventbridge.py @@ -26,6 +26,8 @@ import boto3 from botocore.exceptions import BotoCoreError, ClientError +from .trace_logging import log_payload_at_trace + STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} @@ -68,6 +70,8 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] logger.debug("EventBridge client not initialized - skipping") return True, None + log_payload_at_trace(logger, "EventBridge", topic_name, message) + try: logger.debug("Sending to eventBridge %s", topic_name) response = client.put_events( diff --git a/src/writer_kafka.py b/src/writer_kafka.py index 6151776..1b4b526 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -26,6 +26,8 @@ from typing import Any, Dict, Optional, Tuple from confluent_kafka import Producer +from .trace_logging import log_payload_at_trace + try: # KafkaException may not exist in stubbed test module from confluent_kafka import KafkaException # type: ignore except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub @@ -92,6 +94,8 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] logger.debug("Kafka producer not initialized - skipping") return True, None + log_payload_at_trace(logger, "Kafka", topic_name, message) + errors: list[str] = [] has_exception = False diff 
--git a/src/writer_postgres.py b/src/writer_postgres.py index 28289e8..6f8d603 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -31,6 +31,8 @@ except ImportError: # pragma: no cover - environment without psycopg2 psycopg2 = None # type: ignore +from .trace_logging import log_payload_at_trace + # Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing if psycopg2 is not None: # type: ignore try: # pragma: no cover - attribute presence depends on installed psycopg2 variant @@ -47,20 +49,20 @@ class PsycopgError(Exception): # type: ignore # Module level globals for typing -_logger: logging.Logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) POSTGRES: Dict[str, Any] = {"database": ""} -def init(logger: logging.Logger) -> None: +def init(logger_instance: logging.Logger) -> None: """Initialize Postgres credentials either from AWS Secrets Manager or fallback empty config. Args: - logger: Shared application logger. + logger_instance: Shared application logger. """ - global _logger # pylint: disable=global-statement + global logger # pylint: disable=global-statement global POSTGRES # pylint: disable=global-statement - _logger = logger + logger = logger_instance secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") @@ -72,7 +74,7 @@ def init(logger: logging.Logger) -> None: else: POSTGRES = {"database": ""} - _logger.debug("Initialized POSTGRES writer") + logger.debug("Initialized POSTGRES writer") def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: @@ -83,7 +85,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: table: Target table name. message: Event payload. 
""" - _logger.debug("Sending to Postgres - %s", table) + logger.debug("Sending to Postgres - %s", table) cursor.execute( f""" INSERT INTO {table} @@ -142,7 +144,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s table_jobs: Jobs table name. message: Event payload (includes jobs array). """ - _logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) + logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) cursor.execute( f""" INSERT INTO {table_runs} @@ -222,7 +224,7 @@ def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None: table: Target table name. message: Event payload. """ - _logger.debug("Sending to Postgres - %s", table) + logger.debug("Sending to Postgres - %s", table) cursor.execute( f""" INSERT INTO {table} @@ -265,12 +267,14 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] """ try: if not POSTGRES.get("database"): - _logger.debug("No Postgres - skipping") + logger.debug("No Postgres - skipping") return True, None if psycopg2 is None: # type: ignore - _logger.debug("psycopg2 not available - skipping actual Postgres write") + logger.debug("psycopg2 not available - skipping actual Postgres write") return True, None + log_payload_at_trace(logger, "Postgres", topic_name, message) + with psycopg2.connect( # type: ignore[attr-defined] database=POSTGRES["database"], host=POSTGRES["host"], @@ -287,13 +291,13 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] postgres_test_write(cursor, "public_cps_za_test", message) else: msg = f"unknown topic for postgres {topic_name}" - _logger.error(msg) + logger.error(msg) return False, msg connection.commit() # type: ignore except (RuntimeError, PsycopgError) as e: # narrowed exception set err_msg = f"The Postgres writer with failed unknown error: {str(e)}" - _logger.error(err_msg) + logger.exception(err_msg) return False, err_msg return True, None diff --git 
a/tests/test_safe_serialization.py b/tests/test_safe_serialization.py new file mode 100644 index 0000000..638bc1b --- /dev/null +++ b/tests/test_safe_serialization.py @@ -0,0 +1,243 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for safe_serialization module.""" + +import json +import os +from unittest.mock import patch + +import pytest + +from src.safe_serialization import safe_serialize_for_log + + +class TestSafeSerializeForLog: + """Test suite for safe_serialize_for_log function.""" + + def test_simple_serialization(self): + """Test basic serialization without redaction or truncation.""" + message = {"event_id": "123", "tenant_id": "abc"} + result = safe_serialize_for_log(message, redact_keys=[], max_bytes=10000) + assert result == '{"event_id":"123","tenant_id":"abc"}' + + def test_redact_single_key(self): + """Test redaction of a single sensitive key.""" + message = {"event_id": "123", "password": "secret123"} + result = safe_serialize_for_log(message, redact_keys=["password"], max_bytes=10000) + parsed = json.loads(result) + assert parsed["event_id"] == "123" + assert parsed["password"] == "***REDACTED***" + + def test_redact_multiple_keys(self): + """Test redaction of multiple sensitive keys.""" + message = {"username": "user", "password": "pass123", "api_key": "key456", "data": "visible"} + result = safe_serialize_for_log(message, redact_keys=["password", "api_key"], max_bytes=10000) + parsed = 
json.loads(result) + assert parsed["username"] == "user" + assert parsed["password"] == "***REDACTED***" + assert parsed["api_key"] == "***REDACTED***" + assert parsed["data"] == "visible" + + def test_redact_case_insensitive(self): + """Test that redaction is case-insensitive.""" + message = {"Password": "secret", "API_KEY": "key", "Token": "tok"} + result = safe_serialize_for_log(message, redact_keys=["password", "api_key", "token"], max_bytes=10000) + parsed = json.loads(result) + assert parsed["Password"] == "***REDACTED***" + assert parsed["API_KEY"] == "***REDACTED***" + assert parsed["Token"] == "***REDACTED***" + + def test_redact_nested_dict(self): + """Test redaction in nested dictionaries.""" + message = { + "event_id": "123", + "credentials": {"username": "user", "password": "secret"}, + "config": {"api_key": "key123", "timeout": 30}, + } + result = safe_serialize_for_log(message, redact_keys=["password", "api_key"], max_bytes=10000) + parsed = json.loads(result) + assert parsed["event_id"] == "123" + assert parsed["credentials"]["username"] == "user" + assert parsed["credentials"]["password"] == "***REDACTED***" + assert parsed["config"]["api_key"] == "***REDACTED***" + assert parsed["config"]["timeout"] == 30 + + def test_redact_in_list(self): + """Test redaction in lists of objects.""" + message = { + "users": [ + {"name": "alice", "password": "pass1"}, + {"name": "bob", "secret": "sec2"}, + ] + } + result = safe_serialize_for_log(message, redact_keys=["password", "secret"], max_bytes=10000) + parsed = json.loads(result) + assert parsed["users"][0]["name"] == "alice" + assert parsed["users"][0]["password"] == "***REDACTED***" + assert parsed["users"][1]["name"] == "bob" + assert parsed["users"][1]["secret"] == "***REDACTED***" + + def test_truncation_at_max_bytes(self): + """Test that output is truncated when exceeding max_bytes.""" + message = {"data": "x" * 10000} + result = safe_serialize_for_log(message, redact_keys=[], max_bytes=100) + # Result 
should be truncated and end with "..." + assert result.endswith("...") + assert len(result.encode("utf-8")) <= 103 # 100 bytes + "..." + + def test_truncation_preserves_validity(self): + """Test that truncation doesn't break in middle of multibyte characters.""" + # Create a message with multibyte unicode characters + message = {"emoji": "🔥" * 1000} + result = safe_serialize_for_log(message, redact_keys=[], max_bytes=50) + assert result.endswith("...") + # Should not raise decoding errors + assert len(result) > 0 + + def test_env_default_redact_keys(self): + """Test that default redact keys are loaded from environment.""" + with patch.dict(os.environ, {"TRACE_REDACT_KEYS": "password,secret,token"}): + message = {"password": "pass123", "secret": "sec456", "data": "visible"} + result = safe_serialize_for_log(message) + parsed = json.loads(result) + assert parsed["password"] == "***REDACTED***" + assert parsed["secret"] == "***REDACTED***" + assert parsed["data"] == "visible" + + def test_env_default_max_bytes(self): + """Test that default max_bytes is loaded from environment.""" + with patch.dict(os.environ, {"TRACE_MAX_BYTES": "50"}): + message = {"data": "x" * 1000} + result = safe_serialize_for_log(message) + assert result.endswith("...") + assert len(result.encode("utf-8")) <= 53 # 50 + "..." 
+ + def test_env_defaults_when_not_set(self): + """Test default behavior when env vars are not set.""" + # Clear env vars if present + with patch.dict(os.environ, {}, clear=False): + if "TRACE_REDACT_KEYS" in os.environ: + del os.environ["TRACE_REDACT_KEYS"] + if "TRACE_MAX_BYTES" in os.environ: + del os.environ["TRACE_MAX_BYTES"] + message = {"password": "secret", "data": "x" * 20000} + result = safe_serialize_for_log(message) + # Default should redact 'password' (in default list) + if result.endswith("..."): + # Truncated at default 10000 bytes + assert len(result.encode("utf-8")) <= 10003 + else: + parsed = json.loads(result) + assert parsed["password"] == "***REDACTED***" + + def test_unserializable_object_returns_empty(self): + """Test that unserializable objects return empty string.""" + + class Unserializable: + def __repr__(self): + raise RuntimeError("Cannot serialize") + + # Mock json.dumps to raise an exception + message = {"obj": "normal"} + with patch("src.safe_serialization.json.dumps", side_effect=TypeError("Cannot serialize")): + result = safe_serialize_for_log(message, redact_keys=[], max_bytes=10000) + assert result == "" + + def test_empty_message(self): + """Test serialization of empty message.""" + result = safe_serialize_for_log({}, redact_keys=[], max_bytes=10000) + assert result == "{}" + + def test_none_redact_keys(self): + """Test that None redact_keys uses environment defaults.""" + with patch.dict(os.environ, {"TRACE_REDACT_KEYS": "password"}): + message = {"password": "secret"} + result = safe_serialize_for_log(message, redact_keys=None, max_bytes=10000) + parsed = json.loads(result) + assert parsed["password"] == "***REDACTED***" + + def test_none_max_bytes(self): + """Test that None max_bytes uses environment defaults.""" + with patch.dict(os.environ, {"TRACE_MAX_BYTES": "100"}): + message = {"data": "x" * 1000} + result = safe_serialize_for_log(message, redact_keys=[], max_bytes=None) + assert result.endswith("...") + + def 
test_empty_redact_keys_list(self): + """Test that empty redact_keys list performs no redaction.""" + message = {"password": "secret", "api_key": "key123"} + result = safe_serialize_for_log(message, redact_keys=[], max_bytes=10000) + parsed = json.loads(result) + assert parsed["password"] == "secret" + assert parsed["api_key"] == "key123" + + def test_redact_with_whitespace_in_env(self): + """Test that env var with whitespace is handled correctly.""" + with patch.dict(os.environ, {"TRACE_REDACT_KEYS": " password , api_key , token "}): + message = {"password": "secret", "api_key": "key", "token": "tok"} + result = safe_serialize_for_log(message, redact_keys=None, max_bytes=10000) + parsed = json.loads(result) + assert parsed["password"] == "***REDACTED***" + assert parsed["api_key"] == "***REDACTED***" + assert parsed["token"] == "***REDACTED***" + + def test_complex_nested_structure(self): + """Test redaction in complex nested structures.""" + message = { + "level1": { + "level2": { + "level3": { + "password": "deep_secret", + "data": "visible", + } + }, + "items": [ + {"id": 1, "secret": "s1"}, + {"id": 2, "secret": "s2"}, + ], + } + } + result = safe_serialize_for_log(message, redact_keys=["password", "secret"], max_bytes=10000) + parsed = json.loads(result) + assert parsed["level1"]["level2"]["level3"]["password"] == "***REDACTED***" + assert parsed["level1"]["level2"]["level3"]["data"] == "visible" + assert parsed["level1"]["items"][0]["secret"] == "***REDACTED***" + assert parsed["level1"]["items"][1]["secret"] == "***REDACTED***" + assert parsed["level1"]["items"][0]["id"] == 1 + + def test_non_dict_message(self): + """Test serialization of non-dict messages (e.g., list, string).""" + # List + result = safe_serialize_for_log([1, 2, 3], redact_keys=[], max_bytes=10000) + assert result == "[1,2,3]" + + # String + result = safe_serialize_for_log("hello", redact_keys=[], max_bytes=10000) + assert result == '"hello"' + + # Number + result = 
safe_serialize_for_log(42, redact_keys=[], max_bytes=10000) + assert result == "42" + + def test_redaction_with_non_string_values(self): + """Test that redaction works when sensitive keys have non-string values.""" + message = {"password": 12345, "api_key": None, "secret": True} + result = safe_serialize_for_log(message, redact_keys=["password", "api_key", "secret"], max_bytes=10000) + parsed = json.loads(result) + assert parsed["password"] == "***REDACTED***" + assert parsed["api_key"] == "***REDACTED***" + assert parsed["secret"] == "***REDACTED***" diff --git a/tests/test_trace_logging.py b/tests/test_trace_logging.py new file mode 100644 index 0000000..751c35d --- /dev/null +++ b/tests/test_trace_logging.py @@ -0,0 +1,86 @@ +import logging +from unittest.mock import MagicMock + +from src.logging_levels import TRACE_LEVEL +import src.writer_eventbridge as we +import src.writer_kafka as wk +import src.writer_postgres as wp + + +def test_trace_eventbridge(caplog): + logger = logging.getLogger("trace.eventbridge") + logger.setLevel(TRACE_LEVEL) + we.STATE["logger"] = logger + we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/test" + mock_client = MagicMock() + mock_client.put_events.return_value = {"FailedEntryCount": 0, "Entries": []} + we.STATE["client"] = mock_client + caplog.set_level(TRACE_LEVEL) + ok, err = we.write("topic.eb", {"k": 1}) + assert ok and err is None + assert any("EventBridge payload" in rec.message for rec in caplog.records) + + +def test_trace_kafka(caplog): + class FakeProducer: + def produce(self, *a, **kw): + cb = kw.get("callback") + if cb: + cb(None, object()) + + def flush(self, *a, **kw): + return 0 + + logger = logging.getLogger("trace.kafka") + logger.setLevel(TRACE_LEVEL) + wk.STATE["logger"] = logger + wk.STATE["producer"] = FakeProducer() + caplog.set_level(TRACE_LEVEL) + ok, err = wk.write("topic.kf", {"k": 2}) + assert ok and err is None + assert any("Kafka payload" in rec.message for rec in caplog.records) + + 
+def test_trace_postgres(caplog, monkeypatch): + # Prepare dummy psycopg2 connection machinery + store = [] + + class DummyCursor: + def execute(self, sql, params): + store.append((sql, params)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class DummyConnection: + def cursor(self): + return DummyCursor() + + def commit(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class DummyPsycopg2: + def connect(self, **kwargs): + return DummyConnection() + + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg2()) + + logger = logging.getLogger("trace.postgres") + logger.setLevel(TRACE_LEVEL) + wp.logger = logger + wp.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + + caplog.set_level(TRACE_LEVEL) + message = {"event_id": "e", "tenant_id": "t", "source_app": "a", "environment": "dev", "timestamp": 1} + ok, err = wp.write("public.cps.za.test", message) + assert ok and err is None + assert any("Postgres payload" in rec.message for rec in caplog.records)