|
1 | 1 | import logging |
2 | 2 | import threading |
3 | 3 | import unittest |
| 4 | +import asyncio |
4 | 5 | from contextlib import contextmanager |
5 | 6 |
|
6 | 7 | import can |
@@ -181,44 +182,61 @@ def check(code, expected): |
181 | 182 | check(0xffff, "Device Specific") |
182 | 183 |
|
183 | 184 |
|
184 | | -class TestEmcyProducer(unittest.TestCase): |
185 | | - def setUp(self): |
186 | | - self.txbus = can.Bus(interface="virtual") |
187 | | - self.rxbus = can.Bus(interface="virtual") |
188 | | - self.net = canopen.Network(self.txbus) |
189 | | - self.net.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 |
190 | | - self.net.connect() |
191 | | - self.emcy = canopen.emcy.EmcyProducer(0x80 + 1) |
192 | | - self.emcy.network = self.net |
193 | | - |
194 | | - def tearDown(self): |
195 | | - self.net.disconnect() |
196 | | - self.txbus.shutdown() |
197 | | - self.rxbus.shutdown() |
198 | | - |
199 | | - def check_response(self, expected): |
200 | | - msg = self.rxbus.recv(TIMEOUT) |
201 | | - self.assertIsNotNone(msg) |
202 | | - actual = msg.data |
203 | | - self.assertEqual(actual, expected) |
204 | | - |
205 | | - def test_emcy_producer_send(self): |
206 | | - def check(*args, res): |
207 | | - self.emcy.send(*args) |
208 | | - self.check_response(res) |
209 | | - |
210 | | - check(0x2001, res=b'\x01\x20\x00\x00\x00\x00\x00\x00') |
211 | | - check(0x2001, 0x2, res=b'\x01\x20\x02\x00\x00\x00\x00\x00') |
212 | | - check(0x2001, 0x2, b'\x2a', res=b'\x01\x20\x02\x2a\x00\x00\x00\x00') |
213 | | - |
214 | | - def test_emcy_producer_reset(self): |
215 | | - def check(*args, res): |
216 | | - self.emcy.reset(*args) |
217 | | - self.check_response(res) |
218 | | - |
219 | | - check(res=b'\x00\x00\x00\x00\x00\x00\x00\x00') |
220 | | - check(3, res=b'\x00\x00\x03\x00\x00\x00\x00\x00') |
221 | | - check(3, b"\xaa\xbb", res=b'\x00\x00\x03\xaa\xbb\x00\x00\x00') |
| 185 | +class BaseTests: |
| 186 | + |
| 187 | + class TestEmcyProducer(unittest.IsolatedAsyncioTestCase): |
| 188 | + |
| 189 | + use_async: bool |
| 190 | + |
| 191 | + def setUp(self): |
| 192 | + loop = None |
| 193 | + if self.use_async: |
| 194 | + loop = asyncio.get_event_loop() |
| 195 | + |
| 196 | + self.txbus = can.Bus(interface="virtual", loop=loop) |
| 197 | + self.rxbus = can.Bus(interface="virtual", loop=loop) |
| 198 | + self.net = canopen.Network(self.txbus, loop=loop) |
| 199 | + self.net.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 |
| 200 | + self.net.connect() |
| 201 | + self.emcy = canopen.emcy.EmcyProducer(0x80 + 1) |
| 202 | + self.emcy.network = self.net |
| 203 | + |
| 204 | + def tearDown(self): |
| 205 | + self.net.disconnect() |
| 206 | + self.txbus.shutdown() |
| 207 | + self.rxbus.shutdown() |
| 208 | + |
| 209 | + def check_response(self, expected): |
| 210 | + msg = self.rxbus.recv(TIMEOUT) # FIXME: This probably needs to be looked at for async. |
| 211 | + self.assertIsNotNone(msg) |
| 212 | + actual = msg.data |
| 213 | + self.assertEqual(actual, expected) |
| 214 | + |
| 215 | + async def test_emcy_producer_send(self): |
| 216 | + def check(*args, res): |
| 217 | + self.emcy.send(*args) |
| 218 | + self.check_response(res) |
| 219 | + |
| 220 | + check(0x2001, res=b'\x01\x20\x00\x00\x00\x00\x00\x00') |
| 221 | + check(0x2001, 0x2, res=b'\x01\x20\x02\x00\x00\x00\x00\x00') |
| 222 | + check(0x2001, 0x2, b'\x2a', res=b'\x01\x20\x02\x2a\x00\x00\x00\x00') |
| 223 | + |
| 224 | + async def test_emcy_producer_reset(self): |
| 225 | + def check(*args, res): |
| 226 | + self.emcy.reset(*args) |
| 227 | + self.check_response(res) |
| 228 | + |
| 229 | + check(res=b'\x00\x00\x00\x00\x00\x00\x00\x00') |
| 230 | + check(3, res=b'\x00\x00\x03\x00\x00\x00\x00\x00') |
| 231 | + check(3, b"\xaa\xbb", res=b'\x00\x00\x03\xaa\xbb\x00\x00\x00') |
| 232 | + |
| 233 | + |
| 234 | +class TestEmcyProducerSync(BaseTests.TestEmcyProducer): |
| 235 | + use_async = False |
| 236 | + |
| 237 | + |
| 238 | +class TestEmcyProducerAsync(BaseTests.TestEmcyProducer): |
| 239 | + use_async = True |
222 | 240 |
|
223 | 241 |
|
224 | 242 | if __name__ == "__main__": |
|
0 commit comments