1
+ import ctypes
1
2
import time
2
- from typing import Any , List , Optional , Tuple
3
+ from typing import Dict , List , Optional , Tuple
3
4
4
5
import can
6
+ from ...exceptions import CanInitializationError , CanOperationError
5
7
from .boa import *
6
8
7
9
8
10
class EtasBus (can .BusABC ):
9
11
def __init__ (
10
12
self ,
11
- channel : Any ,
13
+ channel : str ,
12
14
can_filters : Optional [can .typechecking .CanFilters ] = None ,
13
15
receive_own_messages : bool = False ,
14
16
bitrate : int = 1000000 ,
@@ -24,7 +26,7 @@ def __init__(
24
26
ctypes .c_char_p (b"" ), nodeRange , ctypes .byref (self .tree )
25
27
)
26
28
if ec != 0x0 :
27
- raise can . exceptions . CanInitializationError (
29
+ raise CanInitializationError (
28
30
f"CSI_CreateProtocolTree failed with error 0x{ ec :X} "
29
31
)
30
32
@@ -40,7 +42,7 @@ def __init__(
40
42
ctypes .byref (self .ctrl ),
41
43
)
42
44
if ec != 0x0 :
43
- raise can . exceptions . CanInitializationError (
45
+ raise CanInitializationError (
44
46
f"OCI_CreateCANControllerNoSearch failed with error 0x{ ec :X} "
45
47
)
46
48
@@ -80,7 +82,7 @@ def __init__(
80
82
self .ctrl , ctypes .byref (ctrlConf ), ctypes .byref (ctrlProp )
81
83
)
82
84
if ec != 0x0 and ec != 0x40004000 : # accept BOA_WARN_PARAM_ADAPTED
83
- raise can . exceptions . CanInitializationError (
85
+ raise CanInitializationError (
84
86
f"OCI_OpenCANController failed with error 0x{ ec :X} "
85
87
)
86
88
@@ -100,7 +102,7 @@ def __init__(
100
102
self .ctrl , ctypes .byref (rxQConf ), ctypes .byref (self .rxQueue )
101
103
)
102
104
if ec != 0x0 :
103
- raise can . exceptions . CanInitializationError (
105
+ raise CanInitializationError (
104
106
f"OCI_CreateCANRxQueue failed with error 0x{ ec :X} "
105
107
)
106
108
@@ -116,7 +118,7 @@ def __init__(
116
118
self .ctrl , ctypes .byref (txQConf ), ctypes .byref (self .txQueue )
117
119
)
118
120
if ec != 0x0 :
119
- raise can . exceptions . CanInitializationError (
121
+ raise CanInitializationError (
120
122
f"OCI_CreateCANTxQueue failed with error 0x{ ec :X} "
121
123
)
122
124
@@ -125,17 +127,17 @@ def __init__(
125
127
timerCapabilities = OCI_TimerCapabilities ()
126
128
ec = OCI_GetTimerCapabilities (self .ctrl , ctypes .byref (timerCapabilities ))
127
129
if ec != 0x0 :
128
- raise can . exceptions . CanInitializationError (
130
+ raise CanInitializationError (
129
131
f"OCI_GetTimerCapabilities failed with error 0x{ ec :X} "
130
132
)
131
133
self .tickFrequency = timerCapabilities .tickFrequency # clock ticks per second
132
134
133
- # all timestamps are hardware timestamps relative to powerup
135
+ # all timestamps are hardware timestamps relative to the CAN device powerup
134
136
# calculate an offset to make them relative to epoch
135
137
now = OCI_Time ()
136
138
ec = OCI_GetTimerValue (self .ctrl , ctypes .byref (now ))
137
139
if ec != 0x0 :
138
- raise can . exceptions . CanInitializationError (
140
+ raise CanInitializationError (
139
141
f"OCI_GetTimerValue failed with error 0x{ ec :X} "
140
142
)
141
143
self .timeOffset = time .time () - (float (now .value ) / self .tickFrequency )
@@ -153,7 +155,7 @@ def _recv_internal(
153
155
count = ctypes .c_uint32 ()
154
156
remaining = ctypes .c_uint32 ()
155
157
if timeout is not None :
156
- t = OCI_Time (int (timeout * self .tickFrequency ))
158
+ t = OCI_Time (round (timeout * self .tickFrequency ))
157
159
else :
158
160
t = OCI_NO_TIME
159
161
ec = OCI_ReadCANDataEx (
@@ -167,9 +169,7 @@ def _recv_internal(
167
169
if ec != 0x0 :
168
170
text = ctypes .create_string_buffer (500 )
169
171
OCI_GetError (self .ctrl , ec , text , 500 )
170
- raise can .exceptions .CanOperationError (
171
- f"OCI_ReadCANDataEx failed with error 0x{ ec :X} "
172
- )
172
+ raise CanOperationError (f"OCI_ReadCANDataEx failed with error 0x{ ec :X} " )
173
173
174
174
msg = None
175
175
@@ -260,15 +260,13 @@ def send(self, msg: can.Message, timeout: Optional[float] = None) -> None:
260
260
261
261
ec = OCI_WriteCANDataEx (self .txQueue , OCI_NO_TIME , canMessages , 1 , None )
262
262
if ec != 0x0 :
263
- raise can .exceptions .CanOperationError (
264
- f"OCI_WriteCANDataEx failed with error 0x{ ec :X} "
265
- )
263
+ raise CanOperationError (f"OCI_WriteCANDataEx failed with error 0x{ ec :X} " )
266
264
267
265
def _apply_filters (self , filters : Optional [can .typechecking .CanFilters ]) -> None :
268
266
if self ._oci_filters :
269
267
ec = OCI_RemoveCANFrameFilterEx (self .rxQueue , self ._oci_filters , 1 )
270
268
if ec != 0x0 :
271
- raise can . exceptions . CanOperationError (
269
+ raise CanOperationError (
272
270
f"OCI_RemoveCANFrameFilterEx failed with error 0x{ ec :X} "
273
271
)
274
272
@@ -277,6 +275,7 @@ def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None
277
275
filters = [{"can_id" : 0x0 , "can_mask" : 0x0 }]
278
276
279
277
self ._oci_filters = (ctypes .POINTER (OCI_CANRxFilterEx ) * len (filters ))()
278
+
280
279
for i , filter in enumerate (filters ):
281
280
f = OCI_CANRxFilterEx ()
282
281
f .frameIDValue = filter ["can_id" ]
@@ -298,23 +297,21 @@ def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None
298
297
self .rxQueue , self ._oci_filters , len (self ._oci_filters )
299
298
)
300
299
if ec != 0x0 :
301
- raise can . exceptions . CanOperationError (
300
+ raise CanOperationError (
302
301
f"OCI_AddCANFrameFilterEx failed with error 0x{ ec :X} "
303
302
)
304
303
305
304
def flush_tx_buffer (self ) -> None :
306
305
ec = OCI_ResetQueue (self .txQueue )
307
306
if ec != 0x0 :
308
- raise can .exceptions .CanOperationError (
309
- f"OCI_ResetQueue failed with error 0x{ ec :X} "
310
- )
307
+ raise CanOperationError (f"OCI_ResetQueue failed with error 0x{ ec :X} " )
311
308
312
309
def shutdown (self ) -> None :
313
310
# Cleanup TX
314
311
if self .txQueue :
315
312
ec = OCI_DestroyCANTxQueue (self .txQueue )
316
313
if ec != 0x0 :
317
- raise can . exceptions . CanOperationError (
314
+ raise CanOperationError (
318
315
f"OCI_DestroyCANTxQueue failed with error 0x{ ec :X} "
319
316
)
320
317
self .txQueue = None
@@ -323,7 +320,7 @@ def shutdown(self) -> None:
323
320
if self .rxQueue :
324
321
ec = OCI_DestroyCANRxQueue (self .rxQueue )
325
322
if ec != 0x0 :
326
- raise can . exceptions . CanOperationError (
323
+ raise CanOperationError (
327
324
f"OCI_DestroyCANRxQueue failed with error 0x{ ec :X} "
328
325
)
329
326
self .rxQueue = None
@@ -332,20 +329,20 @@ def shutdown(self) -> None:
332
329
if self .ctrl :
333
330
ec = OCI_CloseCANController (self .ctrl )
334
331
if ec != 0x0 :
335
- raise can . exceptions . CanOperationError (
332
+ raise CanOperationError (
336
333
f"OCI_CloseCANController failed with error 0x{ ec :X} "
337
334
)
338
335
ec = OCI_DestroyCANController (self .ctrl )
339
336
if ec != 0x0 :
340
- raise can . exceptions . CanOperationError (
337
+ raise CanOperationError (
341
338
f"OCI_DestroyCANController failed with error 0x{ ec :X} "
342
339
)
343
340
self .ctrl = None
344
341
345
342
if self .tree :
346
343
ec = CSI_DestroyProtocolTree (self .tree )
347
344
if ec != 0x0 :
348
- raise can . exceptions . CanOperationError (
345
+ raise CanOperationError (
349
346
f"CSI_DestroyProtocolTree failed with error 0x{ ec :X} "
350
347
)
351
348
self .tree = None
@@ -355,7 +352,7 @@ def state(self) -> can.BusState:
355
352
status = OCI_CANControllerStatus ()
356
353
ec = OCI_GetCANControllerStatus (self .ctrl , ctypes .byref (status ))
357
354
if ec != 0x0 :
358
- raise can . exceptions . CanOperationError (
355
+ raise CanOperationError (
359
356
f"OCI_GetCANControllerStatus failed with error 0x{ ec :X} "
360
357
)
361
358
if status .stateCode & OCI_CAN_STATE_ACTIVE :
@@ -365,25 +362,26 @@ def state(self) -> can.BusState:
365
362
366
363
@state .setter
367
364
def state (self , new_state : can .BusState ) -> None :
365
+ # disabled, OCI_AdaptCANConfiguration does not allow changing the bus mode
368
366
# if new_state == can.BusState.ACTIVE:
369
367
# self.ctrlConf.busParticipationMode = OCI_BUSMODE_ACTIVE
370
368
# else:
371
369
# self.ctrlConf.busParticipationMode = OCI_BUSMODE_PASSIVE
372
370
# ec = OCI_AdaptCANConfiguration(self.ctrl, ctypes.byref(self.ctrlConf))
373
371
# if ec != 0x0:
374
- # raise can.exceptions. CanOperationError(f"OCI_AdaptCANConfiguration failed with error 0x{ec:X}")
372
+ # raise CanOperationError(f"OCI_AdaptCANConfiguration failed with error 0x{ec:X}")
375
373
raise NotImplementedError ("Setting state is not implemented." )
376
374
377
375
def _detect_available_configs () -> List [can .typechecking .AutoDetectedConfig ]:
378
376
nodeRange = CSI_NodeRange (CSI_NODE_MIN , CSI_NODE_MAX )
379
377
tree = ctypes .POINTER (CSI_Tree )()
380
378
ec = CSI_CreateProtocolTree (ctypes .c_char_p (b"" ), nodeRange , ctypes .byref (tree ))
381
379
if ec != 0x0 :
382
- raise can . exceptions . CanOperationError (
380
+ raise CanOperationError (
383
381
f"CSI_CreateProtocolTree failed with error 0x{ ec :X} "
384
382
)
385
383
386
- nodes = []
384
+ nodes : Dict [ str , str ] = []
387
385
388
386
def _findNodes (tree , prefix ):
389
387
uri = f"{ prefix } /{ tree .contents .item .uriName .decode ()} "
@@ -402,7 +400,7 @@ def _findNodes(tree, prefix):
402
400
403
401
ec = CSI_DestroyProtocolTree (tree )
404
402
if ec != 0x0 :
405
- raise can . exceptions . CanOperationError (
403
+ raise CanOperationError (
406
404
f"CSI_DestroyProtocolTree failed with error 0x{ ec :X} "
407
405
)
408
406
0 commit comments