Skip to content

Commit ec57518

Browse files
websockets: fix ping_timeout
* Closes #3258 * Closes #2905 * Closes #2655 * Fixes an issue with the calculation of ping timeout interval that could cause connections to be erroneously timed out and closed from the server end.
1 parent d92390e commit ec57518

File tree

2 files changed

+153
-48
lines changed

2 files changed

+153
-48
lines changed

tornado/test/websocket_test.py

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,11 @@ class PingHandler(TestWebSocketHandler):
810810
def on_pong(self, data):
811811
self.write_message("got pong")
812812

813-
return Application([("/", PingHandler)], websocket_ping_interval=0.01)
813+
return Application(
814+
[("/", PingHandler)],
815+
websocket_ping_interval=0.01,
816+
websocket_ping_timeout=0,
817+
)
814818

815819
@gen_test
816820
def test_server_ping(self):
@@ -831,14 +835,77 @@ def on_ping(self, data):
831835

832836
@gen_test
833837
def test_client_ping(self):
834-
ws = yield self.ws_connect("/", ping_interval=0.01)
838+
ws = yield self.ws_connect("/", ping_interval=0.01, ping_timeout=0)
835839
for i in range(3):
836840
response = yield ws.read_message()
837841
self.assertEqual(response, "got ping")
838-
# TODO: test that the connection gets closed if ping responses stop.
839842
ws.close()
840843

841844

845+
class ServerPingTimeoutTest(WebSocketBaseTestCase):
846+
def get_app(self):
847+
handlers = []
848+
849+
class PingHandler(TestWebSocketHandler):
850+
def initialize(self, close_future=None, compression_options=None):
851+
# capture the handler instance so we can interrogate it later
852+
handlers.append(self)
853+
return super().initialize(
854+
close_future=close_future,
855+
compression_options=compression_options,
856+
)
857+
858+
app = Application([("/", PingHandler)])
859+
app._handlers = handlers
860+
return app
861+
862+
@staticmethod
863+
def suppress_pong(ws):
864+
"""Suppress the client's "pong" response."""
865+
def wrapper(fcn):
866+
def _inner(oppcode: int, data: bytes):
867+
if oppcode == 0xA: # NOTE: 0x9=ping, 0xA=pong
868+
# prevent pong responses
869+
return
870+
# leave all other responses unchanged
871+
return fcn(oppcode, data)
872+
return _inner
873+
874+
ws.protocol._handle_message = wrapper(ws.protocol._handle_message)
875+
876+
@gen_test
877+
def test_client_ping_timeout(self):
878+
# websocket client
879+
ws = yield self.ws_connect("/", ping_interval=0.2, ping_timeout=0.05)
880+
881+
# websocket handler (server side)
882+
handler = self._app._handlers[0]
883+
884+
for _ in range(5):
885+
# wait for the ping period
886+
yield gen.sleep(0.2)
887+
888+
# connection should still be open from the server end
889+
assert handler.close_code is None
890+
assert handler.close_reason is None
891+
892+
# connection should still be open from the client end
893+
assert ws.protocol.close_code is None
894+
895+
# delay the pong message by 0.10 seconds (timeout=0.05)
896+
self.suppress_pong(ws)
897+
898+
# give the server time to register this
899+
yield gen.sleep(0.2)
900+
901+
# connection should be closed from the server side
902+
assert handler.close_code == 1000
903+
assert handler.close_reason == 'ping timed out'
904+
905+
# client should have received a close operation
906+
assert ws.protocol.close_code == 1000
907+
908+
842909
class ManualPingTest(WebSocketBaseTestCase):
843910
def get_app(self):
844911
class PingHandler(TestWebSocketHandler):

tornado/websocket.py

