Skip to content

Commit e7efbe5

Browse files
authored
move reconnect to transport. (#1507)
1 parent 369d45e commit e7efbe5

File tree

14 files changed

+409
-233
lines changed

14 files changed

+409
-233
lines changed

API_changes.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Version 3.3.0
99
- general parameter unit= -> slave=
1010
- move SqlSlaveContext, RedisSlaveContext to examples/contrib (due to lack of maintenance)
1111
- :code:`BinaryPayloadBuilder.to_string` was renamed to :code:`BinaryPayloadBuilder.encode`
12+
- on_reconnect_callback for async clients works slightly different
1213

1314
-------------
1415
Version 3.2.0

pymodbus/client/base.py

Lines changed: 33 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ModbusBaseClient(ModbusClientMixin, BaseTransport):
4040
and not repeated with each client.
4141
4242
.. tip::
43-
**delay_ms** doubles automatically with each unsuccessful connect, from
43+
**reconnect_delay** doubles automatically with each unsuccessful connect, from
4444
**reconnect_delay** to **reconnect_delay_max**.
4545
Set `reconnect_delay=0` to avoid automatic reconnection.
4646
@@ -94,7 +94,9 @@ def __init__( # pylint: disable=too-many-arguments
9494
**kwargs: Any,
9595
) -> None:
9696
"""Initialize a client instance."""
97-
BaseTransport.__init__(self)
97+
BaseTransport.__init__(
98+
self, "comm", framer, reconnect_delay, reconnect_delay_max, timeout, timeout
99+
)
98100
self.params = self._params()
99101
self.params.framer = framer
100102
self.params.timeout = float(timeout)
@@ -107,13 +109,19 @@ def __init__( # pylint: disable=too-many-arguments
107109
self.reconnect_delay_max = int(reconnect_delay_max)
108110
self.on_reconnect_callback = on_reconnect_callback
109111
self.params.kwargs = kwargs
112+
self.retry_on_empty: int = 0
113+
# -> retry read on nothing
114+
115+
self.slaves: list[int] = []
116+
# -> list of acceptable slaves (0 for accept all)
110117

111118
# Common variables.
112119
self.framer = self.params.framer(ClientDecoder(), self)
113120
self.transaction = DictTransactionManager(
114121
self, retries=retries, retry_on_empty=retry_on_empty, **kwargs
115122
)
116-
self.delay_ms = self.params.reconnect_delay
123+
self.reconnect_delay = self.params.reconnect_delay
124+
self.reconnect_delay_current = self.params.reconnect_delay
117125
self.use_protocol = False
118126
self.use_udp = False
119127
self.state = ModbusTransactionState.IDLE
@@ -138,15 +146,6 @@ def register(self, custom_response_class: ModbusResponse) -> None:
138146
"""
139147
self.framer.decoder.register(custom_response_class)
140148

141-
def connect(self):
142-
"""Connect to the modbus remote host (call **sync/async**).
143-
144-
:raises ModbusException: Different exceptions, check exception text.
145-
146-
**Remark** Retries are handled automatically after first successful connect.
147-
"""
148-
raise NotImplementedException
149-
150149
def is_socket_open(self) -> bool:
151150
"""Return whether socket/serial is open or not (call **sync**)."""
152151
raise NotImplementedException
@@ -161,10 +160,6 @@ def idle_time(self) -> float:
161160
return 0
162161
return self.last_frame_end + self.silent_interval
163162

164-
def reset_delay(self) -> None:
165-
"""Reset wait time before next reconnect to minimal period (call **sync**)."""
166-
self.delay_ms = self.params.reconnect_delay
167-
168163
def execute(self, request: ModbusRequest = None) -> ModbusResponse:
169164
"""Execute request and get response (call **sync/async**).
170165
@@ -199,23 +194,10 @@ async def async_execute(self, request=None):
199194
try:
200195
resp = await asyncio.wait_for(req, timeout=self.params.timeout)
201196
except asyncio.exceptions.TimeoutError:
202-
self.connection_lost("trying to send")
197+
self.close(reconnect=True)
203198
raise
204199
return resp
205200

206-
def connection_lost(self, reason):
207-
"""Call when the connection is lost or closed.
208-
209-
The argument is either an exception object or None
210-
"""
211-
Log.debug("Client disconnected from modbus server: {}", reason)
212-
self.close(reconnect=True)
213-
for tid in list(self.transaction):
214-
self.raise_future(
215-
self.transaction.getTransaction(tid),
216-
ConnectionException("Connection lost during request"),
217-
)
218-
219201
def data_received(self, data):
220202
"""Call when some data is received.
221203
@@ -224,9 +206,25 @@ def data_received(self, data):
224206
Log.debug("recv: {}", data, ":hex")
225207
self.framer.processIncomingPacket(data, self._handle_response, slave=0)
226208

227-
def create_future(self):
228-
"""Help function to create asyncio Future object."""
229-
return asyncio.Future()
209+
def cb_handle_data(self, _data: bytes) -> int:
210+
"""Handle received data
211+
212+
returns number of bytes consumed
213+
"""
214+
215+
def cb_connection_made(self) -> None:
216+
"""Handle new connection"""
217+
218+
def cb_connection_lost(self, _reason: Exception) -> None:
219+
"""Handle lost connection"""
220+
for tid in list(self.transaction):
221+
self.raise_future(
222+
self.transaction.getTransaction(tid),
223+
ConnectionException("Connection lost during request"),
224+
)
225+
226+
async def connect(self):
227+
"""Connect to the modbus remote host."""
230228

231229
def raise_future(self, my_future, exc):
232230
"""Set exception of a future if not done."""
@@ -245,49 +243,17 @@ def _handle_response(self, reply, **_kwargs):
245243

246244
def _build_response(self, tid):
247245
"""Return a deferred response for the current request."""
248-
my_future = self.create_future()
246+
my_future = asyncio.Future()
249247
if not self.transport:
250248
self.raise_future(my_future, ConnectionException("Client is not connected"))
251249
else:
252250
self.transaction.addTransaction(my_future, tid)
253251
return my_future
254252

255-
def close(self, reconnect: bool = False) -> None:
256-
"""Close connection.
257-
258-
:param reconnect: (default false), try to reconnect
259-
"""
260-
if self.transport:
261-
if hasattr(self.transport, "_sock"):
262-
self.transport._sock.close() # pylint: disable=protected-access
263-
self.transport.abort()
264-
self.transport.close()
265-
self.transport = None
266-
if self._reconnect_task:
267-
self._reconnect_task.cancel()
268-
self._reconnect_task = None
269-
270-
if not reconnect or not self.delay_ms:
271-
self.delay_ms = 0
272-
return
273-
274-
self._reconnect_task = asyncio.create_task(self._reconnect())
275-
276-
async def _reconnect(self):
277-
"""Reconnect."""
278-
Log.debug("Waiting {} ms before next connection attempt.", self.delay_ms)
279-
await asyncio.sleep(self.delay_ms / 1000)
280-
self.delay_ms = min(2 * self.delay_ms, self.reconnect_delay_max)
281-
282-
self._reconnect_task = None
283-
if self.on_reconnect_callback:
284-
self.on_reconnect_callback()
285-
return await self.connect()
286-
287253
# ----------------------------------------------------------------------- #
288254
# Internal methods
289255
# ----------------------------------------------------------------------- #
290-
def send(self, request):
256+
def send(self, request): # pylint: disable=invalid-overridden-method
291257
"""Send request.
292258
293259
:meta private:

pymodbus/client/serial.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def connected(self):
7878
"""Connect internal."""
7979
return self.transport is not None
8080

81-
async def connect(self): # pylint: disable=invalid-overridden-method
81+
async def connect(self):
8282
"""Connect Async client."""
8383
# get current loop, if there are no loop a RuntimeError will be raised
8484
Log.debug("Starting serial connection")
@@ -101,6 +101,7 @@ async def connect(self): # pylint: disable=invalid-overridden-method
101101
except Exception as exc: # pylint: disable=broad-except
102102
Log.warning("Failed to connect: {}", exc)
103103
self.close(reconnect=True)
104+
self.reset_delay()
104105
return self.connected
105106

106107

@@ -185,7 +186,7 @@ def connected(self):
185186
"""Connect internal."""
186187
return self.connect()
187188

188-
def connect(self):
189+
def connect(self): # pylint: disable=invalid-overridden-method
189190
"""Connect to the modbus serial server."""
190191
if self.socket:
191192
return True

pymodbus/client/tcp.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,11 @@ def __init__(
5151
self.params.host = host
5252
self.params.port = port
5353
self.params.source_address = source_address
54-
self.delay_ms = self.params.reconnect_delay
5554

56-
async def connect(self): # pylint: disable=invalid-overridden-method
55+
async def connect(self):
5756
"""Initiate connection to start client."""
5857

59-
# if delay_ms was set to 0 by close(), we need to set it back again
58+
# if reconnect_delay_current was set to 0 by close(), we need to set it back again
6059
# so this instance will work
6160
self.reset_delay()
6261

@@ -145,9 +144,9 @@ def __init__(
145144
@property
146145
def connected(self):
147146
"""Connect internal."""
148-
return self.connect()
147+
return self.transport is not None
149148

150-
def connect(self):
149+
def connect(self): # pylint: disable=invalid-overridden-method
151150
"""Connect to the modbus tcp server."""
152151
if self.socket:
153152
return True

pymodbus/client/tls.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def __init__(
166166
@property
167167
def connected(self):
168168
"""Connect internal."""
169-
return self.connect()
169+
return self.transport is not None
170170

171171
def connect(self):
172172
"""Connect to the modbus tls server."""

pymodbus/client/udp.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,13 @@ def __init__(
5454
self.params.host = host
5555
self.params.port = port
5656
self.params.source_address = source_address
57-
self.delay_ms = self.params.reconnect_delay
58-
self.reset_delay()
5957

6058
@property
6159
def connected(self):
6260
"""Return true if connected."""
6361
return self.transport is not None
6462

65-
async def connect(self): # pylint: disable=invalid-overridden-method
63+
async def connect(self):
6664
"""Start reconnecting asynchronous udp client.
6765
6866
:meta private:
@@ -96,6 +94,7 @@ async def _connect(self):
9694
except Exception as exc: # pylint: disable=broad-except
9795
Log.warning("Failed to connect: {}", exc)
9896
self.close(reconnect=True)
97+
self.reset_delay()
9998

10099

101100
class ModbusUdpClient(ModbusBaseClient):
@@ -140,7 +139,7 @@ def __init__(
140139

141140
self.socket = None
142141

143-
def connect(self):
142+
def connect(self): # pylint: disable=invalid-overridden-method
144143
"""Connect to the modbus tcp server.
145144
146145
:meta private:
@@ -190,9 +189,7 @@ def is_socket_open(self):
190189
191190
:meta private:
192191
"""
193-
if self.socket:
194-
return True
195-
return self.connect()
192+
return True
196193

197194
def __str__(self):
198195
"""Build a string representation of the connection."""

pymodbus/server/async_io.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ class ModbusDisconnectedRequestHandler(
386386
def __init__(self, owner):
387387
"""Initialize."""
388388
super().__init__(owner)
389-
_future = asyncio.get_running_loop().create_future()
389+
_future = asyncio.Future()
390390
self.server.on_connection_terminated = _future
391391

392392
def connection_lost(self, call_exc):
@@ -533,7 +533,7 @@ def __init__(
533533
self.control.Identity.update(identity)
534534

535535
# asyncio future that will be done once server has started
536-
self.serving = self.loop.create_future()
536+
self.serving = asyncio.Future()
537537
# constructors cannot be declared async, so we have to
538538
# defer the initialization of the server
539539
self.server = None
@@ -641,7 +641,7 @@ def __init__(
641641
self.control.Identity.update(identity)
642642

643643
# asyncio future that will be done once server has started
644-
self.serving = self.loop.create_future()
644+
self.serving = asyncio.Future()
645645
# constructors cannot be declared async, so we have to
646646
# defer the initialization of the server
647647
self.server = None
@@ -824,7 +824,7 @@ def __init__(
824824
self.endpoint = None
825825
self.on_connection_terminated = None
826826
# asyncio future that will be done once server has started
827-
self.serving = self.loop.create_future()
827+
self.serving = asyncio.Future()
828828
self.factory_parms = {
829829
"local_addr": self.address,
830830
"allow_broadcast": True,

pymodbus/transport/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
"BaseTransport",
55
]
66

7-
from pymodbus.transport.base import BaseTransport
7+
from pymodbus.transport.transport import BaseTransport

0 commit comments

Comments
 (0)