1
1
import unittest
2
+ import asyncio
2
3
3
4
import canopen
4
5
@@ -8,97 +9,122 @@ def count_subscribers(network: canopen.Network) -> int:
8
9
return sum (len (n ) for n in network .subscribers .values ())
9
10
10
11
11
- class TestLocalNode (unittest .TestCase ):
12
+ class BaseTests :
13
+ class TestLocalNode (unittest .IsolatedAsyncioTestCase ):
12
14
13
- @classmethod
14
- def setUpClass (cls ):
15
- cls .network = canopen .Network ()
16
- cls .network .NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
17
- cls .network .connect (interface = "virtual" )
15
+ use_async : bool
18
16
19
- cls .node = canopen .LocalNode (2 , canopen .objectdictionary .ObjectDictionary ())
17
+ def setUp (self ):
18
+ loop = None
19
+ if self .use_async :
20
+ loop = asyncio .get_event_loop ()
20
21
21
- @ classmethod
22
- def tearDownClass ( cls ):
23
- cls .network .disconnect ( )
22
+ self . network = canopen . Network ( loop = loop )
23
+ self . network . NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
24
+ self .network .connect ( interface = "virtual" )
24
25
25
- def test_associate_network (self ):
26
- # Need to store the number of subscribers before associating because the
27
- # network implementation automatically adds subscribers to the list
28
- n_subscribers = count_subscribers (self .network )
26
+ self .node = canopen .LocalNode (2 , canopen .objectdictionary .ObjectDictionary ())
29
27
30
- # Associating the network with the local node
31
- self .node .associate_network (self .network )
32
- self .assertIs (self .node .network , self .network )
33
- self .assertIs (self .node .sdo .network , self .network )
34
- self .assertIs (self .node .tpdo .network , self .network )
35
- self .assertIs (self .node .rpdo .network , self .network )
36
- self .assertIs (self .node .nmt .network , self .network )
37
- self .assertIs (self .node .emcy .network , self .network )
28
+ def tearDown (self ):
29
+ self .network .disconnect ()
38
30
39
- # Test that its not possible to associate the network multiple times
40
- with self .assertRaises (RuntimeError ) as cm :
31
+ async def test_associate_network (self ):
32
+ # Need to store the number of subscribers before associating because the
33
+ # network implementation automatically adds subscribers to the list
34
+ n_subscribers = count_subscribers (self .network )
35
+
36
+ # Associating the network with the local node
41
37
self .node .associate_network (self .network )
42
- self .assertIn ("already associated with a network" , str (cm .exception ))
43
-
44
- # Test removal of the network. The count of subscribers should
45
- # be the same as before the association
46
- self .node .remove_network ()
47
- uninitalized = canopen .network ._UNINITIALIZED_NETWORK
48
- self .assertIs (self .node .network , uninitalized )
49
- self .assertIs (self .node .sdo .network , uninitalized )
50
- self .assertIs (self .node .tpdo .network , uninitalized )
51
- self .assertIs (self .node .rpdo .network , uninitalized )
52
- self .assertIs (self .node .nmt .network , uninitalized )
53
- self .assertIs (self .node .emcy .network , uninitalized )
54
- self .assertEqual (count_subscribers (self .network ), n_subscribers )
55
-
56
- # Test that its possible to deassociate the network multiple times
57
- self .node .remove_network ()
58
-
59
-
60
- class TestRemoteNode (unittest .TestCase ):
61
-
62
- @classmethod
63
- def setUpClass (cls ):
64
- cls .network = canopen .Network ()
65
- cls .network .NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
66
- cls .network .connect (interface = "virtual" )
67
-
68
- cls .node = canopen .RemoteNode (2 , canopen .objectdictionary .ObjectDictionary ())
69
-
70
- @classmethod
71
- def tearDownClass (cls ):
72
- cls .network .disconnect ()
73
-
74
- def test_associate_network (self ):
75
- # Need to store the number of subscribers before associating because the
76
- # network implementation automatically adds subscribers to the list
77
- n_subscribers = count_subscribers (self .network )
78
-
79
- # Associating the network with the local node
80
- self .node .associate_network (self .network )
81
- self .assertIs (self .node .network , self .network )
82
- self .assertIs (self .node .sdo .network , self .network )
83
- self .assertIs (self .node .tpdo .network , self .network )
84
- self .assertIs (self .node .rpdo .network , self .network )
85
- self .assertIs (self .node .nmt .network , self .network )
86
-
87
- # Test that its not possible to associate the network multiple times
88
- with self .assertRaises (RuntimeError ) as cm :
38
+ self .assertIs (self .node .network , self .network )
39
+ self .assertIs (self .node .sdo .network , self .network )
40
+ self .assertIs (self .node .tpdo .network , self .network )
41
+ self .assertIs (self .node .rpdo .network , self .network )
42
+ self .assertIs (self .node .nmt .network , self .network )
43
+ self .assertIs (self .node .emcy .network , self .network )
44
+
45
+ # Test that its not possible to associate the network multiple times
46
+ with self .assertRaises (RuntimeError ) as cm :
47
+ self .node .associate_network (self .network )
48
+ self .assertIn ("already associated with a network" , str (cm .exception ))
49
+
50
+ # Test removal of the network. The count of subscribers should
51
+ # be the same as before the association
52
+ self .node .remove_network ()
53
+ uninitalized = canopen .network ._UNINITIALIZED_NETWORK
54
+ self .assertIs (self .node .network , uninitalized )
55
+ self .assertIs (self .node .sdo .network , uninitalized )
56
+ self .assertIs (self .node .tpdo .network , uninitalized )
57
+ self .assertIs (self .node .rpdo .network , uninitalized )
58
+ self .assertIs (self .node .nmt .network , uninitalized )
59
+ self .assertIs (self .node .emcy .network , uninitalized )
60
+ self .assertEqual (count_subscribers (self .network ), n_subscribers )
61
+
62
+ # Test that its possible to deassociate the network multiple times
63
+ self .node .remove_network ()
64
+
65
+
66
+ class TestRemoteNode (unittest .IsolatedAsyncioTestCase ):
67
+
68
+ use_async : bool
69
+
70
+ def setUp (self ):
71
+ loop = None
72
+ if self .use_async :
73
+ loop = asyncio .get_event_loop ()
74
+
75
+ self .network = canopen .Network (loop = loop )
76
+ self .network .NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
77
+ self .network .connect (interface = "virtual" )
78
+
79
+ self .node = canopen .RemoteNode (2 , canopen .objectdictionary .ObjectDictionary ())
80
+
81
+ def tearDown (self ):
82
+ self .network .disconnect ()
83
+
84
+ async def test_associate_network (self ):
85
+ # Need to store the number of subscribers before associating because the
86
+ # network implementation automatically adds subscribers to the list
87
+ n_subscribers = count_subscribers (self .network )
88
+
89
+ # Associating the network with the local node
89
90
self .node .associate_network (self .network )
90
- self .assertIn ("already associated with a network" , str (cm .exception ))
91
-
92
- # Test removal of the network. The count of subscribers should
93
- # be the same as before the association
94
- self .node .remove_network ()
95
- uninitalized = canopen .network ._UNINITIALIZED_NETWORK
96
- self .assertIs (self .node .network , uninitalized )
97
- self .assertIs (self .node .sdo .network , uninitalized )
98
- self .assertIs (self .node .tpdo .network , uninitalized )
99
- self .assertIs (self .node .rpdo .network , uninitalized )
100
- self .assertIs (self .node .nmt .network , uninitalized )
101
- self .assertEqual (count_subscribers (self .network ), n_subscribers )
102
-
103
- # Test that its possible to deassociate the network multiple times
104
- self .node .remove_network ()
91
+ self .assertIs (self .node .network , self .network )
92
+ self .assertIs (self .node .sdo .network , self .network )
93
+ self .assertIs (self .node .tpdo .network , self .network )
94
+ self .assertIs (self .node .rpdo .network , self .network )
95
+ self .assertIs (self .node .nmt .network , self .network )
96
+
97
+ # Test that its not possible to associate the network multiple times
98
+ with self .assertRaises (RuntimeError ) as cm :
99
+ self .node .associate_network (self .network )
100
+ self .assertIn ("already associated with a network" , str (cm .exception ))
101
+
102
+ # Test removal of the network. The count of subscribers should
103
+ # be the same as before the association
104
+ self .node .remove_network ()
105
+ uninitalized = canopen .network ._UNINITIALIZED_NETWORK
106
+ self .assertIs (self .node .network , uninitalized )
107
+ self .assertIs (self .node .sdo .network , uninitalized )
108
+ self .assertIs (self .node .tpdo .network , uninitalized )
109
+ self .assertIs (self .node .rpdo .network , uninitalized )
110
+ self .assertIs (self .node .nmt .network , uninitalized )
111
+ self .assertEqual (count_subscribers (self .network ), n_subscribers )
112
+
113
+ # Test that its possible to deassociate the network multiple times
114
+ self .node .remove_network ()
115
+
116
+
117
+ class TestLocalNodeSync (BaseTests .TestLocalNode ):
118
+ use_async = False
119
+
120
+
121
+ class TestLocalNodeAsync (BaseTests .TestLocalNode ):
122
+ use_async = True
123
+
124
+
125
+ class TestRemoteNodeSync (BaseTests .TestRemoteNode ):
126
+ use_async = False
127
+
128
+
129
+ class TestRemoteNodeAsync (BaseTests .TestRemoteNode ):
130
+ use_async = True
0 commit comments