Skip to content

Commit 03b732c

Browse files
committed
Rewrite Tornado AsyncModbusSerialClient - #533
1 parent 0eefa9b commit 03b732c

File tree

3 files changed

+168
-3
lines changed

3 files changed

+168
-3
lines changed

pymodbus/client/asynchronous/tornado/__init__.py

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import logging
99

10+
import time
1011
import socket
1112
from serial import Serial
1213
from tornado import gen
@@ -17,8 +18,13 @@
1718

1819
from pymodbus.client.asynchronous.mixins import (AsyncModbusClientMixin,
1920
AsyncModbusSerialClientMixin)
20-
from pymodbus.exceptions import ConnectionException
2121
from pymodbus.compat import byte2int
22+
from pymodbus.exceptions import (ConnectionException,
23+
ModbusIOException,
24+
TimeOutException)
25+
from pymodbus.utilities import (hexlify_packets,
26+
ModbusTransactionState)
27+
from pymodbus.constants import Defaults
2228

2329
LOGGER = logging.getLogger(__name__)
2430

@@ -291,6 +297,24 @@ class AsyncModbusSerialClient(BaseTornadoSerialClient):
291297
"""
292298
Tornado based asynchronous serial client
293299
"""
300+
def __init__(self, *args, **kwargs):
301+
"""
302+
Initializes AsyncModbusSerialClient.
303+
:param args:
304+
:param kwargs:
305+
"""
306+
self.state = ModbusTransactionState.IDLE
307+
self.timeout = kwargs.get('timeout', Defaults.Timeout)
308+
self.baudrate = kwargs.get('baudrate', Defaults.Baudrate)
309+
if self.baudrate > 19200:
310+
self.silent_interval = 1.75 / 1000 # ms
311+
else:
312+
self._t0 = float((1 + 8 + 2)) / self.baudrate
313+
self.silent_interval = 3.5 * self._t0
314+
self.silent_interval = round(self.silent_interval, 6)
315+
self.last_frame_end = 0.0
316+
super().__init__(*args, **kwargs)
317+
294318
def get_socket(self):
295319
"""
296320
Creates Pyserial object
@@ -318,6 +342,134 @@ def connect(self):
318342

319343
raise gen.Return(self)
320344

345+
def execute(self, request):
346+
"""
347+
Executes a transaction
348+
:param request: Request to be written on to the bus
349+
:return:
350+
"""
351+
request.transaction_id = self.transaction.getNextTID()
352+
353+
def _clear_timer():
354+
"""
355+
Clear serial waiting timeout
356+
"""
357+
if self.timeout_handle:
358+
self.io_loop.remove_timeout(self.timeout_handle)
359+
self.timeout_handle = None
360+
361+
def _on_timeout():
362+
"""
363+
Got timeout while waiting data from serial port
364+
"""
365+
LOGGER.warning("serial receive timeout")
366+
_clear_timer()
367+
if self.stream:
368+
self.io_loop.remove_handler(self.stream.fileno())
369+
self.framer.resetFrame()
370+
transaction = self.transaction.getTransaction(request.transaction_id)
371+
if transaction:
372+
transaction.set_exception(TimeOutException())
373+
374+
def _on_write_done(*args):
375+
"""
376+
Set up reader part after sucessful write to the serial
377+
"""
378+
LOGGER.debug("frame sent, waiting for a reply")
379+
self.last_frame_end = round(time.time(), 6)
380+
self.state = ModbusTransactionState.WAITING_FOR_REPLY
381+
self.io_loop.add_handler(self.stream.fileno(), _on_receive, IOLoop.READ)
382+
383+
def _on_fd_error(fd, *args):
384+
_clear_timer()
385+
self.io_loop.remove_handler(fd)
386+
self.close()
387+
self.transaction.getTransaction(request.transaction_id).set_exception(ModbusIOException(*args))
388+
389+
def _on_receive(fd, events):
390+
"""
391+
New data in serial buffer to read or serial port closed
392+
"""
393+
if events & IOLoop.ERROR:
394+
_on_fd_error(fd)
395+
return
396+
397+
try:
398+
waiting = self.stream.connection.in_waiting
399+
if waiting:
400+
data = self.stream.connection.read(waiting)
401+
LOGGER.debug(
402+
"recv: " + hexlify_packets(data))
403+
self.last_frame_end = round(time.time(), 6)
404+
except OSError as ex:
405+
_on_fd_error(fd, ex)
406+
return
407+
408+
self.framer.addToFrame(data)
409+
410+
# check if we have regular frame or modbus exception
411+
fcode = self.framer.decode_data(self.framer.getRawFrame()).get("fcode", 0)
412+
if fcode and (
413+
(fcode > 0x80 and len(self.framer.getRawFrame()) == exception_response_length)
414+
or
415+
(len(self.framer.getRawFrame()) == expected_response_length)
416+
):
417+
_clear_timer()
418+
self.io_loop.remove_handler(fd)
419+
self.state = ModbusTransactionState.IDLE
420+
self.framer.processIncomingPacket(
421+
b'', # already sent via addToFrame()
422+
self._handle_response,
423+
0, # don't care when `single=True`
424+
single=True,
425+
tid=request.transaction_id
426+
)
427+
428+
packet = self.framer.buildPacket(request)
429+
f = self._build_response(request.transaction_id)
430+
431+
response_pdu_size = request.get_response_pdu_size()
432+
expected_response_length = self.transaction._calculate_response_length(response_pdu_size)
433+
LOGGER.debug("expected_response_length = %d", expected_response_length)
434+
435+
exception_response_length = self.transaction._calculate_exception_length() # TODO: calculate once
436+
437+
if self.timeout:
438+
self.timeout_handle = self.io_loop.add_timeout(time.time() + self.timeout, _on_timeout)
439+
self._sendPacket(packet, callback=_on_write_done)
440+
441+
return f
442+
443+
def _sendPacket(self, message, callback):
444+
"""
445+
Sends packets on the bus with 3.5char delay between frames
446+
:param message: Message to be sent over the bus
447+
:return:
448+
"""
449+
@gen.coroutine
450+
def sleep(timeout):
451+
yield gen.sleep(timeout)
452+
453+
try:
454+
waiting = self.stream.connection.in_waiting
455+
if waiting:
456+
result = self.stream.connection.read(waiting)
457+
LOGGER.info(
458+
"Cleanup recv buffer before send: " + hexlify_packets(result))
459+
except OSError as e:
460+
self.transaction.getTransaction(request.transaction_id).set_exception(ModbusIOException(e))
461+
return
462+
463+
start = time.time()
464+
if self.last_frame_end:
465+
waittime = self.last_frame_end + self.silent_interval - start
466+
if waittime > 0:
467+
LOGGER.debug("Waiting for 3.5 char before next send - %f ms", waittime)
468+
sleep(waittime)
469+
470+
self.state = ModbusTransactionState.SENDING
471+
LOGGER.debug("send: " + hexlify_packets(message))
472+
self.stream.write(message, callback)
321473

322474
class AsyncModbusTCPClient(BaseTornadoClient):
323475
"""

