From 63bd7b58b718184b0732b77a6fb7664ec469fad4 Mon Sep 17 00:00:00 2001 From: saville Date: Wed, 20 Aug 2025 15:34:01 -0600 Subject: [PATCH 1/3] Fix 66568: Prevent unclosed TCP channels and avoid additional errors --- changelog/66568.fixed.md | 1 + salt/channel/client.py | 3 ++- salt/transport/tcp.py | 17 +++++++++++++++-- 3 files changed, 18 insertions(+), 3 deletions(-) create mode 100644 changelog/66568.fixed.md diff --git a/changelog/66568.fixed.md b/changelog/66568.fixed.md new file mode 100644 index 000000000000..fbfa1aa540b0 --- /dev/null +++ b/changelog/66568.fixed.md @@ -0,0 +1 @@ +Fix closing of TCP transport channels and avoid additional errors diff --git a/salt/channel/client.py b/salt/channel/client.py index b9034ee94519..2c8441156957 100644 --- a/salt/channel/client.py +++ b/salt/channel/client.py @@ -556,7 +556,8 @@ def connect_callback(self, result): # may have been restarted yield self.send_id(self.token, self._reconnected) self.connected = True - self.event.fire_event({"master": self.opts["master"]}, "__master_connected") + if self.event: + self.event.fire_event({"master": self.opts["master"]}, "__master_connected") if self._reconnected: # On reconnects, fire a master event to notify that the minion is # available. diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index e326efb872ed..e26884135090 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -235,7 +235,6 @@ def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231 self.connected = False self._closing = False self._stream = None - self._closing = False self._closed = False self.backoff = opts.get("tcp_reconnect_backoff", 1) self.resolver = kwargs.get("resolver") @@ -1769,6 +1768,7 @@ def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231 self._closed = False self._stream_return_running = False self._stream = None + self.task = None self.disconnect_callback = _null_callback self.connect_callback = _null_callback self.backoff = opts.get("tcp_reconnect_backoff", 1) @@ -1837,7 +1837,7 @@ async def _stream_return(self): message_id, ) except tornado.iostream.StreamClosedError as e: - log.error( + log.debug( "tcp stream to %s:%s closed, unable to recv", self.host, self.port, @@ -1879,6 +1879,8 @@ async def _stream_return(self): stream.close() unpacker = salt.utils.msgpack.Unpacker() await self.connect() + except asyncio.CancelledError: + log.debug("Stream return cancelled") self._stream_return_running = False def _message_id(self): @@ -1927,9 +1929,20 @@ async def _do_send(): def close(self): if self._closing: return + self._closing = True if self._stream is not None: self._stream.close() self._stream = None + if self.task is not None: + self.task.cancel() + # Wait for the task to finish via asyncio + group = asyncio.gather(self.task) + try: + self.task.get_loop().run_until_complete(group) + except RuntimeError: + # Ignore event loop was already running message + pass + self.task = None def __enter__(self): return self From 2db10e9609db73a828246196392d124c0b6539f4 Mon Sep 17 00:00:00 2001 From: saville Date: Tue, 26 Aug 2025 10:01:10 -0600 Subject: [PATCH 2/3] Fix formatting errors --- salt/channel/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/salt/channel/client.py b/salt/channel/client.py index 2c8441156957..a5a51c433186 100644 --- a/salt/channel/client.py +++ b/salt/channel/client.py @@ -557,7 +557,9 @@ def connect_callback(self, result): yield self.send_id(self.token, self._reconnected) self.connected = True if self.event: - self.event.fire_event({"master": self.opts["master"]}, "__master_connected") + self.event.fire_event( + {"master": self.opts["master"]}, "__master_connected" + ) if self._reconnected: # On reconnects, fire a master event to notify that the minion is # available. From ae480b7e9f6a943c99df67f1bd08ad90a29fc3ce Mon Sep 17 00:00:00 2001 From: saville Date: Tue, 2 Sep 2025 13:17:24 -0600 Subject: [PATCH 3/3] Add tests to ensure that tasks are closed and transport is cleaned up properly --- .../functional/transport/server/test_request_server.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/pytests/functional/transport/server/test_request_server.py b/tests/pytests/functional/transport/server/test_request_server.py index d2f0363791c0..ee4f34c294e4 100644 --- a/tests/pytests/functional/transport/server/test_request_server.py +++ b/tests/pytests/functional/transport/server/test_request_server.py @@ -44,10 +44,16 @@ async def handler(message): try: ret = await req_client.send({"req": "test"}) + if transport == "tcp": + assert req_client.task is not None assert [reqmsg] == requests assert repmsg == ret finally: req_client.close() + if transport == "tcp": + # Ensure that issue 68277 is fixed + assert req_client.task is None or req_client.task.done() + assert req_client._closing req_server.close() # Yield to loop in order to allow background close methods to finish.