Skip to content

Commit 6241c78

Browse files
authored
Merge pull request #68277 from bluesliverx/3007.x
Fix 66568: Prevent unclosed TCP channels and avoid additional errors
2 parents ed27c4d + ae480b7 commit 6241c78

File tree

4 files changed

+26
-3
lines changed

4 files changed

+26
-3
lines changed

changelog/66568.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix closing of TCP transport channels and avoid additional errors

salt/channel/client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,10 @@ def connect_callback(self, result):
556556
# may have been restarted
557557
yield self.send_id(self.token, self._reconnected)
558558
self.connected = True
559-
self.event.fire_event({"master": self.opts["master"]}, "__master_connected")
559+
if self.event:
560+
self.event.fire_event(
561+
{"master": self.opts["master"]}, "__master_connected"
562+
)
560563
if self._reconnected:
561564
# On reconnects, fire a master event to notify that the minion is
562565
# available.

salt/transport/tcp.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,6 @@ def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
235235
self.connected = False
236236
self._closing = False
237237
self._stream = None
238-
self._closing = False
239238
self._closed = False
240239
self.backoff = opts.get("tcp_reconnect_backoff", 1)
241240
self.resolver = kwargs.get("resolver")
@@ -1769,6 +1768,7 @@ def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
17691768
self._closed = False
17701769
self._stream_return_running = False
17711770
self._stream = None
1771+
self.task = None
17721772
self.disconnect_callback = _null_callback
17731773
self.connect_callback = _null_callback
17741774
self.backoff = opts.get("tcp_reconnect_backoff", 1)
@@ -1837,7 +1837,7 @@ async def _stream_return(self):
18371837
message_id,
18381838
)
18391839
except tornado.iostream.StreamClosedError as e:
1840-
log.error(
1840+
log.debug(
18411841
"tcp stream to %s:%s closed, unable to recv",
18421842
self.host,
18431843
self.port,
@@ -1879,6 +1879,8 @@ async def _stream_return(self):
18791879
stream.close()
18801880
unpacker = salt.utils.msgpack.Unpacker()
18811881
await self.connect()
1882+
except asyncio.CancelledError:
1883+
log.debug("Stream return cancelled")
18821884
self._stream_return_running = False
18831885

18841886
def _message_id(self):
@@ -1927,9 +1929,20 @@ async def _do_send():
19271929
def close(self):
19281930
if self._closing:
19291931
return
1932+
self._closing = True
19301933
if self._stream is not None:
19311934
self._stream.close()
19321935
self._stream = None
1936+
if self.task is not None:
1937+
self.task.cancel()
1938+
# Wait for the task to finish via asyncio
1939+
group = asyncio.gather(self.task)
1940+
try:
1941+
self.task.get_loop().run_until_complete(group)
1942+
except RuntimeError:
1943+
# Ignore event loop was already running message
1944+
pass
1945+
self.task = None
19331946

19341947
def __enter__(self):
19351948
return self

tests/pytests/functional/transport/server/test_request_server.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,16 @@ async def handler(message):
4444
try:
4545

4646
ret = await req_client.send({"req": "test"})
47+
if transport == "tcp":
48+
assert req_client.task is not None
4749
assert [reqmsg] == requests
4850
assert repmsg == ret
4951
finally:
5052
req_client.close()
53+
if transport == "tcp":
54+
# Ensure that issue 68277 is fixed
55+
assert req_client.task is None or req_client.task.done()
56+
assert req_client._closing
5157
req_server.close()
5258

5359
# Yield to loop in order to allow background close methods to finish.

0 commit comments

Comments
 (0)