Lines changed: 83 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import abc
1515
import asyncio
1616
import base64
17+
import functools
1718
import hashlib
19+
import logging
1820
import os
1921
import sys
2022
import struct
@@ -97,6 +99,9 @@ def log_exception(
9799

98100
_default_max_message_size = 10 * 1024 * 1024
99101

102+
# log to "gen_log" but suppress duplicate log messages
103+
de_dupe_gen_log = functools.lru_cache(gen_log.log)
104+
100105

101106
class WebSocketError(Exception):
102107
pass
@@ -274,17 +279,41 @@ async def get(self, *args: Any, **kwargs: Any) -> None:
274279

275280
@property
276281
def ping_interval(self) -> Optional[float]:
277-
"""The interval for websocket keep-alive pings.
282+
"""The interval for sending websocket pings.
283+
284+
If this is non-zero, the websocket will send a ping every
285+
ping_interval seconds.
286+
The client will respond with a "pong". The connection can be configured
287+
to timeout on late pong delivery using ``websocket_ping_timeout``.
278288
279-
Set websocket_ping_interval = 0 to disable pings.
289+
Set ``websocket_ping_interval = 0`` to disable pings.
290+
291+
Default: ``0``
280292
"""
281293
return self.settings.get("websocket_ping_interval", None)
282294

283295
@property
284296
def ping_timeout(self) -> Optional[float]:
285-
"""If no ping is received in this many seconds,
286-
close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
287-
Default is max of 3 pings or 30 seconds.
297+
"""Timeout if no pong is received in this many seconds.
298+
299+
To be used in combination with ``websocket_ping_interval > 0``.
300+
If a ping response (a "pong") is not received within
301+
``websocket_ping_timeout`` seconds, then the websocket connection
302+
will be closed.
303+
304+
This can help to clean up clients which have disconnected without
305+
cleanly closing the websocket connection.
306+
307+
Note, the ping timeout cannot be longer than the ping interval.
308+
309+
Set ``websocket_ping_timeout = 0`` to disable the ping timeout.
310+
311+
Default: ``min(ping_interval, 30)``
312+
313+
.. versionchanged:: 6.5.0
314+
Default changed from the max of 3 pings or 30 seconds.
315+
The ping timeout can no longer be configured longer than the
316+
ping interval.
288317
"""
289318
return self.settings.get("websocket_ping_timeout", None)
290319

@@ -832,10 +861,10 @@ def __init__(
832861
self._wire_bytes_in = 0
833862
self._wire_bytes_out = 0
834863
self.ping_callback = None # type: Optional[PeriodicCallback]
835-
self.last_ping = 0.0
836-
self.last_pong = 0.0
864+
self._received_pong = False # type: bool
837865
self.close_code = None # type: Optional[int]
838866
self.close_reason = None # type: Optional[str]
867+
self._ping_coroutine = None # type: Optional[asyncio.Task]
839868

840869
# Use a property for this to satisfy the abc.
841870
@property
@@ -1232,7 +1261,7 @@ def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]":
12321261
self._run_callback(self.handler.on_ping, data)
12331262
elif opcode == 0xA:
12341263
# Pong
1235-
self.last_pong = IOLoop.current().time()
1264+
self._received_pong = True
12361265
return self._run_callback(self.handler.on_pong, data)
12371266
else:
12381267
self._abort()
@@ -1266,9 +1295,9 @@ def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> Non
12661295
self._waiting = self.stream.io_loop.add_timeout(
12671296
self.stream.io_loop.time() + 5, self._abort
12681297
)
1269-
if self.ping_callback:
1270-
self.ping_callback.stop()
1271-
self.ping_callback = None
1298+
if self._ping_coroutine:
1299+
self._ping_coroutine.cancel()
1300+
self._ping_coroutine = None
12721301

12731302
def is_closing(self) -> bool:
12741303
"""Return ``True`` if this connection is closing.
@@ -1279,60 +1308,69 @@ def is_closing(self) -> bool:
12791308
"""
12801309
return self.stream.closed() or self.client_terminated or self.server_terminated
12811310

1311+
def set_nodelay(self, x: bool) -> None:
1312+
self.stream.set_nodelay(x)
1313+
12821314
@property
1283-
def ping_interval(self) -> Optional[float]:
1315+
def ping_interval(self) -> float:
12841316
interval = self.params.ping_interval
12851317
if interval is not None:
12861318
return interval
12871319
return 0
12881320

12891321
@property
1290-
def ping_timeout(self) -> Optional[float]:
1322+
def ping_timeout(self) -> float:
12911323
timeout = self.params.ping_timeout
12921324
if timeout is not None:
1325+
if self.ping_interval and timeout > self.ping_interval:
1326+
de_dupe_gen_log(
1327+
# Note: using de_dupe_gen_log to prevent this message from
1328+
# being duplicated for each connection
1329+
logging.WARNING,
1330+
f'The websocket_ping_timeout ({timeout}) cannot be longer'
1331+
f' than the websocket_ping_interval ({self.ping_interval}).'
1332+
f'\nSetting websocket_ping_timeout={self.ping_interval}'
1333+
)
1334+
return self.ping_interval
12931335
return timeout
1294-
assert self.ping_interval is not None
1295-
return max(3 * self.ping_interval, 30)
1336+
return min(self.ping_interval, 30)
12961337

12971338
def start_pinging(self) -> None:
12981339
"""Start sending periodic pings to keep the connection alive"""
1299-
assert self.ping_interval is not None
1300-
if self.ping_interval > 0:
1301-
self.last_ping = self.last_pong = IOLoop.current().time()
1302-
self.ping_callback = PeriodicCallback(
1303-
self.periodic_ping, self.ping_interval * 1000
1304-
)
1305-
self.ping_callback.start()
1340+
if (
1341+
# prevent multiple ping coroutines being run in parallel
1342+
not self._ping_coroutine
1343+
# only run the ping coroutine if a ping interval is configured
1344+
and self.ping_interval > 0
1345+
):
1346+
self._ping_coroutine = asyncio.create_task(self.periodic_ping())
13061347

1307-
def periodic_ping(self) -> None:
1308-
"""Send a ping to keep the websocket alive
1348+
async def periodic_ping(self) -> None:
1349+
"""Send a ping and wait for a pong if ping_timeout is configured.
13091350
13101351
Called periodically if the websocket_ping_interval is set and non-zero.
13111352
"""
1312-
if self.is_closing() and self.ping_callback is not None:
1313-
self.ping_callback.stop()
1314-
return
1353+
interval = self.ping_interval
1354+
timeout = self.ping_timeout
13151355

1316-
# Check for timeout on pong. Make sure that we really have
1317-
# sent a recent ping in case the machine with both server and
1318-
# client has been suspended since the last ping.
1319-
now = IOLoop.current().time()
1320-
since_last_pong = now - self.last_pong
1321-
since_last_ping = now - self.last_ping
1322-
assert self.ping_interval is not None
1323-
assert self.ping_timeout is not None
1324-
if (
1325-
since_last_ping < 2 * self.ping_interval
1326-
and since_last_pong > self.ping_timeout
1327-
):
1328-
self.close()
1329-
return
1356+
await asyncio.sleep(interval)
13301357

1331-
self.write_ping(b"")
1332-
self.last_ping = now
1358+
while True:
1359+
# send a ping
1360+
self._received_pong = False
1361+
ping_time = IOLoop.current().time()
1362+
self.write_ping(b"")
13331363

1334-
def set_nodelay(self, x: bool) -> None:
1335-
self.stream.set_nodelay(x)
1364+
# wait until the ping timeout
1365+
await asyncio.sleep(timeout)
1366+
1367+
# make sure we received a pong within the timeout
1368+
if timeout > 0 and not self._received_pong:
1369+
self.close(reason='ping timed out')
1370+
return
1371+
1372+
# wait until the next scheduled ping
1373+
await asyncio.sleep(IOLoop.current().time() - ping_time + interval)
13361374

13371375

13381376
class WebSocketClientConnection(simple_httpclient._HTTPConnection):

0 commit comments

Comments
 (0)