pymodbus/exceptions.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ def __init__(self, string=""):
105105
message = '[Error registering message] %s' % string
106106
ModbusException.__init__(self, message)
107107

108+
class TimeOutException(ModbusException):
109+
""" Error resulting from modbus response timeout """
110+
111+
def __init__(self, string=""):
112+
""" Initialize the exception
113+
114+
:param string: The message to append to the error
115+
"""
116+
message = "[Timeout] %s" % string
117+
ModbusException.__init__(self, message)
118+
108119

109120
# --------------------------------------------------------------------------- #
110121
# Exported symbols
@@ -114,5 +125,5 @@ def __init__(self, string=""):
114125
"ParameterException", "NotImplementedException",
115126
"ConnectionException", "NoSuchSlaveException",
116127
"InvalidMessageReceivedException",
117-
"MessageRegisterException"
128+
"MessageRegisterException", "TimeOutException"
118129
]

test/test_client_async_tornado.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,12 @@ def testSerialClientExecute(self, mock_serial, mock_seriostream, mock_ioloop):
228228
client = AsyncModbusSerialClient(ioloop=schedulers.IO_LOOP,
229229
framer=ModbusRtuFramer(
230230
ClientDecoder()),
231-
port=SERIAL_PORT)
231+
port=SERIAL_PORT,
232+
timeout=0)
232233
client.connect()
233234
client.stream = Mock()
234235
client.stream.write = Mock()
236+
client.stream.connection.read.return_value = b''
235237

236238
request = ReadCoilsRequest(1, 1)
237239
d = client.execute(request)

0 commit comments

Comments
 (0)