1414version_regex = re .compile (rb'Zookeeper version: (\d+)\.(\d+)\.(\d+)-.*' )
1515
1616# all requests and responses are prefixed with a 32-bit int denoting size
17- size_struct = struct .Struct ("!i" )
17+ size_struct = struct .Struct ('!i' )
1818# replies are prefixed with an xid, zxid and error code
19- reply_header_struct = struct .Struct (" !iqi" )
19+ reply_header_struct = struct .Struct (' !iqi' )
2020
2121log = logging .getLogger (__name__ )
22- payload_log = logging .getLogger (__name__ + " .payload" )
22+ payload_log = logging .getLogger (__name__ + ' .payload' )
2323if payload_log .level == logging .NOTSET :
2424 payload_log .setLevel (logging .INFO )
2525
2626
2727class Connection :
28-
2928 def __init__ (self , host , port , watch_handler , read_timeout ):
3029 self .host = host
3130 self .port = int (port )
@@ -54,27 +53,31 @@ async def _make_handshake(self):
5453 self .host_ip = self .writer .transport .get_extra_info ('peername' )[0 ]
5554
5655 log .debug ("Sending 'srvr' command to %s:%d" , self .host , self .port )
57- self .writer .write (b" srvr" )
56+ self .writer .write (b' srvr' )
5857
5958 answer = await self .reader .read ()
6059
61- version_line = answer .split (b" \n " )[0 ]
60+ version_line = answer .split (b' \n ' )[0 ]
6261 match = version_regex .match (version_line )
6362 if match is None :
6463 raise ConnectionError
6564 self .version_info = tuple (map (int , match .groups ()))
66- self .start_read_only = bool (b" READ_ONLY" in answer )
65+ self .start_read_only = bool (b' READ_ONLY' in answer )
6766
68- log .debug (" Version info: %s" , self .version_info )
69- log .debug (" Read-only mode: %s" , self .start_read_only )
67+ log .debug (' Version info: %s' , self .version_info )
68+ log .debug (' Read-only mode: %s' , self .start_read_only )
7069
71- log .debug (" Actual connection to server %s:%d" , self .host , self .port )
70+ log .debug (' Actual connection to server %s:%d' , self .host , self .port )
7271
7372 async def connect (self ):
74- log .debug (" Initial connection to server %s:%d" , self .host , self .port )
73+ log .debug (' Initial connection to server %s:%d' , self .host , self .port )
7574
76- async with asyncio .timeout (self .read_timeout ):
77- self .reader , self .writer = await asyncio .open_connection (self .host , self .port )
75+ try :
76+ self .reader , self .writer = await asyncio .wait_for (
77+ asyncio .open_connection (self .host , self .port ), timeout = self .read_timeout
78+ )
79+ except asyncio .TimeoutError :
80+ raise TimeoutError (f'Connection to { self .host } :{ self .port } timed out after { self .read_timeout } seconds' )
7881
7982 try :
8083 await self ._make_handshake ()
@@ -85,7 +88,7 @@ async def connect(self):
8588
8689 async def send_connect (self , request ):
8790 # meant to be used before the read_loop starts
88- payload_log .debug (" [SEND] (initial) %s" , request )
91+ payload_log .debug (' [SEND] (initial) %s' , request )
8992
9093 payload = request .serialize ()
9194 payload = size_struct .pack (len (payload )) + payload
@@ -95,10 +98,10 @@ async def send_connect(self, request):
9598 try :
9699 _ , zxid , response = await self .read_response (initial_connect = True )
97100 except Exception :
98- log .exception (" Error reading connect response." )
101+ log .exception (' Error reading connect response.' )
99102 return
100103
101- payload_log .debug (" [RECV] (initial) %s" , response )
104+ payload_log .debug (' [RECV] (initial) %s' , response )
102105 return zxid , response
103106
104107 def start_read_loop (self ):
@@ -115,7 +118,7 @@ def send(self, request, xid=None):
115118 if request .special_xid :
116119 xid = request .special_xid
117120
118- payload_log .debug (" [SEND] (xid: %s) %s" , xid , request )
121+ payload_log .debug (' [SEND] (xid: %s) %s' , xid , request )
119122
120123 payload = request .serialize (xid )
121124 payload = size_struct .pack (len (payload )) + payload
@@ -136,8 +139,7 @@ def send(self, request, xid=None):
136139 return f
137140
138141 def pending_count (self ):
139- return (sum (len (futs ) for futs in self .pending_specials .values ()) +
140- len (self .pending ))
142+ return sum (len (futs ) for futs in self .pending_specials .values ()) + len (self .pending )
141143
142144 async def read_loop (self ):
143145 """
@@ -155,11 +157,11 @@ async def read_loop(self):
155157 except (ConnectionAbortedError , asyncio .CancelledError ):
156158 return
157159 except Exception as e :
158- log .exception (" Error reading response." )
160+ log .exception (' Error reading response.' )
159161 self .abort ()
160162 return
161163
162- payload_log .debug (" [RECV] (xid: %s) %s" , xid , response )
164+ payload_log .debug (' [RECV] (xid: %s) %s' , xid , response )
163165
164166 if xid == protocol .WATCH_XID :
165167 self .watch_handler (response )
@@ -182,8 +184,7 @@ async def _read(self, size=-1):
182184 while remaining_size and (time () < end_time ):
183185 remaining_time = end_time - time ()
184186 try :
185- chunk = await asyncio .wait_for (self .reader .read (remaining_size ),
186- timeout = remaining_time )
187+ chunk = await asyncio .wait_for (self .reader .read (remaining_size ), timeout = remaining_time )
187188 except asyncio .TimeoutError :
188189 continue
189190 payload .append (chunk )
@@ -233,7 +234,7 @@ def abort(self, exception=exc.ConnectError):
233234 the given ``exception`` parameter is used (defaults to
234235 ``ConnectError``).
235236 """
236- log .warning (" Aborting connection to %s:%s" , self .host , self .port )
237+ log .warning (' Aborting connection to %s:%s' , self .host , self .port )
237238
238239 def abort_pending (f ):
239240 exc_info = sys .exc_info ()
@@ -272,8 +273,7 @@ async def close(self, timeout):
272273
273274 try :
274275 if self .pending_count () > 0 :
275- log .warning ('Pendings: {}; specials: {}' .format (
276- self .pending , self .pending_specials ))
276+ log .warning ('Pendings: {}; specials: {}' .format (self .pending , self .pending_specials ))
277277 self .abort (exception = exc .TimeoutError )
278278 except asyncio .TimeoutError :
279279 log .warning ('ABORT Timeout' )
0 commit comments