From 7927c61c95712d93ff7b96d0165a43d81e039f23 Mon Sep 17 00:00:00 2001 From: Mats Sundvall Date: Wed, 25 Feb 2026 22:24:14 +0100 Subject: [PATCH 1/3] Enhance Interface ABC with common connection management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Formalize attributes and methods that all Interface implementations already share: xknx/cemi_received_callback slots, connection_state_changed() method, and current_address property. Update _Tunnel and Routing to use these Interface methods instead of direct xknx access. Pure refactor — no behavior changes. Prepares Interface for custom transport implementations per feedback on #1802. Co-Authored-By: Claude Opus 4.6 --- xknx/io/interface.py | 30 +++++++++++++++++++++++++++++- xknx/io/routing.py | 12 +++++------- xknx/io/tunnel.py | 12 +++++------- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/xknx/io/interface.py b/xknx/io/interface.py index a9e32b358..0682a2b3a 100644 --- a/xknx/io/interface.py +++ b/xknx/io/interface.py @@ -10,20 +10,28 @@ from abc import ABC, abstractmethod from collections.abc import Callable +from typing import TYPE_CHECKING from xknx.cemi import CEMIFrame +from xknx.core import XknxConnectionState, XknxConnectionType +from xknx.telegram import IndividualAddress from .transport.ip_transport import KNXIPTransport +if TYPE_CHECKING: + from xknx.xknx import XKNX + CEMIBytesCallbackType = Callable[[bytes], None] class Interface(ABC): """Abstract base class for KNX/IP connections.""" - __slots__ = ("transport",) + __slots__ = ("cemi_received_callback", "transport", "xknx") + cemi_received_callback: CEMIBytesCallbackType transport: KNXIPTransport + xknx: XKNX @abstractmethod async def connect(self) -> None: @@ -36,3 +44,23 @@ async def disconnect(self) -> None: @abstractmethod async def send_cemi(self, cemi: CEMIFrame) -> None: """Send CEMIFrame to KNX bus.""" + + def connection_state_changed( + self, + state: XknxConnectionState, + connection_type: XknxConnectionType = XknxConnectionType.NOT_CONNECTED, + ) -> None: + """Update connection state via connection manager.""" + self.xknx.connection_manager.connection_state_changed( + state, connection_type + ) + + @property + def current_address(self) -> IndividualAddress: + """Get current individual address.""" + return self.xknx.current_address + + @current_address.setter + def current_address(self, address: IndividualAddress) -> None: + """Set current individual address.""" + self.xknx.current_address = address diff --git a/xknx/io/routing.py b/xknx/io/routing.py index abe2123a6..a70d8057f 100644 --- a/xknx/io/routing.py +++ b/xknx/io/routing.py @@ -147,12 +147,10 @@ class Routing(Interface): __slots__ = ( "_flow_control", - "cemi_received_callback", "individual_address", "local_ip", "multicast_group", "multicast_port", - "xknx", ) connection_type = XknxConnectionType.ROUTING @@ -202,8 +200,8 @@ def _init_transport(self) -> None: async def connect(self) -> None: """Start routing.""" - self.xknx.current_address = self.individual_address - self.xknx.connection_manager.connection_state_changed( + self.current_address = self.individual_address + self.connection_state_changed( XknxConnectionState.CONNECTING, self.connection_type ) try: @@ -214,20 +212,20 @@ async def connect(self) -> None: type(ex).__name__, ex, ) - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.DISCONNECTED ) # close udp transport to prevent open file descriptors self.transport.stop() raise CommunicationError("Routing could not be started") from ex - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.CONNECTED, self.connection_type ) async def disconnect(self) -> None: """Stop routing.""" self.transport.stop() - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.DISCONNECTED ) self._flow_control.cancel() diff --git a/xknx/io/tunnel.py b/xknx/io/tunnel.py index aab3aa033..9320a45d8 100644 --- a/xknx/io/tunnel.py +++ b/xknx/io/tunnel.py @@ -55,11 +55,9 @@ class _Tunnel(Interface): "_src_address", "auto_reconnect", "auto_reconnect_wait", - "cemi_received_callback", "communication_channel", "local_hpai", "sequence_number", - "xknx", ) connection_type: XknxConnectionType @@ -119,7 +117,7 @@ async def connect(self) -> None: Raise CommunicationError when not successful. """ - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.CONNECTING, self.connection_type ) try: @@ -132,7 +130,7 @@ async def connect(self) -> None: type(ex).__name__, ex, ) - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.DISCONNECTED ) # close transport to prevent open file descriptors @@ -142,7 +140,7 @@ async def connect(self) -> None: ) from ex self._tunnel_established() - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.CONNECTED, self.connection_type ) @@ -209,7 +207,7 @@ async def _reconnect(self) -> None: def _prepare_disconnect(self) -> None: """Prepare for disconnect. Stop tunnel related tasks and set connection state.""" self.stop_heartbeat() - self.xknx.connection_manager.connection_state_changed( + self.connection_state_changed( XknxConnectionState.DISCONNECTED ) @@ -251,7 +249,7 @@ async def _connect_request(self) -> bool: ) # Use the individual address provided by the tunnelling server self._src_address = connect.crd.individual_address or IndividualAddress(0) - self.xknx.current_address = self._src_address + self.current_address = self._src_address logger.debug( "Tunnel established. communication_channel=%s, address=%s", connect.communication_channel, From 243c00fd7648f7ca5fb5cafc8e73f06898782822 Mon Sep 17 00:00:00 2001 From: Mats Sundvall Date: Fri, 27 Feb 2026 13:55:57 +0100 Subject: [PATCH 2/3] refactor: simplify connection_state_changed per maintainer feedback - Annotate connection_type as ClassVar[XknxConnectionType] in Interface ABC and all subclasses (_Tunnel, UDPTunnel, TCPTunnel, SecureTunnel, Routing, RoutingSecure) - Remove connection_type parameter from Interface.connection_state_changed; method now uses self.connection_type internally - Auto-reset connection_type to NOT_CONNECTED on DISCONNECTED state in ConnectionManager, making the invariant explicit and removing reliance on callers passing the correct default Co-Authored-By: Claude Sonnet 4.6 --- xknx/core/connection_manager.py | 6 +++++- xknx/io/interface.py | 11 ++++------- xknx/io/routing.py | 18 ++++++------------ xknx/io/tunnel.py | 22 ++++++++-------------- 4 files changed, 23 insertions(+), 34 deletions(-) diff --git a/xknx/core/connection_manager.py b/xknx/core/connection_manager.py index d7aa7166b..8195641e5 100644 --- a/xknx/core/connection_manager.py +++ b/xknx/core/connection_manager.py @@ -88,7 +88,11 @@ def _connection_state_changed( return self._state = state - self.connection_type = connection_type + self.connection_type = ( + XknxConnectionType.NOT_CONNECTED + if state == XknxConnectionState.DISCONNECTED + else connection_type + ) if state == XknxConnectionState.CONNECTED: self.connected.set() self._reset_counters() diff --git a/xknx/io/interface.py b/xknx/io/interface.py index 0682a2b3a..9bef818f9 100644 --- a/xknx/io/interface.py +++ b/xknx/io/interface.py @@ -10,7 +10,7 @@ from abc import ABC, abstractmethod from collections.abc import Callable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, ClassVar from xknx.cemi import CEMIFrame from xknx.core import XknxConnectionState, XknxConnectionType @@ -29,6 +29,7 @@ class Interface(ABC): __slots__ = ("cemi_received_callback", "transport", "xknx") + connection_type: ClassVar[XknxConnectionType] cemi_received_callback: CEMIBytesCallbackType transport: KNXIPTransport xknx: XKNX @@ -45,14 +46,10 @@ async def disconnect(self) -> None: async def send_cemi(self, cemi: CEMIFrame) -> None: """Send CEMIFrame to KNX bus.""" - def connection_state_changed( - self, - state: XknxConnectionState, - connection_type: XknxConnectionType = XknxConnectionType.NOT_CONNECTED, - ) -> None: + def connection_state_changed(self, state: XknxConnectionState) -> None: """Update connection state via connection manager.""" self.xknx.connection_manager.connection_state_changed( - state, connection_type + state, self.connection_type ) @property diff --git a/xknx/io/routing.py b/xknx/io/routing.py index a70d8057f..4d0b98254 100644 --- a/xknx/io/routing.py +++ b/xknx/io/routing.py @@ -11,7 +11,7 @@ from contextlib import asynccontextmanager import logging import random -from typing import TYPE_CHECKING, Final +from typing import TYPE_CHECKING, ClassVar, Final from xknx.cemi import CEMIFrame, CEMIMessageCode from xknx.core import XknxConnectionState, XknxConnectionType @@ -153,7 +153,7 @@ class Routing(Interface): "multicast_port", ) - connection_type = XknxConnectionType.ROUTING + connection_type: ClassVar[XknxConnectionType] = XknxConnectionType.ROUTING transport: UDPTransport def __init__( @@ -201,9 +201,7 @@ def _init_transport(self) -> None: async def connect(self) -> None: """Start routing.""" self.current_address = self.individual_address - self.connection_state_changed( - XknxConnectionState.CONNECTING, self.connection_type - ) + self.connection_state_changed(XknxConnectionState.CONNECTING) try: await self.transport.connect() except OSError as ex: @@ -212,15 +210,11 @@ async def connect(self) -> None: type(ex).__name__, ex, ) - self.connection_state_changed( - XknxConnectionState.DISCONNECTED - ) + self.connection_state_changed(XknxConnectionState.DISCONNECTED) # close udp transport to prevent open file descriptors self.transport.stop() raise CommunicationError("Routing could not be started") from ex - self.connection_state_changed( - XknxConnectionState.CONNECTED, self.connection_type - ) + self.connection_state_changed(XknxConnectionState.CONNECTED) async def disconnect(self) -> None: """Stop routing.""" @@ -288,7 +282,7 @@ class SecureRouting(Routing): "latency_ms", ) - connection_type = XknxConnectionType.ROUTING_SECURE + connection_type: ClassVar[XknxConnectionType] = XknxConnectionType.ROUTING_SECURE transport: SecureGroup def __init__( diff --git a/xknx/io/tunnel.py b/xknx/io/tunnel.py index 9320a45d8..9cfee214d 100644 --- a/xknx/io/tunnel.py +++ b/xknx/io/tunnel.py @@ -11,7 +11,7 @@ from collections.abc import AsyncGenerator from contextlib import asynccontextmanager import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, ClassVar from xknx.cemi import CEMIFrame from xknx.core import XknxConnectionState, XknxConnectionType @@ -60,7 +60,7 @@ class _Tunnel(Interface): "sequence_number", ) - connection_type: XknxConnectionType + connection_type: ClassVar[XknxConnectionType] transport: KNXIPTransport def __init__( @@ -117,9 +117,7 @@ async def connect(self) -> None: Raise CommunicationError when not successful. """ - self.connection_state_changed( - XknxConnectionState.CONNECTING, self.connection_type - ) + self.connection_state_changed(XknxConnectionState.CONNECTING) try: await self.transport.connect() await self.setup_tunnel() @@ -130,9 +128,7 @@ async def connect(self) -> None: type(ex).__name__, ex, ) - self.connection_state_changed( - XknxConnectionState.DISCONNECTED - ) + self.connection_state_changed(XknxConnectionState.DISCONNECTED) # close transport to prevent open file descriptors self.transport.stop() raise CommunicationError( @@ -140,9 +136,7 @@ async def connect(self) -> None: ) from ex self._tunnel_established() - self.connection_state_changed( - XknxConnectionState.CONNECTED, self.connection_type - ) + self.connection_state_changed(XknxConnectionState.CONNECTED) def _tunnel_established(self) -> None: """Set up interface when the tunnel is ready.""" @@ -454,7 +448,7 @@ class UDPTunnel(_Tunnel): "route_back", ) - connection_type = XknxConnectionType.TUNNEL_UDP + connection_type: ClassVar[XknxConnectionType] = XknxConnectionType.TUNNEL_UDP transport: UDPTransport def __init__( @@ -698,7 +692,7 @@ class TCPTunnel(_Tunnel): __slots__ = ("gateway_ip", "gateway_port") - connection_type = XknxConnectionType.TUNNEL_TCP + connection_type: ClassVar[XknxConnectionType] = XknxConnectionType.TUNNEL_TCP transport: TCPTransport def __init__( @@ -744,7 +738,7 @@ class SecureTunnel(TCPTunnel): __slots__ = ("_device_authentication_password", "_user_id", "_user_password") - connection_type = XknxConnectionType.TUNNEL_SECURE + connection_type: ClassVar[XknxConnectionType] = XknxConnectionType.TUNNEL_SECURE transport: SecureSession def __init__( From fd3b91f604cbc06c6b90c8fa5fd430e13cc06837 Mon Sep 17 00:00:00 2001 From: matssun <129606238+matssun@users.noreply.github.com> Date: Sun, 8 Mar 2026 18:59:33 +0100 Subject: [PATCH 3/3] Update xknx/io/interface.py Co-authored-by: Matthias Alphart --- xknx/io/interface.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/xknx/io/interface.py b/xknx/io/interface.py index 9bef818f9..1127a1b4a 100644 --- a/xknx/io/interface.py +++ b/xknx/io/interface.py @@ -52,12 +52,6 @@ def connection_state_changed(self, state: XknxConnectionState) -> None: state, self.connection_type ) - @property - def current_address(self) -> IndividualAddress: - """Get current individual address.""" - return self.xknx.current_address - - @current_address.setter - def current_address(self, address: IndividualAddress) -> None: + def _set_individual_address(self, address: IndividualAddress) -> None: """Set current individual address.""" self.xknx.current_address = address