Skip to content

Commit 176e02e

Browse files
Improve worker resilience
1 parent 0126451 commit 176e02e

File tree

4 files changed

+63
-4
lines changed

4 files changed

+63
-4
lines changed

docs/source/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ Changelog
88

99
- Python versions 3.7 and 3.8 are no longer supported.
1010
- tdlib 1.8.31.
11+
- Fix: Handle errors during updates processing
12+
- Fix: Handle queye full errors
1113

1214
[0.18.0] - 2023-03-13
1315

telegram/client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,14 @@ def _run_handlers(self, update: Dict[Any, Any]) -> None:
559559
update_type: str = update.get("@type", "unknown")
560560

561561
for handler in self._update_handlers[update_type]:
562-
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
562+
try:
563+
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
564+
except queue.Full:
565+
logger.error(
566+
"Queue is full, update %s dropped for handler %s",
567+
update_type,
568+
handler.__name__ if hasattr(handler, "__name__") else handler,
569+
)
563570

564571
def remove_update_handler(self, handler_type: str, func: Callable) -> None:
565572
"""

telegram/worker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@ def _run_thread(self) -> None:
4141
except Empty:
4242
continue
4343

44-
handler(update)
45-
self._queue.task_done()
44+
try:
45+
handler(update)
46+
except Exception:
47+
logger.exception("Handler raised an exception")
48+
finally:
49+
self._queue.task_done()
4650

4751
def stop(self) -> None:
4852
self._is_enabled = False

tests/test_telegram_methods.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import pytest
2+
import queue
3+
import time
24

35
from unittest.mock import patch
46

57
from telegram import VERSION
68
from telegram.utils import AsyncResult
79
from telegram.client import Telegram, MESSAGE_HANDLER_TYPE, AuthorizationState
810
from telegram.text import Spoiler
11+
from telegram.worker import SimpleWorker
912

1013
API_ID = 1
1114
API_HASH = "hash"
@@ -90,7 +93,10 @@ def test_parse_text_entities(self, telegram):
9093

9194
def test_send_phone_number_or_bot_token(self, telegram):
9295
# check that the dunction calls _send_phone_number or _send_bot_token
93-
with patch.object(telegram, "_send_phone_number"), patch.object(telegram, "_send_bot_token"):
96+
with (
97+
patch.object(telegram, "_send_phone_number"),
98+
patch.object(telegram, "_send_bot_token"),
99+
):
94100
telegram.phone = "123"
95101
telegram.bot_token = None
96102

@@ -478,3 +484,43 @@ def _get_async_result(data, request_id=None):
478484
assert state == telegram.authorization_state == AuthorizationState.READY
479485

480486
assert telegram._tdjson.send.call_count == 0
487+
488+
489+
class TestWorker:
490+
def test_worker_continues_after_handler_exception(self):
491+
"""Handler exceptions should not kill the worker thread and task_done must be called"""
492+
q = queue.Queue()
493+
worker = SimpleWorker(q)
494+
worker.run()
495+
496+
results = []
497+
498+
def failing_handler(update):
499+
raise ValueError("Handler failed")
500+
501+
def working_handler(update):
502+
results.append(update)
503+
504+
# Put two items: one with a failing handler, one with a working handler
505+
q.put((failing_handler, {"id": 1}))
506+
q.put((working_handler, {"id": 2}))
507+
508+
# Give the worker time to process both items
509+
time.sleep(1)
510+
511+
worker.stop()
512+
513+
assert results == [{"id": 2}]
514+
515+
def test_run_handlers_continues_on_queue_full(self, telegram):
516+
"""queue.Full should not crash the listener"""
517+
518+
def my_handler():
519+
pass
520+
521+
telegram.add_message_handler(my_handler)
522+
523+
# Mock the queue to always raise queue.Full
524+
with patch.object(telegram._workers_queue, "put", side_effect=queue.Full):
525+
# This should not raise an exception
526+
telegram._run_handlers({"@type": MESSAGE_HANDLER_TYPE})

0 commit comments

Comments
 (0)