Skip to content

Commit a404ad3

Browse files
adeoraAbhimanyu Deora
andauthored
Add optional exception handler to PubSubWorkerThread (#1395)
Add optional exception handler to PubSubWorkerThread Co-authored-by: Abhimanyu Deora <[email protected]>
1 parent 15dafb1 commit a404ad3

File tree

3 files changed

+56
-5
lines changed

3 files changed

+56
-5
lines changed

README.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,20 @@ subscribed to patterns or channels that don't have message handlers attached.
732732
# when it's time to shut it down...
733733
>>> thread.stop()
734734
735+
`run_in_thread` also supports an optional exception handler, which lets you
736+
catch exceptions that occur within the worker thread and handle them
737+
appropriately. The exception handler will take as arguments the exception
738+
itself, the pubsub object, and the worker thread returned by `run_in_thread`.
739+
740+
.. code-block:: pycon
741+
>>> p.subscribe(**{'my-channel': my_handler})
742+
>>> def exception_handler(ex, pubsub, thread):
743+
>>> print(ex)
744+
>>> thread.stop()
745+
>>> thread.join(timeout=1.0)
746+
>>> pubsub.close()
747+
>>> thread = p.run_in_thread(exception_handler=exception_handler)
748+
735749
A PubSub object adheres to the same encoding semantics as the client instance
736750
it was created from. Any channel or pattern that's unicode will be encoded
737751
using the `charset` specified on the client before being sent to Redis. If the

redis/client.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3803,7 +3803,8 @@ def handle_message(self, response, ignore_subscribe_messages=False):
38033803

38043804
return message
38053805

3806-
def run_in_thread(self, sleep_time=0, daemon=False):
3806+
def run_in_thread(self, sleep_time=0, daemon=False,
3807+
exception_handler=None):
38073808
for channel, handler in self.channels.items():
38083809
if handler is None:
38093810
raise PubSubError("Channel: '%s' has no handler registered" %
@@ -3813,17 +3814,24 @@ def run_in_thread(self, sleep_time=0, daemon=False):
38133814
raise PubSubError("Pattern: '%s' has no handler registered" %
38143815
pattern)
38153816

3816-
thread = PubSubWorkerThread(self, sleep_time, daemon=daemon)
3817+
thread = PubSubWorkerThread(
3818+
self,
3819+
sleep_time,
3820+
daemon=daemon,
3821+
exception_handler=exception_handler
3822+
)
38173823
thread.start()
38183824
return thread
38193825

38203826

38213827
class PubSubWorkerThread(threading.Thread):
3822-
def __init__(self, pubsub, sleep_time, daemon=False):
3828+
def __init__(self, pubsub, sleep_time, daemon=False,
3829+
exception_handler=None):
38233830
super().__init__()
38243831
self.daemon = daemon
38253832
self.pubsub = pubsub
38263833
self.sleep_time = sleep_time
3834+
self.exception_handler = exception_handler
38273835
self._running = threading.Event()
38283836

38293837
def run(self):
@@ -3833,8 +3841,13 @@ def run(self):
38333841
pubsub = self.pubsub
38343842
sleep_time = self.sleep_time
38353843
while self._running.is_set():
3836-
pubsub.get_message(ignore_subscribe_messages=True,
3837-
timeout=sleep_time)
3844+
try:
3845+
pubsub.get_message(ignore_subscribe_messages=True,
3846+
timeout=sleep_time)
3847+
except BaseException as e:
3848+
if self.exception_handler is None:
3849+
raise
3850+
self.exception_handler(e, pubsub, self)
38383851
pubsub.close()
38393852

38403853
def stop(self):

tests/test_pubsub.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import pytest
2+
import threading
23
import time
34

5+
from unittest import mock
6+
47
import redis
58
from redis.exceptions import ConnectionError
69

@@ -543,3 +546,24 @@ def test_get_message_with_timeout_returns_none(self, r):
543546
p.subscribe('foo')
544547
assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
545548
assert p.get_message(timeout=0.01) is None
549+
550+
551+
class TestPubSubWorkerThread:
552+
def test_pubsub_worker_thread_exception_handler(self, r):
553+
event = threading.Event()
554+
555+
def exception_handler(ex, pubsub, thread):
556+
thread.stop()
557+
event.set()
558+
559+
p = r.pubsub()
560+
p.subscribe(**{'foo': lambda m: m})
561+
with mock.patch.object(p, 'get_message',
562+
side_effect=Exception('error')):
563+
pubsub_thread = p.run_in_thread(
564+
exception_handler=exception_handler
565+
)
566+
567+
assert event.wait(timeout=1.0)
568+
pubsub_thread.join(timeout=1.0)
569+
assert not pubsub_thread.is_alive()

0 commit comments

Comments
 (0)