3434
3535import pytest
3636
37- #
37+ #
3838# We require *explicit* access to the /dev/ttys[012] serial ports to perform this test -- we must
3939# not assume that can safely use the serial port(s), or that they are configured for us to run our
4040# tests! Therefore, we will run these tests ONLY if "serial" tests are explicitly called for
4343# They must be configured as RS485, in the following multi-drop pattern:
4444#
4545# - master - - slaves -----------------
46- # ttyS0(COM1) --> ttys1 (COM2) --> ttys2 (COM3)
47- #
46+ # ttyS0(COM1) --> ttyS1 (COM2) --> ttyS2 (COM3)
47+ #
4848# The Modbus Master will be on ttyS0, and two Modbus slave-ids (unit numbers) will be simulated on
4949# each of ttyS1 and ttyS2. Since they each must ignore requests to slave-ids they do not simulate,
5050# pymodbus >= 1.3.0 is required.
51- #
51+ #
5252PORT_MASTER = "ttyS0"
5353PORT_SLAVES = {
5454 "ttyS1" : [1 ,3 ],
6868 has_pyserial = True
6969except ImportError :
7070 logging .warning ( "Failed to import pyserial module; skipping Modbus/RTU related tests; run 'pip install pyserial'" )
71-
71+
7272has_minimalmodbus = False
7373try :
7474 # Configure minimalmodbus to use the specified port serial framing
@@ -135,7 +135,6 @@ def test_pymodbus_rs485_sync():
135135 # handle_local_echo=False,
136136 )
137137
138- # Start the server on
139138 import asyncio
140139 from contextlib import suppress
141140
@@ -193,14 +192,14 @@ async def server_start( port, unit ):
193192 ** serial_args ,
194193 )
195194 client .connect ()
196-
197- rr1 = client .read_coils ( 1 , 1 , slave = 1 )
198- rr2 = client .read_coils ( 2 , 1 , slave = 2 )
195+
196+ rr1 = client .read_coils ( 1 , count = 1 , slave = 1 )
197+ rr2 = client .read_coils ( 2 , count = 1 , slave = 2 )
199198 assert (( not rr2 .isError () and rr2 .bits [0 ] == True ) or
200- ( not rr1 .isError () and rr .bits [0 ] == False ))
201- rr3 = client .read_coils ( 2 , 1 , slave = 3 )
199+ ( not rr1 .isError () and rr1 .bits [0 ] == False ))
200+ rr3 = client .read_coils ( 2 , count = 1 , slave = 3 )
202201 assert rr3 .isError ()
203-
202+
204203 client .close ()
205204 del client
206205
@@ -217,7 +216,7 @@ def reader():
217216 with client :
218217 unit = 1 + a % len (SERVER_ttyS )
219218 expect = not bool ( a % 2 )
220- rr = client .read_coils ( 1 , a , slave = unit )
219+ rr = client .read_coils ( a , count = 1 , slave = unit )
221220 if rr .isError () or rr .bits [0 ] != expect :
222221 logging .warning ( "Expected unit {unit} coil {a} == {expect}, got {val}" .format (
223222 unit = unit , a = a , expect = expect , val = ( rr if rr .isError () else rr .bits [0 ] )))
@@ -238,7 +237,6 @@ def reader():
238237 asyncio .run_coroutine_threadsafe ( s .shutdown (), l )
239238 for u in servers :
240239 servers [u ].join ()
241-
242240
243241
244242RTU_TIMEOUT = 0.1 # latency while simulated slave awaits next incoming byte
@@ -275,7 +273,7 @@ def simulated_modbus_rtu( tty ):
275273
276274@pytest .fixture ( scope = "module" )
277275def simulated_modbus_rtu_ttyS1 ( request ):
278- command ,address = simulated_modbus_rtu ( "ttyS1" )
276+ command ,address = simulated_modbus_rtu ( "ttyS1" )
279277 request .addfinalizer ( command .kill )
280278 return command ,address
281279
@@ -342,7 +340,7 @@ def test_rs485_poll( simulated_modbus_rtu_ttyS1 ):
342340 assert success
343341 assert elapsed < 1.0
344342 assert plc .read ( 40001 ) == 0
345-
343+
346344 assert plc .read ( 1 ) == None
347345 assert plc .read ( 40002 ) == None
348346 success ,elapsed = waitfor ( lambda : plc .read ( 40002 ) is not None , "40002 polled" , ** wfkw )
@@ -358,7 +356,7 @@ def test_rs485_poll( simulated_modbus_rtu_ttyS1 ):
358356 success ,elapsed = waitfor ( lambda : plc .read ( 40001 ) == 99 , "40001 polled" , ** wfkw )
359357 assert success
360358 assert elapsed < 1.0
361-
359+
362360 # See if we converge on our target poll time
363361 count = plc .counter
364362 while plc .counter < count + 20 :
0 commit comments