Skip to content

Commit 5004363

Browse files
Tests: improve EMCY coverage (#529)
1 parent 28ee18c commit 5004363

File tree

1 file changed

+203
-54
lines changed

1 file changed

+203
-54
lines changed

test/test_emcy.py

Lines changed: 203 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,210 @@
1+
import logging
2+
import threading
13
import unittest
2-
from canopen import emcy
34

4-
5-
class TestEmcyConsumer(unittest.TestCase):
6-
7-
def test_emcy_list(self):
8-
emcy_node = emcy.EmcyConsumer()
9-
emcy_node.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1473418396.0)
10-
emcy_node.on_emcy(0x81, b'\x10\x90\x01\x00\x01\x02\x03\x04', 1473418397.0)
11-
12-
self.assertEqual(len(emcy_node.log), 2)
13-
self.assertEqual(len(emcy_node.active), 2)
14-
15-
error = emcy_node.log[0]
16-
self.assertIsInstance(error, emcy.EmcyError)
17-
self.assertIsInstance(error, Exception)
5+
import can
6+
import canopen
7+
from canopen.emcy import EmcyError
8+
9+
10+
TIMEOUT = 0.1
11+
12+
13+
class TestEmcy(unittest.TestCase):
14+
def setUp(self):
15+
self.emcy = canopen.emcy.EmcyConsumer()
16+
17+
def check_error(self, err, code, reg, data, ts):
18+
self.assertIsInstance(err, EmcyError)
19+
self.assertIsInstance(err, Exception)
20+
self.assertEqual(err.code, code)
21+
self.assertEqual(err.register, reg)
22+
self.assertEqual(err.data, data)
23+
self.assertAlmostEqual(err.timestamp, ts)
24+
25+
def test_emcy_consumer_on_emcy(self):
26+
# Make sure multiple callbacks receive the same information.
27+
acc1 = []
28+
acc2 = []
29+
self.emcy.add_callback(lambda err: acc1.append(err))
30+
self.emcy.add_callback(lambda err: acc2.append(err))
31+
32+
# Dispatch an EMCY datagram.
33+
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
34+
35+
self.assertEqual(len(self.emcy.log), 1)
36+
self.assertEqual(len(self.emcy.active), 1)
37+
38+
error = self.emcy.log[0]
39+
self.assertEqual(self.emcy.active[0], error)
40+
for err in error, acc1[0], acc2[0]:
41+
self.check_error(
42+
error, code=0x2001, reg=0x02,
43+
data=bytes([0, 1, 2, 3, 4]), ts=1000,
44+
)
45+
46+
# Dispatch a new EMCY datagram.
47+
self.emcy.on_emcy(0x81, b'\x10\x90\x01\x04\x03\x02\x01\x00', 2000)
48+
self.assertEqual(len(self.emcy.log), 2)
49+
self.assertEqual(len(self.emcy.active), 2)
50+
51+
error = self.emcy.log[1]
52+
self.assertEqual(self.emcy.active[1], error)
53+
for err in error, acc1[1], acc2[1]:
54+
self.check_error(
55+
error, code=0x9010, reg=0x01,
56+
data=bytes([4, 3, 2, 1, 0]), ts=2000,
57+
)
58+
59+
# Dispatch an EMCY reset.
60+
self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 2000)
61+
self.assertEqual(len(self.emcy.log), 3)
62+
self.assertEqual(len(self.emcy.active), 0)
63+
64+
def test_emcy_consumer_reset(self):
65+
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
66+
self.emcy.on_emcy(0x81, b'\x10\x90\x01\x04\x03\x02\x01\x00', 2000)
67+
self.assertEqual(len(self.emcy.log), 2)
68+
self.assertEqual(len(self.emcy.active), 2)
69+
70+
self.emcy.reset()
71+
self.assertEqual(len(self.emcy.log), 0)
72+
self.assertEqual(len(self.emcy.active), 0)
73+
74+
def test_emcy_consumer_wait(self):
75+
PAUSE = TIMEOUT / 4
76+
77+
def push_err():
78+
self.emcy.on_emcy(0x81, b'\x01\x20\x01\x01\x02\x03\x04\x05', 100)
79+
80+
def check_err(err):
81+
self.assertIsNotNone(err)
82+
self.check_error(
83+
err, code=0x2001, reg=1,
84+
data=bytes([1, 2, 3, 4, 5]), ts=100,
85+
)
86+
87+
# Check unfiltered wait, on timeout.
88+
self.assertIsNone(self.emcy.wait(timeout=TIMEOUT))
89+
90+
# Check unfiltered wait, on success.
91+
timer = threading.Timer(PAUSE, push_err)
92+
timer.start()
93+
with self.assertLogs(level=logging.INFO):
94+
err = self.emcy.wait(timeout=TIMEOUT)
95+
check_err(err)
96+
97+
# Check filtered wait, on success.
98+
timer = threading.Timer(PAUSE, push_err)
99+
timer.start()
100+
with self.assertLogs(level=logging.INFO):
101+
err = self.emcy.wait(0x2001, TIMEOUT)
102+
check_err(err)
103+
104+
# Check filtered wait, on timeout.
105+
timer = threading.Timer(PAUSE, push_err)
106+
timer.start()
107+
self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT))
108+
109+
def push_reset():
110+
self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 100)
111+
112+
timer = threading.Timer(PAUSE, push_reset)
113+
timer.start()
114+
self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT))
115+
116+
117+
class TestEmcyError(unittest.TestCase):
118+
def test_emcy_error(self):
119+
error = EmcyError(0x2001, 0x02, b'\x00\x01\x02\x03\x04', 1000)
18120
self.assertEqual(error.code, 0x2001)
19-
self.assertEqual(error.register, 0x02)
20121
self.assertEqual(error.data, b'\x00\x01\x02\x03\x04')
21-
self.assertAlmostEqual(error.timestamp, 1473418396.0)
22-
self.assertEqual(emcy_node.active[0], error)
23-
24-
error = emcy_node.log[1]
25-
self.assertEqual(error.code, 0x9010)
26-
self.assertEqual(error.register, 0x01)
27-
self.assertEqual(error.data, b'\x00\x01\x02\x03\x04')
28-
self.assertAlmostEqual(error.timestamp, 1473418397.0)
29-
self.assertEqual(emcy_node.active[1], error)
30-
31-
emcy_node.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 1473418397.0)
32-
self.assertEqual(len(emcy_node.log), 3)
33-
self.assertEqual(len(emcy_node.active), 0)
34-
35-
def test_str(self):
36-
error = emcy.EmcyError(0x2001, 0x02, b'\x00\x01\x02\x03\x04', 1473418396.0)
37-
self.assertEqual(str(error), "Code 0x2001, Current")
38-
39-
error = emcy.EmcyError(0x50FF, 0x01, b'\x00\x01\x02\x03\x04', 1473418396.0)
40-
self.assertEqual(str(error), "Code 0x50FF, Device Hardware")
41-
42-
error = emcy.EmcyError(0x7100, 0x01, b'\x00\x01\x02\x03\x04', 1473418396.0)
43-
self.assertEqual(str(error), "Code 0x7100")
44-
45-
46-
class MockNetwork(object):
47-
48-
data = None
49-
50-
def send_message(self, can_id, data):
51-
self.data = data
122+
self.assertEqual(error.register, 2)
123+
self.assertEqual(error.timestamp, 1000)
124+
125+
def test_emcy_str(self):
126+
def check(code, expected):
127+
err = EmcyError(code, 1, b'', 1000)
128+
actual = str(err)
129+
self.assertEqual(actual, expected)
130+
131+
check(0x2001, "Code 0x2001, Current")
132+
check(0x3abc, "Code 0x3ABC, Voltage")
133+
check(0x0234, "Code 0x0234")
134+
check(0xbeef, "Code 0xBEEF")
135+
136+
def test_emcy_get_desc(self):
137+
def check(code, expected):
138+
err = EmcyError(code, 1, b'', 1000)
139+
actual = err.get_desc()
140+
self.assertEqual(actual, expected)
141+
142+
check(0x0000, "Error Reset / No Error")
143+
check(0x00ff, "Error Reset / No Error")
144+
check(0x0100, "")
145+
check(0x1000, "Generic Error")
146+
check(0x10ff, "Generic Error")
147+
check(0x1100, "")
148+
check(0x2000, "Current")
149+
check(0x2fff, "Current")
150+
check(0x3000, "Voltage")
151+
check(0x3fff, "Voltage")
152+
check(0x4000, "Temperature")
153+
check(0x4fff, "Temperature")
154+
check(0x5000, "Device Hardware")
155+
check(0x50ff, "Device Hardware")
156+
check(0x5100, "")
157+
check(0x6000, "Device Software")
158+
check(0x6fff, "Device Software")
159+
check(0x7000, "Additional Modules")
160+
check(0x70ff, "Additional Modules")
161+
check(0x7100, "")
162+
check(0x8000, "Monitoring")
163+
check(0x8fff, "Monitoring")
164+
check(0x9000, "External Error")
165+
check(0x90ff, "External Error")
166+
check(0x9100, "")
167+
check(0xf000, "Additional Functions")
168+
check(0xf0ff, "Additional Functions")
169+
check(0xf100, "")
170+
check(0xff00, "Device Specific")
171+
check(0xffff, "Device Specific")
52172

