|
1 | 1 | #!/usr/bin/env python |
| 2 | +import contextlib |
| 3 | +import sys |
2 | 4 | import unittest |
3 | 5 | import pytest |
4 | 6 | from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION |
@@ -47,6 +49,15 @@ def mock_asyncio_gather(coro): |
47 | 49 | return coro |
48 | 50 |
|
49 | 51 |
|
| 52 | +@contextlib.contextmanager |
| 53 | +def maybe_manage(condition, manager): |
| 54 | + if condition: |
| 55 | + with manager as value: |
| 56 | + yield value |
| 57 | + else: |
| 58 | + yield None |
| 59 | + |
| 60 | + |
50 | 61 | class TestAsynchronousClient(object): |
51 | 62 | """ |
52 | 63 | This is the unittest for the pymodbus.client.asynchronous module |
@@ -216,24 +227,26 @@ def handle_failure(failure): |
216 | 227 | ("ascii", ModbusAsciiFramer)]) |
217 | 228 | def testSerialTornadoClient(self, method, framer): |
218 | 229 | """ Test the serial tornado client client initialize """ |
219 | | - protocol, future = AsyncModbusSerialClient(schedulers.IO_LOOP, method=method, port=SERIAL_PORT) |
220 | | - client = future.result() |
221 | | - assert(isinstance(client, AsyncTornadoModbusSerialClient)) |
222 | | - assert(0 == len(list(client.transaction))) |
223 | | - assert(isinstance(client.framer, framer)) |
224 | | - assert(client.port == SERIAL_PORT) |
225 | | - assert(client._connected) |
226 | | - |
227 | | - def handle_failure(failure): |
228 | | - assert(isinstance(failure.exception(), ConnectionException)) |
229 | | - |
230 | | - d = client._build_response(0x00) |
231 | | - d.add_done_callback(handle_failure) |
232 | | - |
233 | | - assert(client._connected) |
234 | | - client.close() |
235 | | - protocol.stop() |
236 | | - assert(not client._connected) |
| 230 | + from serial import Serial |
| 231 | + with maybe_manage(sys.platform == 'darwin', patch.object(Serial, "open")): |
| 232 | + protocol, future = AsyncModbusSerialClient(schedulers.IO_LOOP, method=method, port=SERIAL_PORT) |
| 233 | + client = future.result() |
| 234 | + assert(isinstance(client, AsyncTornadoModbusSerialClient)) |
| 235 | + assert(0 == len(list(client.transaction))) |
| 236 | + assert(isinstance(client.framer, framer)) |
| 237 | + assert(client.port == SERIAL_PORT) |
| 238 | + assert(client._connected) |
| 239 | + |
| 240 | + def handle_failure(failure): |
| 241 | + assert(isinstance(failure.exception(), ConnectionException)) |
| 242 | + |
| 243 | + d = client._build_response(0x00) |
| 244 | + d.add_done_callback(handle_failure) |
| 245 | + |
| 246 | + assert(client._connected) |
| 247 | + client.close() |
| 248 | + protocol.stop() |
| 249 | + assert(not client._connected) |
237 | 250 |
|
238 | 251 | @pytest.mark.skipif(IS_PYTHON3 , reason="requires python2.7") |
239 | 252 | def testSerialAsyncioClientPython2(self): |
|
0 commit comments