Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Changelog

- Python versions 3.7 and 3.8 are no longer supported.
- tdlib 1.8.31.
- Fix: Handle errors during updates processing
- Fix: Handle queue full errors

[0.18.0] - 2023-03-13

Expand Down
9 changes: 8 additions & 1 deletion telegram/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,14 @@ def _run_handlers(self, update: Dict[Any, Any]) -> None:
update_type: str = update.get("@type", "unknown")

for handler in self._update_handlers[update_type]:
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
try:
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
except queue.Full:
logger.error(
"Queue is full, update %s dropped for handler %s",
update_type,
handler.__name__ if hasattr(handler, "__name__") else handler,
)

def remove_update_handler(self, handler_type: str, func: Callable) -> None:
"""
Expand Down
8 changes: 6 additions & 2 deletions telegram/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ def _run_thread(self) -> None:
except Empty:
continue

handler(update)
self._queue.task_done()
try:
handler(update)
except Exception:
logger.exception("Handler raised an exception")
finally:
self._queue.task_done()

def stop(self) -> None:
self._is_enabled = False
Expand Down
51 changes: 49 additions & 2 deletions tests/test_telegram_methods.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import pytest
import queue
import time

from unittest.mock import patch

from telegram import VERSION
from telegram.utils import AsyncResult
from telegram.client import Telegram, MESSAGE_HANDLER_TYPE, AuthorizationState
from telegram.text import Spoiler
from telegram.worker import SimpleWorker

API_ID = 1
API_HASH = "hash"
Expand Down Expand Up @@ -89,8 +92,11 @@ def test_parse_text_entities(self, telegram):
telegram._tdjson.send.assert_called_once_with(exp_data)

def test_send_phone_number_or_bot_token(self, telegram):
# check that the dunction calls _send_phone_number or _send_bot_token
with patch.object(telegram, "_send_phone_number"), patch.object(telegram, "_send_bot_token"):
# check that the function calls _send_phone_number or _send_bot_token
with (
patch.object(telegram, "_send_phone_number"),
patch.object(telegram, "_send_bot_token"),
):
telegram.phone = "123"
telegram.bot_token = None

Expand Down Expand Up @@ -478,3 +484,44 @@ def _get_async_result(data, request_id=None):
assert state == telegram.authorization_state == AuthorizationState.READY

assert telegram._tdjson.send.call_count == 0


class TestWorker:
def test_worker_continues_after_handler_exception(self):
"""Handler exceptions should not kill the worker thread and task_done must be called"""
q = queue.Queue()
worker = SimpleWorker(q)
worker.run()

results = []

def failing_handler(update):
raise ValueError("Handler failed")

def working_handler(update):
results.append(update)

# Put two items: one with a failing handler, one with a working handler
q.put((failing_handler, {"id": 1}))
q.put((working_handler, {"id": 2}))

# Give the worker time to process both items.
# Can't use join when the test fails.
time.sleep(1)

worker.stop()

assert results == [{"id": 2}]
Comment on lines +490 to +514
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test claims to verify that task_done is called, but it doesn't actually assert this behavior. While the test checks that the worker continues processing after an exception (by verifying the second handler runs), it doesn't explicitly verify that task_done was called for the failing handler. Consider using a mock or spy on the queue to verify task_done is called twice (once for each item).

Copilot uses AI. Check for mistakes.

def test_run_handlers_continues_on_queue_full(self, telegram):
"""queue.Full should not crash the listener"""

def my_handler():
pass

telegram.add_message_handler(my_handler)

# Mock the queue to always raise queue.Full
with patch.object(telegram._workers_queue, "put", side_effect=queue.Full):
# This should not raise an exception
telegram._run_handlers({"@type": MESSAGE_HANDLER_TYPE})
Loading