diff --git a/DEVELOPER.md b/DEVELOPER.md index aa55859..22350a5 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -28,7 +28,7 @@ cd EventGate ```shell python3 -m venv .venv source .venv/bin/activate -pip install -r requirements.txt +pip3 install -r requirements.txt ``` ## Run Pylint Tool Locally diff --git a/requirements.txt b/requirements.txt index ac326f0..01e824e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,6 @@ jsonschema==4.25.1 PyJWT==2.10.1 requests==2.32.5 boto3==1.40.25 -confluent-kafka==2.11.1 +confluent-kafka==2.12.1 # psycopg2-binary==2.9.10 # Ideal for local development, but not for long-term production use psycopg2==2.9.10 diff --git a/src/writer_kafka.py b/src/writer_kafka.py index 692aeaa..6151776 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -22,8 +22,8 @@ import json import logging import os +import time from typing import Any, Dict, Optional, Tuple - from confluent_kafka import Producer try: # KafkaException may not exist in stubbed test module @@ -35,8 +35,10 @@ class KafkaException(Exception): # type: ignore STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None} -# Configurable flush timeout (seconds) to avoid hanging indefinitely -_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "5")) +# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely +_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7")) +_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3")) +_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5")) def init(logger: logging.Logger, config: Dict[str, Any]) -> None: @@ -86,12 +88,14 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] """ logger = STATE["logger"] producer: Optional[Producer] = STATE.get("producer") # type: ignore[assignment] - if producer is None: logger.debug("Kafka producer not initialized - skipping") return True, None - errors: list[Any] = [] + errors: list[str] = [] + has_exception = False + + # Produce step try: logger.debug("Sending to kafka %s", topic_name) producer.produce( @@ -100,23 +104,51 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] value=json.dumps(message).encode("utf-8"), callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), ) + except KafkaException as e: + errors.append(f"Produce exception: {e}") + has_exception = True + + # Flush step (always attempted) + remaining: Optional[int] = None + for attempt in range(1, _MAX_RETRIES + 1): try: - remaining = producer.flush(_KAFKA_FLUSH_TIMEOUT_SEC) # type: ignore[arg-type] - except TypeError: # Fallback for stub producers without timeout parameter - remaining = producer.flush() # type: ignore[call-arg] - # remaining can be number of undelivered messages (confluent_kafka returns int) - if not errors and isinstance(remaining, int) and remaining > 0: - timeout_msg = f"Kafka flush timeout after {_KAFKA_FLUSH_TIMEOUT_SEC}s: {remaining} message(s) still pending" - logger.error(timeout_msg) - return False, timeout_msg - except KafkaException as e: # narrow exception capture - err_msg = f"The Kafka writer failed with unknown error: {str(e)}" - logger.exception(err_msg) - return False, err_msg + remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC) + except KafkaException as e: + errors.append(f"Flush exception: {e}") + has_exception = True + + # Treat None (flush returns None in some stubs) as success equivalent to 0 pending + if (remaining is None or remaining == 0) and not errors: + break + if attempt < _MAX_RETRIES: + logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES) + time.sleep(_RETRY_BACKOFF_SEC) + + # Warn if messages still pending after retries + if isinstance(remaining, int) and remaining > 0: + logger.warning( + "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining + ) if errors: - msg = "; ".join(errors) - logger.error(msg) - return False, msg + failure_text = "Kafka writer failed: " + "; ".join(errors) + (logger.exception if has_exception else logger.error)(failure_text) + return False, failure_text return True, None + + +def flush_with_timeout(producer, timeout: float) -> Optional[int]: + """Flush the Kafka producer with a timeout, handling TypeError for stubs. + + Args: + producer: Kafka Producer instance. + timeout: Timeout in seconds. + Returns: + Number of messages still pending after the flush call (0 all messages delivered). + None is returned only if the underlying (stub/mock) producer.flush() does not provide a count. + """ + try: + return producer.flush(timeout) + except TypeError: # Fallback for stub producers without timeout parameter + return producer.flush() diff --git a/tests/test_writer_kafka.py b/tests/test_writer_kafka.py index 6017b3e..bb880bb 100644 --- a/tests/test_writer_kafka.py +++ b/tests/test_writer_kafka.py @@ -1,4 +1,3 @@ -import json import logging from types import SimpleNamespace import src.writer_kafka as wk @@ -23,6 +22,44 @@ def produce(self, topic, key, value, callback): # noqa: D401 callback("ERR", None) +class FakeProducerFlushSequence(FakeProducerSuccess): + def __init__(self, sequence): # sequence of remaining counts per flush call + super().__init__() + self.sequence = sequence + self.flush_calls = 0 + + def flush(self, *a, **kw): + # Simulate decreasing remaining messages + if self.flush_calls < len(self.sequence): + val = self.sequence[self.flush_calls] + else: + val = self.sequence[-1] + self.flush_calls += 1 + return val + + +class FakeProducerTimeout(FakeProducerSuccess): + def __init__(self, remaining_value): + super().__init__() + self.remaining_value = remaining_value + self.flush_calls = 0 + + def flush(self, *a, **kw): # always returns same remaining >0 to force timeout warning + self.flush_calls += 1 + return self.remaining_value + + +class FakeProducerTypeError(FakeProducerSuccess): + def __init__(self): + super().__init__() + self.flush_calls = 0 + + # Intentionally omit timeout parameter causing TypeError on first attempt inside flush_with_timeout + def flush(self): # noqa: D401 + self.flush_calls += 1 + return 0 + + def test_write_skips_when_producer_none(monkeypatch): wk.STATE["logger"] = logging.getLogger("test") wk.STATE["producer"] = None @@ -60,3 +97,45 @@ def produce(self, *a, **kw): # noqa: D401 wk.STATE["producer"] = RaisingProducer() ok, err = wk.write("topic", {"d": 4}) assert not ok and "boom" in err + + +def test_write_flush_retries_until_success(monkeypatch, caplog): + wk.STATE["logger"] = logging.getLogger("test") + caplog.set_level(logging.WARNING) + # Force smaller max retries for deterministic sequence length + monkeypatch.setattr(wk, "_MAX_RETRIES", 5, raising=False) + producer = FakeProducerFlushSequence([5, 4, 3, 1, 0]) + wk.STATE["producer"] = producer + ok, err = wk.write("topic", {"e": 5}) + assert ok and err is None + # It should break as soon as remaining == 0 (after flush call returning 0) + assert producer.flush_calls == 5 # sequence consumed until 0 + # Warnings logged for attempts before success (flush_calls -1) because last attempt didn't warn + warn_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING] + assert any("attempt 1" in m or "attempt 2" in m for m in warn_messages) + + +def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog): + wk.STATE["logger"] = logging.getLogger("test") + caplog.set_level(logging.WARNING) + monkeypatch.setattr(wk, "_MAX_RETRIES", 3, raising=False) + producer = FakeProducerTimeout(2) + wk.STATE["producer"] = producer + ok, err = wk.write("topic", {"f": 6}) + timeout_warnings = [ + r.message for r in caplog.records if "timeout" in r.message + ] # final warning should mention timeout + assert ok and err is None # function returns success even if timeout warning + assert timeout_warnings, "Expected timeout warning logged" + assert producer.flush_calls == 3 # retried 3 times + + +def test_flush_with_timeout_typeerror_fallback(monkeypatch): + wk.STATE["logger"] = logging.getLogger("test") + monkeypatch.setattr(wk, "_MAX_RETRIES", 4, raising=False) + producer = FakeProducerTypeError() + wk.STATE["producer"] = producer + ok, err = wk.write("topic", {"g": 7}) + assert ok and err is None + # Since flush returns 0 immediately, only one flush call should be needed + assert producer.flush_calls == 1