66
77from .client_exceptions import ClientError , ServerTimeoutError
88from .client_reqrep import ClientResponse
9- from .helpers import call_later , set_result
9+ from .helpers import calculate_timeout_when , set_result
1010from .http import (
1111 WS_CLOSED_MESSAGE ,
1212 WS_CLOSING_MESSAGE ,
@@ -62,6 +62,7 @@ def __init__(
6262 self ._autoping = autoping
6363 self ._heartbeat = heartbeat
6464 self ._heartbeat_cb : Optional [asyncio .TimerHandle ] = None
65+ self ._heartbeat_when : float = 0.0
6566 if heartbeat is not None :
6667 self ._pong_heartbeat = heartbeat / 2.0
6768 self ._pong_response_cb : Optional [asyncio .TimerHandle ] = None
@@ -75,52 +76,64 @@ def __init__(
7576 self ._reset_heartbeat ()
7677
7778 def _cancel_heartbeat (self ) -> None :
78- if self ._pong_response_cb is not None :
79- self ._pong_response_cb .cancel ()
80- self ._pong_response_cb = None
81-
79+ self ._cancel_pong_response_cb ()
8280 if self ._heartbeat_cb is not None :
8381 self ._heartbeat_cb .cancel ()
8482 self ._heartbeat_cb = None
8583
86- def _reset_heartbeat (self ) -> None :
87- self ._cancel_heartbeat ()
84+ def _cancel_pong_response_cb (self ) -> None :
85+ if self ._pong_response_cb is not None :
86+ self ._pong_response_cb .cancel ()
87+ self ._pong_response_cb = None
8888
89- if self ._heartbeat is not None :
90- self ._heartbeat_cb = call_later (
91- self ._send_heartbeat ,
92- self ._heartbeat ,
93- self ._loop ,
94- timeout_ceil_threshold = (
95- self ._conn ._connector ._timeout_ceil_threshold
96- if self ._conn is not None
97- else 5
98- ),
99- )
89+ def _reset_heartbeat (self ) -> None :
90+ if self ._heartbeat is None :
91+ return
92+ self ._cancel_pong_response_cb ()
93+ loop = self ._loop
94+ assert loop is not None
95+ conn = self ._conn
96+ timeout_ceil_threshold = (
97+ conn ._connector ._timeout_ceil_threshold if conn is not None else 5
98+ )
99+ now = loop .time ()
100+ when = calculate_timeout_when (now , self ._heartbeat , timeout_ceil_threshold )
101+ self ._heartbeat_when = when
102+ if self ._heartbeat_cb is None :
103+ # We do not cancel the previous heartbeat_cb here because
104+ # it generates a significant amount of TimerHandle churn
105+ # which causes asyncio to rebuild the heap frequently.
106+ # Instead _send_heartbeat() will reschedule the next
107+ # heartbeat if it fires too early.
108+ self ._heartbeat_cb = loop .call_at (when , self ._send_heartbeat )
100109
101110 def _send_heartbeat (self ) -> None :
102- if self ._heartbeat is not None and not self ._closed :
103- # fire-and-forget a task is not perfect but maybe ok for
104- # sending ping. Otherwise we need a long-living heartbeat
105- # task in the class.
106- self ._loop .create_task (self ._writer .ping ())
107-
108- if self ._pong_response_cb is not None :
109- self ._pong_response_cb .cancel ()
110- self ._pong_response_cb = call_later (
111- self ._pong_not_received ,
112- self ._pong_heartbeat ,
113- self ._loop ,
114- timeout_ceil_threshold = (
115- self ._conn ._connector ._timeout_ceil_threshold
116- if self ._conn is not None
117- else 5
118- ),
111+ self ._heartbeat_cb = None
112+ loop = self ._loop
113+ now = loop .time ()
114+ if now < self ._heartbeat_when :
115+ # Heartbeat fired too early, reschedule
116+ self ._heartbeat_cb = loop .call_at (
117+ self ._heartbeat_when , self ._send_heartbeat
119118 )
119+ return
120+
121+ # fire-and-forget a task is not perfect but maybe ok for
122+ # sending ping. Otherwise we need a long-living heartbeat
123+ # task in the class.
124+ loop .create_task (self ._writer .ping ()) # type: ignore[unused-awaitable]
125+
126+ conn = self ._conn
127+ timeout_ceil_threshold = (
128+ conn ._connector ._timeout_ceil_threshold if conn is not None else 5
129+ )
130+ when = calculate_timeout_when (now , self ._pong_heartbeat , timeout_ceil_threshold )
131+ self ._cancel_pong_response_cb ()
132+ self ._pong_response_cb = loop .call_at (when , self ._pong_not_received )
120133
121134 def _pong_not_received (self ) -> None :
122135 if not self ._closed :
123- self ._closed = True
136+ self ._set_closed ()
124137 self ._close_code = WSCloseCode .ABNORMAL_CLOSURE
125138 self ._exception = ServerTimeoutError ()
126139 self ._response .close ()
@@ -129,6 +142,22 @@ def _pong_not_received(self) -> None:
129142 WSMessage (WSMsgType .ERROR , self ._exception , None )
130143 )
131144
145+ def _set_closed (self ) -> None :
146+ """Set the connection to closed.
147+
148+ Cancel any heartbeat timers and set the closed flag.
149+ """
150+ self ._closed = True
151+ self ._cancel_heartbeat ()
152+
153+ def _set_closing (self ) -> None :
154+ """Set the connection to closing.
155+
156+ Cancel any heartbeat timers and set the closing flag.
157+ """
158+ self ._closing = True
159+ self ._cancel_heartbeat ()
160+
132161 @property
133162 def closed (self ) -> bool :
134163 return self ._closed
@@ -193,13 +222,12 @@ async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bo
193222 if self ._waiting and not self ._closing :
194223 assert self ._loop is not None
195224 self ._close_wait = self ._loop .create_future ()
196- self ._closing = True
225+ self ._set_closing ()
197226 self ._reader .feed_data (WS_CLOSING_MESSAGE , 0 )
198227 await self ._close_wait
199228
200229 if not self ._closed :
201- self ._cancel_heartbeat ()
202- self ._closed = True
230+ self ._set_closed ()
203231 try :
204232 await self ._writer .close (code , message )
205233 except asyncio .CancelledError :
@@ -266,7 +294,8 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage:
266294 await self .close ()
267295 return WSMessage (WSMsgType .CLOSED , None , None )
268296 except ClientError :
269- self ._closed = True
297+ # Likely ServerDisconnectedError when connection is lost
298+ self ._set_closed ()
270299 self ._close_code = WSCloseCode .ABNORMAL_CLOSURE
271300 return WS_CLOSED_MESSAGE
272301 except WebSocketError as exc :
@@ -275,18 +304,18 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage:
275304 return WSMessage (WSMsgType .ERROR , exc , None )
276305 except Exception as exc :
277306 self ._exception = exc
278- self ._closing = True
307+ self ._set_closing ()
279308 self ._close_code = WSCloseCode .ABNORMAL_CLOSURE
280309 await self .close ()
281310 return WSMessage (WSMsgType .ERROR , exc , None )
282311
283312 if msg .type is WSMsgType .CLOSE :
284- self ._closing = True
313+ self ._set_closing ()
285314 self ._close_code = msg .data
286315 if not self ._closed and self ._autoclose :
287316 await self .close ()
288317 elif msg .type is WSMsgType .CLOSING :
289- self ._closing = True
318+ self ._set_closing ()
290319 elif msg .type is WSMsgType .PING and self ._autoping :
291320 await self .pong (msg .data )
292321 continue
0 commit comments