Skip to content

Commit e71145c

Browse files
Fix string formatting error when logging slow processing (#156)
Fixes crash in #153 Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
1 parent df0856f commit e71145c

File tree

2 files changed

+34
-5
lines changed

2 files changed

+34
-5
lines changed

faust/transport/drivers/aiokafka.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -852,11 +852,14 @@ def _log_slow_processing_commit(self, msg: str, *args: Any) -> None:
852852
current_value=app.conf.broker_commit_livelock_soft_timeout,
853853
)
854854

855-
def _make_slow_processing_error(self, msg: str, causes: Iterable[str]) -> str:
855+
def _make_slow_processing_error(
856+
self, msg: str, causes: Iterable[str], setting: str, current_value: float
857+
) -> str:
856858
return " ".join(
857859
[
858860
msg,
859-
SLOW_PROCESSING_EXPLAINED,
861+
SLOW_PROCESSING_EXPLAINED
862+
% {"setting": setting, "current_value": current_value},
860863
text.enumeration(causes, start=2, sep="\n\n"),
861864
]
862865
)
@@ -870,10 +873,8 @@ def _log_slow_processing(
870873
current_value: float,
871874
) -> None:
872875
return self.log.error(
873-
self._make_slow_processing_error(msg, causes),
876+
self._make_slow_processing_error(msg, causes, setting, current_value),
874877
*args,
875-
setting=setting,
876-
current_value=current_value,
877878
)
878879

879880
async def position(self, tp: TP) -> Optional[int]:

tests/unit/transport/drivers/test_aiokafka.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pytest
99
from aiokafka.errors import CommitFailedError, IllegalStateError, KafkaError
1010
from aiokafka.structs import OffsetAndMetadata, TopicPartition
11+
from mode.utils import text
1112
from mode.utils.futures import done_future
1213
from mode.utils.mocks import ANY, AsyncMock, MagicMock, Mock, call, patch
1314
from opentracing.ext import tags
@@ -18,6 +19,10 @@
1819
from faust.sensors.monitor import Monitor
1920
from faust.transport.drivers import aiokafka as mod
2021
from faust.transport.drivers.aiokafka import (
22+
SLOW_PROCESSING_CAUSE_AGENT,
23+
SLOW_PROCESSING_CAUSE_STREAM,
24+
SLOW_PROCESSING_EXPLAINED,
25+
SLOW_PROCESSING_STREAM_IDLE_SINCE_START,
2126
TOPIC_LENGTH_MAX,
2227
AIOKafkaConsumerThread,
2328
Consumer,
@@ -386,6 +391,29 @@ def test_state(self, *, cthread, now):
386391
assert cthread.time_started == now
387392

388393

394+
class Test_Log_Slow_Processing(Test_verify_event_path_base):
395+
def test_log_slow_processing_stream(
396+
self, cthread: AIOKafkaConsumerThread, tp: TP, logger
397+
):
398+
cthread._log_slow_processing_stream(
399+
SLOW_PROCESSING_STREAM_IDLE_SINCE_START, tp, "3 seconds ago"
400+
)
401+
logger.error.assert_called_with(
402+
SLOW_PROCESSING_STREAM_IDLE_SINCE_START
403+
+ " "
404+
+ SLOW_PROCESSING_EXPLAINED
405+
% {"setting": "stream_processing_timeout", "current_value": 300.0}
406+
+ " "
407+
+ text.enumeration(
408+
[SLOW_PROCESSING_CAUSE_STREAM, SLOW_PROCESSING_CAUSE_AGENT],
409+
start=2,
410+
sep="\n\n",
411+
),
412+
tp,
413+
"3 seconds ago",
414+
)
415+
416+
389417
@pytest.mark.skip("Needs fixing")
390418
class Test_VEP_no_fetch_since_start(Test_verify_event_path_base):
391419
def test_just_started(self, *, cthread, now, tp, logger):

0 commit comments

Comments
 (0)