Skip to content

Commit ad4d5d7

Browse files
Add TRACE logging level for verbose payload logging across event writers
1 parent 1095a0b commit ad4d5d7

File tree

6 files changed

+150
-0
lines changed

6 files changed

+150
-0
lines changed

src/event_gate_lambda.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@
4343
except ImportError: # fallback when executed outside package context
4444
import writer_eventbridge, writer_kafka, writer_postgres # type: ignore[no-redef]
4545

46+
# Register custom TRACE level before using LOG_LEVEL env var
47+
try:
48+
from .logging_levels import TRACE_LEVEL # noqa: F401
49+
except Exception: # pragma: no cover - defensive
50+
TRACE_LEVEL = 5 # type: ignore
51+
4652
# Import configuration directory symbols with explicit ImportError fallback
4753
try:
4854
from .conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef]

src/logging_levels.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""Custom logging levels.
2+
3+
Adds a TRACE level below DEBUG for very verbose payload logging.
4+
"""
5+
6+
from __future__ import annotations
7+
import logging
8+
9+
TRACE_LEVEL = 5
10+
11+
# Register level name only once (idempotent)
12+
if not hasattr(logging, "TRACE"):
13+
logging.addLevelName(TRACE_LEVEL, "TRACE")
14+
15+
def trace(self: logging.Logger, message: str, *args, **kws): # type: ignore[override]
16+
if self.isEnabledFor(TRACE_LEVEL):
17+
self._log(TRACE_LEVEL, message, args, **kws) # pylint: disable=protected-access
18+
19+
logging.Logger.trace = trace # type: ignore[attr-defined]
20+
21+
__all__ = ["TRACE_LEVEL"]

src/writer_eventbridge.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
import boto3
2727
from botocore.exceptions import BotoCoreError, ClientError
2828

29+
# Ensure TRACE level is registered
30+
from . import logging_levels # noqa: F401
31+
from .logging_levels import TRACE_LEVEL
32+
2933
STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None}
3034

3135

@@ -68,6 +72,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
6872
logger.debug("EventBridge client not initialized - skipping")
6973
return True, None
7074

75+
# TRACE-level payload logging
76+
if logger.isEnabledFor(TRACE_LEVEL):
77+
try:
78+
logger.trace( # type: ignore[attr-defined]
79+
"EventBridge payload topic=%s payload=%s", topic_name, json.dumps(message, separators=(",", ":"))
80+
)
81+
except Exception: # pragma: no cover - defensive serialization guard
82+
logger.trace("EventBridge payload topic=%s <unserializable>", topic_name) # type: ignore[attr-defined]
83+
7184
try:
7285
logger.debug("Sending to eventBridge %s", topic_name)
7386
response = client.put_events(

src/writer_kafka.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
from confluent_kafka import Producer
2828

29+
# Add TRACE level import
30+
from .logging_levels import TRACE_LEVEL # type: ignore
31+
2932
try: # KafkaException may not exist in stubbed test module
3033
from confluent_kafka import KafkaException # type: ignore
3134
except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub
@@ -91,6 +94,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
9194
logger.debug("Kafka producer not initialized - skipping")
9295
return True, None
9396

97+
# TRACE-level payload logging prior to produce
98+
if logger.isEnabledFor(TRACE_LEVEL):
99+
try:
100+
logger.trace( # type: ignore[attr-defined]
101+
"Kafka payload topic=%s payload=%s", topic_name, json.dumps(message, separators=(",", ":"))
102+
)
103+
except Exception: # pragma: no cover - defensive
104+
logger.trace("Kafka payload topic=%s <unserializable>", topic_name) # type: ignore[attr-defined]
105+
94106
errors: list[Any] = []
95107
try:
96108
logger.debug("Sending to kafka %s", topic_name)

src/writer_postgres.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
except ImportError: # pragma: no cover - environment without psycopg2
3232
psycopg2 = None # type: ignore
3333

34+
# Ensure TRACE level is registered
35+
from .logging_levels import TRACE_LEVEL # type: ignore
36+
3437
# Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing
3538
if psycopg2 is not None: # type: ignore
3639
try: # pragma: no cover - attribute presence depends on installed psycopg2 variant
@@ -271,6 +274,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
271274
_logger.debug("psycopg2 not available - skipping actual Postgres write")
272275
return True, None
273276

277+
# TRACE-level payload logging (only when we intend to write)
278+
if _logger.isEnabledFor(TRACE_LEVEL):
279+
try:
280+
_logger.trace( # type: ignore[attr-defined]
281+
"Postgres payload topic=%s payload=%s", topic_name, json.dumps(message, separators=(",", ":"))
282+
)
283+
except Exception: # pragma: no cover - defensive
284+
_logger.trace("Postgres payload topic=%s <unserializable>", topic_name) # type: ignore[attr-defined]
285+
274286
with psycopg2.connect( # type: ignore[attr-defined]
275287
database=POSTGRES["database"],
276288
host=POSTGRES["host"],

tests/test_trace_logging.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
"""Tests that each event writer emits TRACE-level payload logging.

