17
17
import logging
18
18
import sys
19
19
20
- from can import CanError , BusABC , Message
20
+ from can import BusABC , Message
21
+ import can .typechecking
22
+ from ..exceptions import (
23
+ CanError ,
24
+ CanInterfaceNotImplementedError ,
25
+ CanOperationError ,
26
+ CanInitializationError ,
27
+ )
28
+ from typing import Optional , Tuple , Type
29
+
21
30
22
31
logger = logging .getLogger (__name__ )
23
32
@@ -74,15 +83,48 @@ class TxMessageStruct(ctypes.Structure):
74
83
]
75
84
76
85
77
- def check_status (result , function , arguments ):
86
+ class NicanError (CanError ):
87
+ """Error from NI-CAN driver."""
88
+
89
+ def __init__ (self , function , error_code : int , arguments ) -> None :
90
+ super ().__init__ (
91
+ message = f"{ function } failed: { get_error_message (self .error_code )} " ,
92
+ error_code = error_code ,
93
+ )
94
+
95
+ #: Function that failed
96
+ self .function = function
97
+
98
+ #: Arguments passed to function
99
+ self .arguments = arguments
100
+
101
+
102
+ class NicanInitializationError (NicanError , CanInitializationError ):
103
+ pass
104
+
105
+
106
+ class NicanOperationError (NicanError , CanOperationError ):
107
+ pass
108
+
109
+
110
+ def check_status (
111
+ result : int ,
112
+ function ,
113
+ arguments ,
114
+ error_class : Type [NicanError ] = NicanOperationError ,
115
+ ) -> int :
78
116
if result > 0 :
79
117
logger .warning (get_error_message (result ))
80
118
elif result < 0 :
81
- raise NicanError (function , result , arguments )
119
+ raise error_class (function , result , arguments )
82
120
return result
83
121
84
122
85
- def get_error_message (status_code ):
123
+ def check_status_init (* args , ** kwargs ) -> int :
124
+ return check_status (* args , ** kwargs , error_class = NicanInitializationError )
125
+
126
+
127
+ def get_error_message (status_code : int ) -> str :
86
128
"""Convert status code to descriptive string."""
87
129
errmsg = ctypes .create_string_buffer (1024 )
88
130
nican .ncStatusToString (status_code , len (errmsg ), errmsg )
@@ -102,21 +144,28 @@ def get_error_message(status_code):
102
144
ctypes .c_void_p ,
103
145
ctypes .c_void_p ,
104
146
]
105
- nican .ncConfig .errcheck = check_status
147
+ nican .ncConfig .errcheck = check_status_init
148
+
106
149
nican .ncOpenObject .argtypes = [ctypes .c_char_p , ctypes .c_void_p ]
107
- nican .ncOpenObject .errcheck = check_status
150
+ nican .ncOpenObject .errcheck = check_status_init
151
+
108
152
nican .ncCloseObject .errcheck = check_status
153
+
109
154
nican .ncAction .argtypes = [ctypes .c_ulong , ctypes .c_ulong , ctypes .c_ulong ]
110
155
nican .ncAction .errcheck = check_status
156
+
111
157
nican .ncRead .errcheck = check_status
158
+
112
159
nican .ncWrite .errcheck = check_status
160
+
113
161
nican .ncWaitForState .argtypes = [
114
162
ctypes .c_ulong ,
115
163
ctypes .c_ulong ,
116
164
ctypes .c_ulong ,
117
165
ctypes .c_void_p ,
118
166
]
119
167
nican .ncWaitForState .errcheck = check_status
168
+
120
169
nican .ncStatusToString .argtypes = [ctypes .c_int , ctypes .c_uint , ctypes .c_char_p ]
121
170
else :
122
171
nican = None
@@ -137,37 +186,42 @@ class NicanBus(BusABC):
137
186
"""
138
187
139
188
def __init__ (
140
- self , channel , can_filters = None , bitrate = None , log_errors = True , ** kwargs
141
- ):
189
+ self ,
190
+ channel : str ,
191
+ can_filters : Optional [can .typechecking .CanFilters ] = None ,
192
+ bitrate : Optional [int ] = None ,
193
+ log_errors : bool = True ,
194
+ ** kwargs ,
195
+ ) -> None :
142
196
"""
143
- :param str channel:
144
- Name of the object to open (e.g. ' CAN0' )
197
+ :param channel:
198
+ Name of the object to open (e.g. `" CAN0"` )
145
199
146
- :param int bitrate:
147
- Bitrate in bits /s
200
+ :param bitrate:
201
+ Bitrate in bit /s
148
202
149
- :param list can_filters:
203
+ :param can_filters:
150
204
See :meth:`can.BusABC.set_filters`.
151
205
152
- :param bool log_errors:
206
+ :param log_errors:
153
207
If True, communication errors will appear as CAN messages with
154
208
``is_error_frame`` set to True and ``arbitration_id`` will identify
155
209
the error (default True)
156
210
157
- :raises can.interfaces.nican.NicanError:
158
- If starting communication fails
159
-
211
+ :raise can.CanInterfaceNotImplementedError:
212
+ If the current operating system is not supported or the driver could not be loaded.
213
+ :raise can.interfaces.nican.NicanInitializationError:
214
+ If the bus could not be set up.
160
215
"""
161
216
if nican is None :
162
- raise ImportError (
217
+ raise CanInterfaceNotImplementedError (
163
218
"The NI-CAN driver could not be loaded. "
164
219
"Check that you are using 32-bit Python on Windows."
165
220
)
166
221
167
222
self .channel = channel
168
- self .channel_info = "NI-CAN: " + channel
169
- if not isinstance (channel , bytes ):
170
- channel = channel .encode ()
223
+ self .channel_info = f"NI-CAN: { channel } "
224
+ channel_bytes = channel .encode ("ascii" )
171
225
172
226
config = [(NC_ATTR_START_ON_OPEN , True ), (NC_ATTR_LOG_COMM_ERRS , log_errors )]
173
227
@@ -208,31 +262,33 @@ def __init__(
208
262
attr_id_list = AttrList (* (row [0 ] for row in config ))
209
263
attr_value_list = AttrList (* (row [1 ] for row in config ))
210
264
nican .ncConfig (
211
- channel ,
265
+ channel_bytes ,
212
266
len (config ),
213
267
ctypes .byref (attr_id_list ),
214
268
ctypes .byref (attr_value_list ),
215
269
)
216
270
217
271
self .handle = ctypes .c_ulong ()
218
- nican .ncOpenObject (channel , ctypes .byref (self .handle ))
272
+ nican .ncOpenObject (channel_bytes , ctypes .byref (self .handle ))
219
273
220
274
super ().__init__ (
221
275
channel = channel ,
222
276
can_filters = can_filters ,
223
277
bitrate = bitrate ,
224
278
log_errors = log_errors ,
225
- ** kwargs
279
+ ** kwargs ,
226
280
)
227
281
228
- def _recv_internal (self , timeout ):
282
+ def _recv_internal (
283
+ self , timeout : Optional [float ]
284
+ ) -> Tuple [Optional [Message ], bool ]:
229
285
"""
230
286
Read a message from a NI-CAN bus.
231
287
232
- :param float timeout:
233
- Max time to wait in seconds or None if infinite
288
+ :param timeout:
289
+ Max time to wait in seconds or `` None`` if infinite
234
290
235
- :raises can.interfaces.nican.NicanError :
291
+ :raises can.interfaces.nican.NicanOperationError :
236
292
If reception fails
237
293
"""
238
294
if timeout is None :
@@ -274,14 +330,19 @@ def _recv_internal(self, timeout):
274
330
)
275
331
return msg , True
276
332
277
- def send (self , msg , timeout = None ):
333
+ def send (self , msg : Message , timeout : Optional [ float ] = None ) -> None :
278
334
"""
279
335
Send a message to NI-CAN.
280
336
281
- :param can.Message msg:
337
+ :param msg:
282
338
Message to send
283
339
284
- :raises can.interfaces.nican.NicanError:
340
+ :param timeout:
341
+ The timeout
342
+
343
+ .. warning:: This gets ignored.
344
+
345
+ :raises can.interfaces.nican.NicanOperationError:
285
346
If writing to transmit buffer fails.
286
347
It does not wait for message to be ACKed currently.
287
348
"""
@@ -299,36 +360,19 @@ def send(self, msg, timeout=None):
299
360
# Maybe it is possible to use ncCreateNotification instead but seems a
300
361
# bit overkill at the moment.
301
362
# state = ctypes.c_ulong()
302
- # nican.ncWaitForState(
303
- # self.handle, NC_ST_WRITE_SUCCESS, int(timeout * 1000), ctypes.byref(state))
363
+ # nican.ncWaitForState(self.handle, NC_ST_WRITE_SUCCESS, int(timeout * 1000), ctypes.byref(state))
304
364
305
- def reset (self ):
365
+ def reset (self ) -> None :
306
366
"""
307
367
Resets network interface. Stops network interface, then resets the CAN
308
368
chip to clear the CAN error counters (clear error passive state).
309
369
Resetting includes clearing all entries from read and write queues.
370
+
371
+ :raises can.interfaces.nican.NicanOperationError:
372
+ If resetting fails.
310
373
"""
311
374
nican .ncAction (self .handle , NC_OP_RESET , 0 )
312
375
313
- def shutdown (self ):
376
+ def shutdown (self ) -> None :
314
377
"""Close object."""
315
378
nican .ncCloseObject (self .handle )
316
-
317
-
318
- class NicanError (CanError ):
319
- """Error from NI-CAN driver."""
320
-
321
- def __init__ (self , function , error_code , arguments ):
322
- super ().__init__ ()
323
- #: Status code
324
- self .error_code = error_code
325
- #: Function that failed
326
- self .function = function
327
- #: Arguments passed to function
328
- self .arguments = arguments
329
-
330
- def __str__ (self ):
331
- return "Function %s failed:\n %s" % (
332
- self .function .__name__ ,
333
- get_error_message (self .error_code ),
334
- )
0 commit comments