Skip to content

Commit 46e2f94

Browse files
committed
Added support for Pub/Sub in cluster
1 parent 0de0f4d commit 46e2f94

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

redis/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@ def __init__(
721721
# to lookup channel and pattern names for callback handlers.
722722
self.encoder = encoder
723723
self.push_handler_func = push_handler_func
724-
self._event_dispatcher = event_dispatcher
724+
self.event_dispatcher = event_dispatcher
725725
self._lock = threading.Lock()
726726
if self.encoder is None:
727727
self.encoder = self.connection_pool.get_encoder()
@@ -813,7 +813,7 @@ def execute_command(self, *args):
813813
self.connection.register_connect_callback(self.on_connect)
814814
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
815815
self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
816-
self._event_dispatcher.dispatch(
816+
self.event_dispatcher.dispatch(
817817
AfterPubSubConnectionInstantiationEvent(
818818
self.connection,
819819
self.connection_pool,

redis/cluster.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
from redis.commands.helpers import list_or_args
1616
from redis.connection import ConnectionPool, DefaultParser, parse_url
1717
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
18-
from redis.event import EventDispatcher, EventDispatcherInterface, AfterPooledConnectionsInstantiationEvent, ClientType
18+
from redis.event import EventDispatcher, EventDispatcherInterface, AfterPooledConnectionsInstantiationEvent, ClientType, \
19+
AfterPubSubConnectionInstantiationEvent
1920
from redis.exceptions import (
2021
AskError,
2122
AuthenticationError,
@@ -1714,6 +1715,7 @@ def __init__(
17141715
host=None,
17151716
port=None,
17161717
push_handler_func=None,
1718+
event_dispatcher: Optional["EventDispatcher"] = EventDispatcher(),
17171719
**kwargs,
17181720
):
17191721
"""
@@ -1743,6 +1745,7 @@ def __init__(
17431745
connection_pool=connection_pool,
17441746
encoder=redis_cluster.encoder,
17451747
push_handler_func=push_handler_func,
1748+
event_dispatcher=event_dispatcher,
17461749
**kwargs,
17471750
)
17481751

@@ -1829,6 +1832,14 @@ def execute_command(self, *args):
18291832
self.connection.register_connect_callback(self.on_connect)
18301833
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
18311834
self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
1835+
self.event_dispatcher.dispatch(
1836+
AfterPubSubConnectionInstantiationEvent(
1837+
self.connection,
1838+
self.connection_pool,
1839+
ClientType.SYNC,
1840+
self._lock
1841+
)
1842+
)
18321843
connection = self.connection
18331844
self._execute(connection, connection.send_command, *args)
18341845

0 commit comments

Comments
 (0)