7
7
from unittest .mock import Mock
8
8
9
9
from can import BusABC , Message
10
+ from ..exceptions import CanInitializationError , error_check
10
11
11
12
logger = logging .getLogger (__name__ )
12
13
13
14
try :
14
15
import cantact
15
16
except ImportError :
17
+ cantact = None
16
18
logger .warning (
17
- "The CANtact module is not installed. Install it using `python3 -m pip install cantact`"
19
+ "The CANtact module is not installed. Install it using `python -m pip install cantact`"
18
20
)
19
21
20
22
@@ -26,7 +28,7 @@ def _detect_available_configs():
26
28
try :
27
29
interface = cantact .Interface ()
28
30
except (NameError , SystemError ):
29
- # couldn't import cantact, so no configurations are available
31
+ logger . debug ( "Could not import cantact, so no configurations are available" )
30
32
return []
31
33
32
34
channels = []
@@ -42,7 +44,7 @@ def __init__(
42
44
monitor = False ,
43
45
bit_timing = None ,
44
46
_testing = False ,
45
- ** kwargs
47
+ ** kwargs ,
46
48
):
47
49
"""
48
50
:param int channel:
@@ -58,36 +60,45 @@ def __init__(
58
60
if _testing :
59
61
self .interface = MockInterface ()
60
62
else :
61
- self .interface = cantact .Interface ()
63
+ if cantact is None :
64
+ raise CanInitializationError (
65
+ "The CANtact module is not installed. Install it using `python -m pip install cantact`"
66
+ )
67
+ with error_check (
68
+ "Cannot create the cantact.Interface" , CanInitializationError
69
+ ):
70
+ self .interface = cantact .Interface ()
62
71
63
72
self .channel = int (channel )
64
- self .channel_info = "CANtact: ch:%s" % channel
65
-
66
- # configure the interface
67
- if bit_timing is None :
68
- # use bitrate
69
- self .interface .set_bitrate (int (channel ), int (bitrate ))
70
- else :
71
- # use custom bit timing
72
- self .interface .set_bit_timing (
73
- int (channel ),
74
- int (bit_timing .brp ),
75
- int (bit_timing .tseg1 ),
76
- int (bit_timing .tseg2 ),
77
- int (bit_timing .sjw ),
78
- )
79
- self .interface .set_enabled (int (channel ), True )
80
- self .interface .set_monitor (int (channel ), monitor )
81
- self .interface .start ()
73
+ self .channel_info = f"CANtact: ch:{ channel } "
74
+
75
+ # Configure the interface
76
+ with error_check ("Cannot setup the cantact.Interface" , CanInitializationError ):
77
+ if bit_timing is None :
78
+ # use bitrate
79
+ self .interface .set_bitrate (int (channel ), int (bitrate ))
80
+ else :
81
+ # use custom bit timing
82
+ self .interface .set_bit_timing (
83
+ int (channel ),
84
+ int (bit_timing .brp ),
85
+ int (bit_timing .tseg1 ),
86
+ int (bit_timing .tseg2 ),
87
+ int (bit_timing .sjw ),
88
+ )
89
+ self .interface .set_enabled (int (channel ), True )
90
+ self .interface .set_monitor (int (channel ), monitor )
91
+ self .interface .start ()
82
92
83
93
super ().__init__ (
84
94
channel = channel , bitrate = bitrate , poll_interval = poll_interval , ** kwargs
85
95
)
86
96
87
97
def _recv_internal (self , timeout ):
88
- frame = self .interface .recv (int (timeout * 1000 ))
98
+ with error_check ("Cannot receive message" ):
99
+ frame = self .interface .recv (int (timeout * 1000 ))
89
100
if frame is None :
90
- # timeout occured
101
+ # timeout occurred
91
102
return None , False
92
103
93
104
msg = Message (
@@ -103,31 +114,33 @@ def _recv_internal(self, timeout):
103
114
return msg , False
104
115
105
116
def send (self , msg , timeout = None ):
106
- self .interface .send (
107
- self .channel ,
108
- msg .arbitration_id ,
109
- bool (msg .is_extended_id ),
110
- bool (msg .is_remote_frame ),
111
- msg .dlc ,
112
- msg .data ,
113
- )
117
+ with error_check ("Cannot send message" ):
118
+ self .interface .send (
119
+ self .channel ,
120
+ msg .arbitration_id ,
121
+ bool (msg .is_extended_id ),
122
+ bool (msg .is_remote_frame ),
123
+ msg .dlc ,
124
+ msg .data ,
125
+ )
114
126
115
127
def shutdown (self ):
116
- self .interface .stop ()
128
+ with error_check ("Cannot shutdown interface" ):
129
+ self .interface .stop ()
117
130
118
131
119
132
def mock_recv (timeout ):
120
133
if timeout > 0 :
121
- frame = {}
122
- frame [ "id" ] = 0x123
123
- frame [ "extended" ] = False
124
- frame [ "timestamp" ] = time .time ()
125
- frame [ "loopback" ] = False
126
- frame [ "rtr" ] = False
127
- frame [ "dlc" ] = 8
128
- frame [ "data" ] = [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ]
129
- frame [ "channel" ] = 0
130
- return frame
134
+ return {
135
+ "id" : 0x123 ,
136
+ "extended" : False ,
137
+ "timestamp" : time .time (),
138
+ "loopback" : False ,
139
+ "rtr" : False ,
140
+ "dlc" : 8 ,
141
+ "data" : [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ],
142
+ "channel" : 0 ,
143
+ }
131
144
else :
132
145
# simulate timeout when timeout = 0
133
146
return None
@@ -144,7 +157,6 @@ class MockInterface:
144
157
set_bit_timing = Mock ()
145
158
set_enabled = Mock ()
146
159
set_monitor = Mock ()
147
- start = Mock ()
148
160
stop = Mock ()
149
161
send = Mock ()
150
162
channel_count = Mock (return_value = 1 )
0 commit comments