2929from  redis .asyncio .connection  import  Connection , DefaultParser , SSLConnection , parse_url 
3030from  redis .asyncio .lock  import  Lock 
3131from  redis .asyncio .retry  import  Retry 
32+ from  redis .auth .token  import  TokenInterface 
3233from  redis .backoff  import  default_backoff 
3334from  redis .client  import  EMPTY_RESPONSE , NEVER_DECODE , AbstractRedis 
3435from  redis .cluster  import  (
4546from  redis .commands  import  READ_COMMANDS , AsyncRedisClusterCommands 
4647from  redis .crc  import  REDIS_CLUSTER_HASH_SLOTS , key_slot 
4748from  redis .credentials  import  CredentialProvider 
49+ from  redis .event  import  AfterAsyncClusterInstantiationEvent , EventDispatcher 
4850from  redis .exceptions  import  (
4951    AskError ,
5052    BusyLoadingError ,
5759    MaxConnectionsError ,
5860    MovedError ,
5961    RedisClusterException ,
62+     RedisError ,
6063    ResponseError ,
6164    SlotNotCoveredError ,
6265    TimeoutError ,
@@ -279,6 +282,7 @@ def __init__(
279282        ssl_ciphers : Optional [str ] =  None ,
280283        protocol : Optional [int ] =  2 ,
281284        address_remap : Optional [Callable [[Tuple [str , int ]], Tuple [str , int ]]] =  None ,
285+         event_dispatcher : Optional [EventDispatcher ] =  None ,
282286    ) ->  None :
283287        if  db :
284288            raise  RedisClusterException (
@@ -375,12 +379,18 @@ def __init__(
375379        if  host  and  port :
376380            startup_nodes .append (ClusterNode (host , port , ** self .connection_kwargs ))
377381
382+         if  event_dispatcher  is  None :
383+             self ._event_dispatcher  =  EventDispatcher ()
384+         else :
385+             self ._event_dispatcher  =  event_dispatcher 
386+ 
378387        self .nodes_manager  =  NodesManager (
379388            startup_nodes ,
380389            require_full_coverage ,
381390            kwargs ,
382391            dynamic_startup_nodes = dynamic_startup_nodes ,
383392            address_remap = address_remap ,
393+             event_dispatcher = self ._event_dispatcher ,
384394        )
385395        self .encoder  =  Encoder (encoding , encoding_errors , decode_responses )
386396        self .read_from_replicas  =  read_from_replicas 
@@ -939,6 +949,8 @@ class ClusterNode:
939949    __slots__  =  (
940950        "_connections" ,
941951        "_free" ,
952+         "_lock" ,
953+         "_event_dispatcher" ,
942954        "connection_class" ,
943955        "connection_kwargs" ,
944956        "host" ,
@@ -976,6 +988,9 @@ def __init__(
976988
977989        self ._connections : List [Connection ] =  []
978990        self ._free : Deque [Connection ] =  collections .deque (maxlen = self .max_connections )
991+         self ._event_dispatcher  =  self .connection_kwargs .get ("event_dispatcher" , None )
992+         if  self ._event_dispatcher  is  None :
993+             self ._event_dispatcher  =  EventDispatcher ()
979994
980995    def  __repr__ (self ) ->  str :
981996        return  (
@@ -1092,10 +1107,38 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10921107
10931108        return  ret 
10941109
1110+     async  def  re_auth_callback (self , token : TokenInterface ):
1111+         tmp_queue  =  collections .deque ()
1112+         while  self ._free :
1113+             conn  =  self ._free .popleft ()
1114+             await  conn .retry .call_with_retry (
1115+                 lambda : conn .send_command (
1116+                     "AUTH" , token .try_get ("oid" ), token .get_value ()
1117+                 ),
1118+                 lambda  error : self ._mock (error ),
1119+             )
1120+             await  conn .retry .call_with_retry (
1121+                 lambda : conn .read_response (), lambda  error : self ._mock (error )
1122+             )
1123+             tmp_queue .append (conn )
1124+ 
1125+         while  tmp_queue :
1126+             conn  =  tmp_queue .popleft ()
1127+             self ._free .append (conn )
1128+ 
1129+     async  def  _mock (self , error : RedisError ):
1130+         """ 
1131+         Dummy functions, needs to be passed as error callback to retry object. 
1132+         :param error: 
1133+         :return: 
1134+         """ 
1135+         pass 
1136+ 
10951137
10961138class  NodesManager :
10971139    __slots__  =  (
10981140        "_moved_exception" ,
1141+         "_event_dispatcher" ,
10991142        "connection_kwargs" ,
11001143        "default_node" ,
11011144        "nodes_cache" ,
@@ -1114,6 +1157,7 @@ def __init__(
11141157        connection_kwargs : Dict [str , Any ],
11151158        dynamic_startup_nodes : bool  =  True ,
11161159        address_remap : Optional [Callable [[Tuple [str , int ]], Tuple [str , int ]]] =  None ,
1160+         event_dispatcher : Optional [EventDispatcher ] =  None ,
11171161    ) ->  None :
11181162        self .startup_nodes  =  {node .name : node  for  node  in  startup_nodes }
11191163        self .require_full_coverage  =  require_full_coverage 
@@ -1126,6 +1170,10 @@ def __init__(
11261170        self .slots_cache : Dict [int , List ["ClusterNode" ]] =  {}
11271171        self .read_load_balancer  =  LoadBalancer ()
11281172        self ._moved_exception : MovedError  =  None 
1173+         if  event_dispatcher  is  None :
1174+             self ._event_dispatcher  =  EventDispatcher ()
1175+         else :
1176+             self ._event_dispatcher  =  event_dispatcher 
11291177
11301178    def  get_node (
11311179        self ,
@@ -1243,6 +1291,12 @@ async def initialize(self) -> None:
12431291            try :
12441292                # Make sure cluster mode is enabled on this node 
12451293                try :
1294+                     self ._event_dispatcher .dispatch (
1295+                         AfterAsyncClusterInstantiationEvent (
1296+                             self .nodes_cache ,
1297+                             self .connection_kwargs .get ("credential_provider" , None ),
1298+                         )
1299+                     )
12461300                    cluster_slots  =  await  startup_node .execute_command ("CLUSTER SLOTS" )
12471301                except  ResponseError :
12481302                    raise  RedisClusterException (
0 commit comments