1
1
import asyncio
2
2
import socket
3
3
import sys
4
+ from logging import getLogger
4
5
from typing import Callable , List , Optional , TypedDict , Union
5
6
6
7
if sys .version_info .major >= 3 and sys .version_info .minor >= 11 :
11
12
from ..exceptions import ConnectionError , InvalidResponse , RedisError
12
13
from ..typing import EncodableT
13
14
from ..utils import HIREDIS_AVAILABLE
14
- from .base import AsyncBaseParser , BaseParser
15
+ from .base import (
16
+ AsyncBaseParser ,
17
+ AsyncPushNotificationsParser ,
18
+ BaseParser ,
19
+ PushNotificationsParser ,
20
+ )
15
21
from .socket import (
16
22
NONBLOCKING_EXCEPTION_ERROR_NUMBERS ,
17
23
NONBLOCKING_EXCEPTIONS ,
@@ -32,21 +38,29 @@ class _HiredisReaderArgs(TypedDict, total=False):
32
38
errors : Optional [str ]
33
39
34
40
35
- class _HiredisParser (BaseParser ):
41
+ class _HiredisParser (BaseParser , PushNotificationsParser ):
36
42
"Parser class for connections using Hiredis"
37
43
38
44
def __init__ (self , socket_read_size ):
39
45
if not HIREDIS_AVAILABLE :
40
46
raise RedisError ("Hiredis is not installed" )
41
47
self .socket_read_size = socket_read_size
42
48
self ._buffer = bytearray (socket_read_size )
49
+ self .pubsub_push_handler_func = self .handle_pubsub_push_response
50
+ self .invalidation_push_handler_func = None
51
+ self ._hiredis_PushNotificationType = None
43
52
44
53
def __del__ (self ):
45
54
try :
46
55
self .on_disconnect ()
47
56
except Exception :
48
57
pass
49
58
59
+ def handle_pubsub_push_response (self , response ):
60
+ logger = getLogger ("push_response" )
61
+ logger .debug ("Push response: " + str (response ))
62
+ return response
63
+
50
64
def on_connect (self , connection , ** kwargs ):
51
65
import hiredis
52
66
@@ -64,6 +78,12 @@ def on_connect(self, connection, **kwargs):
64
78
self ._reader = hiredis .Reader (** kwargs )
65
79
self ._next_response = NOT_ENOUGH_DATA
66
80
81
+ try :
82
+ self ._hiredis_PushNotificationType = hiredis .PushNotification
83
+ except AttributeError :
84
+ # hiredis < 3.2
85
+ self ._hiredis_PushNotificationType = None
86
+
67
87
def on_disconnect (self ):
68
88
self ._sock = None
69
89
self ._reader = None
@@ -109,14 +129,24 @@ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
109
129
if custom_timeout :
110
130
sock .settimeout (self ._socket_timeout )
111
131
112
- def read_response (self , disable_decoding = False ):
132
+ def read_response (self , disable_decoding = False , push_request = False ):
113
133
if not self ._reader :
114
134
raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
115
135
116
136
# _next_response might be cached from a can_read() call
117
137
if self ._next_response is not NOT_ENOUGH_DATA :
118
138
response = self ._next_response
119
139
self ._next_response = NOT_ENOUGH_DATA
140
+ if self ._hiredis_PushNotificationType is not None and isinstance (
141
+ response , self ._hiredis_PushNotificationType
142
+ ):
143
+ response = self .handle_push_response (response )
144
+ if not push_request :
145
+ return self .read_response (
146
+ disable_decoding = disable_decoding , push_request = push_request
147
+ )
148
+ else :
149
+ return response
120
150
return response
121
151
122
152
if disable_decoding :
@@ -135,6 +165,16 @@ def read_response(self, disable_decoding=False):
135
165
# happened
136
166
if isinstance (response , ConnectionError ):
137
167
raise response
168
+ elif self ._hiredis_PushNotificationType is not None and isinstance (
169
+ response , self ._hiredis_PushNotificationType
170
+ ):
171
+ response = self .handle_push_response (response )
172
+ if not push_request :
173
+ return self .read_response (
174
+ disable_decoding = disable_decoding , push_request = push_request
175
+ )
176
+ else :
177
+ return response
138
178
elif (
139
179
isinstance (response , list )
140
180
and response
@@ -144,7 +184,7 @@ def read_response(self, disable_decoding=False):
144
184
return response
145
185
146
186
147
- class _AsyncHiredisParser (AsyncBaseParser ):
187
+ class _AsyncHiredisParser (AsyncBaseParser , AsyncPushNotificationsParser ):
148
188
"""Async implementation of parser class for connections using Hiredis"""
149
189
150
190
__slots__ = ("_reader" ,)
@@ -154,6 +194,14 @@ def __init__(self, socket_read_size: int):
154
194
raise RedisError ("Hiredis is not available." )
155
195
super ().__init__ (socket_read_size = socket_read_size )
156
196
self ._reader = None
197
+ self .pubsub_push_handler_func = self .handle_pubsub_push_response
198
+ self .invalidation_push_handler_func = None
199
+ self ._hiredis_PushNotificationType = None
200
+
201
+ async def handle_pubsub_push_response (self , response ):
202
+ logger = getLogger ("push_response" )
203
+ logger .debug ("Push response: " + str (response ))
204
+ return response
157
205
158
206
def on_connect (self , connection ):
159
207
import hiredis
@@ -171,6 +219,14 @@ def on_connect(self, connection):
171
219
self ._reader = hiredis .Reader (** kwargs )
172
220
self ._connected = True
173
221
222
+ try :
223
+ self ._hiredis_PushNotificationType = getattr (
224
+ hiredis , "PushNotification" , None
225
+ )
226
+ except AttributeError :
227
+ # hiredis < 3.2
228
+ self ._hiredis_PushNotificationType = None
229
+
174
230
def on_disconnect (self ):
175
231
self ._connected = False
176
232
@@ -195,7 +251,7 @@ async def read_from_socket(self):
195
251
return True
196
252
197
253
async def read_response (
198
- self , disable_decoding : bool = False
254
+ self , disable_decoding : bool = False , push_request : bool = False
199
255
) -> Union [EncodableT , List [EncodableT ]]:
200
256
# If `on_disconnect()` has been called, prohibit any more reads
201
257
# even if they could happen because data might be present.
@@ -207,6 +263,7 @@ async def read_response(
207
263
response = self ._reader .gets (False )
208
264
else :
209
265
response = self ._reader .gets ()
266
+
210
267
while response is NOT_ENOUGH_DATA :
211
268
await self .read_from_socket ()
212
269
if disable_decoding :
@@ -219,6 +276,16 @@ async def read_response(
219
276
# happened
220
277
if isinstance (response , ConnectionError ):
221
278
raise response
279
+ elif self ._hiredis_PushNotificationType is not None and isinstance (
280
+ response , self ._hiredis_PushNotificationType
281
+ ):
282
+ response = await self .handle_push_response (response )
283
+ if not push_request :
284
+ return await self .read_response (
285
+ disable_decoding = disable_decoding , push_request = push_request
286
+ )
287
+ else :
288
+ return response
222
289
elif (
223
290
isinstance (response , list )
224
291
and response
0 commit comments