53173

54174
class TestEmcyProducer(unittest.TestCase):
55-
56-
def test_send(self):
57-
network = MockNetwork()
58-
emcy_node = emcy.EmcyProducer(0x80 + 1)
59-
emcy_node.network = network
60-
emcy_node.send(0x2001, 0x2, b'\x00\x01\x02\x03\x04')
61-
self.assertEqual(network.data, b'\x01\x20\x02\x00\x01\x02\x03\x04')
175+
def setUp(self):
176+
self.txbus = can.Bus(interface="virtual")
177+
self.rxbus = can.Bus(interface="virtual")
178+
self.net = canopen.Network(self.txbus)
179+
self.net.connect()
180+
self.emcy = canopen.emcy.EmcyProducer(0x80 + 1)
181+
self.emcy.network = self.net
182+
183+
def tearDown(self):
184+
self.net.disconnect()
185+
self.txbus.shutdown()
186+
self.rxbus.shutdown()
187+
188+
def check_response(self, expected):
189+
msg = self.rxbus.recv(TIMEOUT)
190+
self.assertIsNotNone(msg)
191+
actual = msg.data
192+
self.assertEqual(actual, expected)
193+
194+
def test_emcy_producer_send(self):
195+
def check(*args, res):
196+
self.emcy.send(*args)
197+
self.check_response(res)
198+
199+
check(0x2001, res=b'\x01\x20\x00\x00\x00\x00\x00\x00')
200+
check(0x2001, 0x2, res=b'\x01\x20\x02\x00\x00\x00\x00\x00')
201+
check(0x2001, 0x2, b'\x2a', res=b'\x01\x20\x02\x2a\x00\x00\x00\x00')
202+
203+
def test_emcy_producer_reset(self):
204+
def check(*args, res):
205+
self.emcy.reset(*args)
206+
self.check_response(res)
207+
208+
check(res=b'\x00\x00\x00\x00\x00\x00\x00\x00')
209+
check(3, res=b'\x00\x00\x03\x00\x00\x00\x00\x00')
210+
check(3, b"\xaa\xbb", res=b'\x00\x00\x03\xaa\xbb\x00\x00\x00')

0 commit comments

Comments
 (0)