11"""API Client for Systemair VSR ventilation units using Modbus TCP."""
22
33import asyncio
4- from typing import Any
4+ import contextlib
5+ from typing import Any , NoReturn
56
67from pymodbus .client import AsyncModbusTcpClient
7- from pymodbus .exceptions import ModbusException
8+ from pymodbus .exceptions import ConnectionException
89
910from .const import LOGGER
10- from .modbus import parameter_map
1111
1212MODBUS_DEVICE_BUSY_EXCEPTION = 6
13+ MODBUS_GATEWAY_TARGET_FAILED_TO_RESPOND = 11
1314
1415
1516class ModbusConnectionError (Exception ):
@@ -21,71 +22,147 @@ class SystemairVSRModbusClient:
2122
2223 def __init__ (self , host : str , port : int , slave_id : int , timeout : int = 5 ) -> None :
2324 """Initialize the Modbus client."""
24- self ._client = AsyncModbusTcpClient (host , port = port , timeout = timeout )
25+ self ._host = host
26+ self ._port = port
27+ self ._timeout = timeout
2528 self .slave_id = slave_id
29+
30+ self ._client : AsyncModbusTcpClient | None = None
2631 self ._lock = asyncio .Lock ()
27- self ._is_connected = False
32+ self ._is_running = False
33+ self ._worker_task : asyncio .Task | None = None
34+ self ._request_queue : asyncio .Queue = asyncio .Queue ()
2835
29- async def close (self ) -> None :
30- """Close the Modbus connection."""
31- if self ._is_connected :
32- self ._client .close ()
33- self ._is_connected = False
36+ async def start (self ) -> None :
37+ """Start the client and the background worker."""
38+ async with self ._lock :
39+ if self ._is_running :
40+ return
41+ self ._is_running = True
42+ self ._worker_task = asyncio .create_task (self ._modbus_worker ())
43+ LOGGER .info ("Systemair Modbus client worker started." )
44+
45+ async def stop (self ) -> None :
46+ """Stop the client and the background worker."""
47+ async with self ._lock :
48+ if not self ._is_running :
49+ return
50+ self ._is_running = False
51+ if self ._worker_task :
52+ self ._worker_task .cancel ()
53+ with contextlib .suppress (asyncio .CancelledError ):
54+ await self ._worker_task
55+ await self ._close_connection ()
56+ LOGGER .info ("Systemair Modbus client worker stopped." )
3457
3558 async def _ensure_connected (self ) -> None :
3659 """Ensure the client is connected, establishing connection if needed."""
37- if not self ._is_connected :
38- self ._is_connected = await self ._client .connect ()
39- if not self ._is_connected :
40- msg = "Could not connect to VSR unit"
41- raise ModbusConnectionError (msg )
42-
43- async def test_connection (self ) -> bool :
44- """Test the connection to the Modbus device."""
45- async with self ._lock :
46- try :
47- await self ._ensure_connected ()
48- test_register_1based = parameter_map ["REG_TC_SP" ].register
49- await self ._client .read_holding_registers (address = test_register_1based - 1 , count = 1 , device_id = self .slave_id )
50- except (ModbusException , ModbusConnectionError ) as e :
51- LOGGER .error ("Failed to connect during test: %s" , e )
52- return False
53- else :
54- return True
55- finally :
56- await self .close ()
60+ if self ._client and self ._client .connected :
61+ return
5762
58- async def write_register (self , address_1based : int , value : int ) -> None :
59- """Write a single holding register with retry logic."""
60- max_retries = 3
61- retry_delay = 0.3
63+ LOGGER .debug ("Connecting to Modbus device at %s:%s" , self ._host , self ._port )
64+ self ._client = AsyncModbusTcpClient (host = self ._host , port = self ._port , timeout = self ._timeout )
65+ if not await self ._client .connect ():
66+ msg = f"Could not connect to VSR unit at { self ._host } :{ self ._port } "
67+ raise ModbusConnectionError (msg )
68+ LOGGER .debug ("Modbus connection successful." )
6269
63- async with self ._lock :
64- last_exception = None
65- for attempt in range (max_retries ):
66- try :
67- await self ._ensure_connected ()
68- result = await self ._client .write_register (address = address_1based - 1 , value = value , device_id = self .slave_id )
69- if not result .isError ():
70- LOGGER .debug (f"Successfully wrote { value } to register { address_1based } " )
71- return
70+ async def _close_connection (self ) -> None :
71+ """Close the Modbus connection."""
72+ if self ._client :
73+ self ._client .close ()
74+ self ._client = None
75+ LOGGER .debug ("Modbus connection closed." )
76+
77+ def _raise_unknown_request_type (self , request_type : str ) -> NoReturn :
78+ """Raise ValueError for an unknown request type."""
79+ msg = f"Unknown request type: { request_type } "
80+ raise ValueError (msg )
81+
82+ def _raise_unrecoverable_modbus_error (self , result : Any ) -> NoReturn :
83+ """Raise ModbusConnectionError for unrecoverable errors."""
84+ msg = f"Unrecoverable Modbus error: { result } "
85+ raise ModbusConnectionError (msg )
7286
73- last_exception = result
74- LOGGER .debug (f"Write error on register { address_1based } , attempt { attempt + 1 } : { result } " )
87+ async def _execute_request (self , request_type : str , address : int , ** kwargs : Any ) -> list [int ] | bool :
88+ """Execute a single Modbus request with robust retry and reconnect logic."""
89+ max_retries = 5
90+ base_delay = 0.2
7591
76- except ModbusException as e :
77- last_exception = e
78- LOGGER . debug ( f"Write exception on register { address_1based } , attempt { attempt + 1 } : { e } " )
92+ for attempt in range ( max_retries ) :
93+ try :
94+ await self . _ensure_connected ( )
7995
80- if attempt < max_retries - 1 :
81- await asyncio .sleep (retry_delay )
96+ if request_type == "read" :
97+ result = await self ._client .read_holding_registers (address = address , count = kwargs ["count" ], device_id = self .slave_id )
98+ elif request_type == "write" :
99+ result = await self ._client .write_register (address = address , value = kwargs ["value" ], device_id = self .slave_id )
100+ else :
101+ self ._raise_unknown_request_type (request_type )
102+
103+ if not result .isError ():
104+ return result .registers if request_type == "read" else True
105+
106+ if result .exception_code in [
107+ MODBUS_DEVICE_BUSY_EXCEPTION ,
108+ MODBUS_GATEWAY_TARGET_FAILED_TO_RESPOND ,
109+ ]:
110+ delay = base_delay * (2 ** attempt )
111+ LOGGER .debug (
112+ "Device busy/unresponsive (code %s) on %s. Retrying in %.2fs..." ,
113+ result .exception_code ,
114+ request_type ,
115+ delay ,
116+ )
117+ await asyncio .sleep (delay )
118+ else :
119+ self ._raise_unrecoverable_modbus_error (result )
120+
121+ except (TimeoutError , ConnectionException , ModbusConnectionError ) as e :
122+ LOGGER .warning ("Connection error during %s: %s. Attempting to reconnect..." , request_type , e )
123+ await self ._close_connection ()
124+ await asyncio .sleep (1 )
125+
126+ except Exception as e :
127+ LOGGER .error ("Unexpected error during Modbus %s: %s" , request_type , e , exc_info = True )
128+ raise
129+
130+ msg = f"Failed to execute Modbus { request_type } after { max_retries } attempts."
131+ raise ModbusConnectionError (msg )
132+
133+ async def _modbus_worker (self ) -> None :
134+ """Process requests from the queue."""
135+ while self ._is_running :
136+ try :
137+ request_type , address , future , kwargs = await self ._request_queue .get ()
82138
83- msg = f"Failed to write to register { address_1based } after { max_retries } attempts: { last_exception } "
84- await self .close ()
139+ try :
140+ result = await self ._execute_request (request_type , address , ** kwargs )
141+ future .set_result (result )
142+ except (ModbusConnectionError , ValueError ) as e :
143+ future .set_exception (e )
144+ finally :
145+ self ._request_queue .task_done ()
146+ except asyncio .CancelledError :
147+ break
148+ LOGGER .debug ("Modbus worker shutting down." )
149+
150+ async def _queue_request (self , request_type : str , address : int , ** kwargs : Any ) -> Any :
151+ """Add a request to the queue and wait for its completion."""
152+ if not self ._is_running :
153+ msg = "Client is not running. Call start() first."
85154 raise ModbusConnectionError (msg )
86155
156+ future = asyncio .Future ()
157+ self ._request_queue .put_nowait ((request_type , address , future , kwargs ))
158+ return await future
159+
160+ async def write_register (self , address_1based : int , value : int ) -> None :
161+ """Queue a write request for a single holding register."""
162+ await self ._queue_request ("write" , address = address_1based - 1 , value = value )
163+
87164 async def get_all_data (self ) -> dict [str , Any ]:
88- """Read all required registers using a robust, paced, and fault-tolerant approach ."""
165+ """Queue read requests for all required data blocks and assemble the result ."""
89166 read_blocks = [
90167 (1001 , 62 ),
91168 (1101 , 80 ),
@@ -115,42 +192,25 @@ async def get_all_data(self) -> dict[str, Any]:
115192 (15891 , 13 ),
116193 ]
117194
118- all_registers = {}
119- max_retries = 3
120- retry_delay = 0.3
121-
122- async with self ._lock :
123- await self ._ensure_connected ()
124- for start_addr_1based , count in read_blocks :
125- block_success = False
126- for attempt in range (max_retries ):
127- try :
128- result = await self ._client .read_holding_registers (
129- address = start_addr_1based - 1 , count = count , device_id = self .slave_id
130- )
131-
132- if not result .isError ():
133- for i , reg_val in enumerate (result .registers ):
134- key = str (start_addr_1based - 1 + i )
135- all_registers [key ] = reg_val
136- block_success = True
137- break
138-
139- LOGGER .debug (f"Modbus error on block { start_addr_1based } , attempt { attempt + 1 } : { result } . Retrying..." )
195+ tasks = [self ._queue_request ("read" , address = start - 1 , count = count ) for start , count in read_blocks ]
140196
141- except ModbusException as e :
142- LOGGER .debug (f"Modbus exception on block { start_addr_1based } , attempt { attempt + 1 } : { e } . Retrying..." )
197+ results = await asyncio .gather (* tasks , return_exceptions = True )
143198
144- if attempt < max_retries - 1 :
145- await asyncio .sleep (retry_delay )
146-
147- if not block_success :
148- LOGGER .error (f"Failed to read block { start_addr_1based } after { max_retries } attempts. Continuing with next blocks." )
149-
150- await asyncio .sleep (0.15 )
151-
152- if not all_registers :
153- msg = "Failed to read any data from the device after multiple retries."
199+ all_registers = {}
200+ has_successful_read = False
201+ for i , result in enumerate (results ):
202+ start_addr_1based , _ = read_blocks [i ]
203+ if isinstance (result , Exception ):
204+ LOGGER .error (f"Failed to read block { start_addr_1based } : { result } " )
205+ continue
206+
207+ has_successful_read = True
208+ for offset , reg_val in enumerate (result ):
209+ key = str (start_addr_1based - 1 + offset )
210+ all_registers [key ] = reg_val
211+
212+ if not has_successful_read :
213+ msg = "Failed to read any data blocks from the device."
154214 raise ModbusConnectionError (msg )
155215
156216 return all_registers
0 commit comments