Each test installs a logger set to TRACE_LEVEL into the writer's module
state, stubs out the external transport (EventBridge client, Kafka
producer, psycopg2 connection), performs a write, and asserts that a
"<Writer> payload" record was captured by pytest's caplog fixture.
"""
import logging
from unittest.mock import MagicMock

from src.logging_levels import TRACE_LEVEL
import src.writer_eventbridge as we
import src.writer_kafka as wk
import src.writer_postgres as wp


def test_trace_eventbridge(caplog):
    """EventBridge writer logs the payload at TRACE before put_events."""
    logger = logging.getLogger("trace.eventbridge")
    logger.setLevel(TRACE_LEVEL)
    we.STATE["logger"] = logger
    we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/test"
    # Mocked client reports a fully successful put_events call.
    mock_client = MagicMock()
    mock_client.put_events.return_value = {"FailedEntryCount": 0, "Entries": []}
    we.STATE["client"] = mock_client
    caplog.set_level(TRACE_LEVEL)
    ok, err = we.write("topic.eb", {"k": 1})
    assert ok and err is None
    assert any("EventBridge payload" in rec.message for rec in caplog.records)


def test_trace_kafka(caplog):
    """Kafka writer logs the payload at TRACE before producing."""
    class FakeProducer:
        # Immediately invokes the delivery callback with no error, so the
        # writer sees a successful produce.
        def produce(self, *a, **kw):
            cb = kw.get("callback")
            if cb:
                cb(None, object())

        def flush(self, *a, **kw):  # noqa: D401
            return 0

    logger = logging.getLogger("trace.kafka")
    logger.setLevel(TRACE_LEVEL)
    wk.STATE["logger"] = logger
    wk.STATE["producer"] = FakeProducer()
    caplog.set_level(TRACE_LEVEL)
    ok, err = wk.write("topic.kf", {"k": 2})
    assert ok and err is None
    assert any("Kafka payload" in rec.message for rec in caplog.records)


def test_trace_postgres(caplog, monkeypatch):
    """Postgres writer logs the payload at TRACE before connecting."""
    # Prepare dummy psycopg2 connection machinery
    store = []

    class DummyCursor:
        # Records executed SQL so the write path can run end to end.
        def execute(self, sql, params):
            store.append((sql, params))

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

    class DummyConnection:
        def cursor(self):
            return DummyCursor()

        def commit(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

    class DummyPsycopg2:
        def connect(self, **kwargs):  # noqa: D401
            return DummyConnection()

    # Replace the module-level psycopg2 handle so no real DB is touched.
    monkeypatch.setattr(wp, "psycopg2", DummyPsycopg2())

    logger = logging.getLogger("trace.postgres")
    logger.setLevel(TRACE_LEVEL)
    wp._logger = logger  # type: ignore
    wp.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432}

    caplog.set_level(TRACE_LEVEL)
    message = {"event_id": "e", "tenant_id": "t", "source_app": "a", "environment": "dev", "timestamp": 1}
    ok, err = wp.write("public.cps.za.test", message)
    assert ok and err is None
    assert any("Postgres payload" in rec.message for rec in caplog.records)

0 commit comments

Comments
 (0)