Skip to content

Commit c56ff40

Browse files
committed
check vector with mypy
1 parent 2807da1 commit c56ff40

File tree

4 files changed

+92
-63
lines changed

4 files changed

+92
-63
lines changed

can/interfaces/vector/canlib.py

Lines changed: 64 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,16 @@
1010
import logging
1111
import time
1212
import os
13-
from typing import List, NamedTuple, Optional, Tuple, Sequence, Union
13+
from types import ModuleType
14+
from typing import List, NamedTuple, Optional, Tuple, Sequence, Union, Any, Dict
1415

1516
try:
1617
# Try builtin Python 3 Windows API
1718
from _winapi import WaitForSingleObject, INFINITE
1819

1920
HAS_EVENTS = True
2021
except ImportError:
21-
try:
22-
# Try pywin32 package
23-
from win32event import WaitForSingleObject, INFINITE
24-
25-
HAS_EVENTS = True
26-
except ImportError:
27-
# Use polling instead
28-
HAS_EVENTS = False
22+
HAS_EVENTS = False
2923

3024
# Import Modules
3125
# ==============
@@ -36,7 +30,7 @@
3630
deprecated_args_alias,
3731
time_perfcounter_correlation,
3832
)
39-
from can.typechecking import AutoDetectedConfig, CanFilters
33+
from can.typechecking import AutoDetectedConfig, CanFilters, Channel
4034

4135
# Define Module Logger
4236
# ====================
@@ -48,7 +42,7 @@
4842
from . import xldefine, xlclass
4943

5044
# Import safely Vector API module for Travis tests
51-
xldriver = None
45+
xldriver: Optional[ModuleType] = None
5246
try:
5347
from . import xldriver
5448
except Exception as exc:
@@ -73,20 +67,20 @@ def __init__(
7367
channel: Union[int, Sequence[int], str],
7468
can_filters: Optional[CanFilters] = None,
7569
poll_interval: float = 0.01,
76-
receive_own_messages: bool = False,
77-
bitrate: int = None,
70+
receive_own_messages: Optional[bool] = False,
71+
bitrate: Optional[int] = None,
7872
rx_queue_size: int = 2 ** 14,
79-
app_name: str = "CANalyzer",
80-
serial: int = None,
81-
fd: bool = False,
82-
data_bitrate: int = None,
73+
app_name: Optional[str] = "CANalyzer",
74+
serial: Optional[int] = None,
75+
fd: Optional[bool] = False,
76+
data_bitrate: Optional[int] = None,
8377
sjw_abr: int = 2,
8478
tseg1_abr: int = 6,
8579
tseg2_abr: int = 3,
8680
sjw_dbr: int = 2,
8781
tseg1_dbr: int = 6,
8882
tseg2_dbr: int = 3,
89-
**kwargs,
83+
**kwargs: Any,
9084
) -> None:
9185
"""
9286
:param channel:
@@ -144,16 +138,18 @@ def __init__(
144138

145139
if xldriver is None:
146140
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
141+
self.xldriver = xldriver # keep reference so mypy knows it is not None
147142

148143
self.poll_interval = poll_interval
149144

150-
if isinstance(channel, str): # must be checked before generic Sequence
145+
self.channels: Sequence[int]
146+
if isinstance(channel, int):
147+
self.channels = [channel]
148+
elif isinstance(channel, str): # must be checked before generic Sequence
151149
# Assume comma separated string of channels
152150
self.channels = [int(ch.strip()) for ch in channel.split(",")]
153-
elif isinstance(channel, int):
154-
self.channels = [channel]
155151
elif isinstance(channel, Sequence):
156-
self.channels = channel
152+
self.channels = [int(ch) for ch in channel]
157153
else:
158154
raise TypeError(
159155
f"Invalid type for channels parameter: {type(channel).__name__}"
@@ -185,12 +181,12 @@ def __init__(
185181
"None of the configured channels could be found on the specified hardware."
186182
)
187183

188-
xldriver.xlOpenDriver()
184+
self.xldriver.xlOpenDriver()
189185
self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
190186
self.mask = 0
191187
self.fd = fd
192188
# Get channels masks
193-
self.channel_masks = {}
189+
self.channel_masks: Dict[Optional[Channel], int] = {}
194190
self.index_to_channel = {}
195191

196192
for channel in self.channels:
@@ -200,7 +196,7 @@ def __init__(
200196
app_name, channel
201197
)
202198
LOG.debug("Channel index %d found", channel)
203-
idx = xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
199+
idx = self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
204200
if idx < 0:
205201
# Undocumented behavior! See issue #353.
206202
# If hardware is unavailable, this function returns -1.
@@ -224,7 +220,7 @@ def __init__(
224220
if bitrate or fd:
225221
permission_mask.value = self.mask
226222
if fd:
227-
xldriver.xlOpenPort(
223+
self.xldriver.xlOpenPort(
228224
self.port_handle,
229225
self._app_name,
230226
self.mask,
@@ -234,7 +230,7 @@ def __init__(
234230
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
235231
)
236232
else:
237-
xldriver.xlOpenPort(
233+
self.xldriver.xlOpenPort(
238234
self.port_handle,
239235
self._app_name,
240236
self.mask,
@@ -267,7 +263,7 @@ def __init__(
267263
self.canFdConf.tseg1Dbr = int(tseg1_dbr)
268264
self.canFdConf.tseg2Dbr = int(tseg2_dbr)
269265

270-
xldriver.xlCanFdSetConfiguration(
266+
self.xldriver.xlCanFdSetConfiguration(
271267
self.port_handle, self.mask, self.canFdConf
272268
)
273269
LOG.info(
@@ -289,7 +285,7 @@ def __init__(
289285
)
290286
else:
291287
if bitrate:
292-
xldriver.xlCanSetChannelBitrate(
288+
self.xldriver.xlCanSetChannelBitrate(
293289
self.port_handle, permission_mask, bitrate
294290
)
295291
LOG.info("SetChannelBitrate: baudr.=%u", bitrate)
@@ -298,16 +294,16 @@ def __init__(
298294

299295
# Enable/disable TX receipts
300296
tx_receipts = 1 if receive_own_messages else 0
301-
xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0)
297+
self.xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0)
302298

303299
if HAS_EVENTS:
304300
self.event_handle = xlclass.XLhandle()
305-
xldriver.xlSetNotification(self.port_handle, self.event_handle, 1)
301+
self.xldriver.xlSetNotification(self.port_handle, self.event_handle, 1)
306302
else:
307303
LOG.info("Install pywin32 to avoid polling")
308304

309305
try:
310-
xldriver.xlActivateChannel(
306+
self.xldriver.xlActivateChannel(
311307
self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0
312308
)
313309
except VectorOperationError as error:
@@ -320,17 +316,17 @@ def __init__(
320316
if time.get_clock_info("time").resolution > 1e-5:
321317
ts, perfcounter = time_perfcounter_correlation()
322318
try:
323-
xldriver.xlGetSyncTime(self.port_handle, offset)
319+
self.xldriver.xlGetSyncTime(self.port_handle, offset)
324320
except VectorInitializationError:
325-
xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
321+
self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
326322
current_perfcounter = time.perf_counter()
327323
now = ts + (current_perfcounter - perfcounter)
328324
self._time_offset = now - offset.value * 1e-9
329325
else:
330326
try:
331-
xldriver.xlGetSyncTime(self.port_handle, offset)
327+
self.xldriver.xlGetSyncTime(self.port_handle, offset)
332328
except VectorInitializationError:
333-
xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
329+
self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
334330
self._time_offset = time.time() - offset.value * 1e-9
335331

336332
except VectorInitializationError:
@@ -339,7 +335,7 @@ def __init__(
339335
self._is_filtered = False
340336
super().__init__(channel=channel, can_filters=can_filters, **kwargs)
341337

342-
def _apply_filters(self, filters) -> None:
338+
def _apply_filters(self, filters: Optional[CanFilters]) -> None:
343339
if filters:
344340
# Only up to one filter per ID type allowed
345341
if len(filters) == 1 or (
@@ -348,7 +344,7 @@ def _apply_filters(self, filters) -> None:
348344
):
349345
try:
350346
for can_filter in filters:
351-
xldriver.xlCanSetChannelAcceptance(
347+
self.xldriver.xlCanSetChannelAcceptance(
352348
self.port_handle,
353349
self.mask,
354350
can_filter["can_id"],
@@ -370,14 +366,14 @@ def _apply_filters(self, filters) -> None:
370366
# fallback: reset filters
371367
self._is_filtered = False
372368
try:
373-
xldriver.xlCanSetChannelAcceptance(
369+
self.xldriver.xlCanSetChannelAcceptance(
374370
self.port_handle,
375371
self.mask,
376372
0x0,
377373
0x0,
378374
xldefine.XL_AcceptanceFilter.XL_CAN_EXT,
379375
)
380-
xldriver.xlCanSetChannelAcceptance(
376+
self.xldriver.xlCanSetChannelAcceptance(
381377
self.port_handle,
382378
self.mask,
383379
0x0,
@@ -417,14 +413,14 @@ def _recv_internal(
417413
else:
418414
time_left = end_time - time.time()
419415
time_left_ms = max(0, int(time_left * 1000))
420-
WaitForSingleObject(self.event_handle.value, time_left_ms)
416+
WaitForSingleObject(self.event_handle.value, time_left_ms) # type: ignore
421417
else:
422418
# Wait a short time until we try again
423419
time.sleep(self.poll_interval)
424420

425421
def _recv_canfd(self) -> Optional[Message]:
426422
xl_can_rx_event = xlclass.XLcanRxEvent()
427-
xldriver.xlCanReceive(self.port_handle, xl_can_rx_event)
423+
self.xldriver.xlCanReceive(self.port_handle, xl_can_rx_event)
428424

429425
if xl_can_rx_event.tag == xldefine.XL_CANFD_RX_EventTags.XL_CAN_EV_TAG_RX_OK:
430426
is_rx = True
@@ -470,7 +466,7 @@ def _recv_canfd(self) -> Optional[Message]:
470466
def _recv_can(self) -> Optional[Message]:
471467
xl_event = xlclass.XLevent()
472468
event_count = ctypes.c_uint(1)
473-
xldriver.xlReceive(self.port_handle, event_count, xl_event)
469+
self.xldriver.xlReceive(self.port_handle, event_count, xl_event)
474470

475471
if xl_event.tag != xldefine.XL_EventTags.XL_RECEIVE_MSG:
476472
self.handle_can_event(xl_event)
@@ -523,7 +519,7 @@ def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None:
523519
`XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag.
524520
"""
525521

526-
def send(self, msg: Message, timeout: Optional[float] = None):
522+
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
527523
self._send_sequence([msg])
528524

529525
def _send_sequence(self, msgs: Sequence[Message]) -> int:
@@ -548,7 +544,9 @@ def _send_can_msg_sequence(self, msgs: Sequence[Message]) -> int:
548544
*map(self._build_xl_event, msgs)
549545
)
550546

551-
xldriver.xlCanTransmit(self.port_handle, mask, message_count, xl_event_array)
547+
self.xldriver.xlCanTransmit(
548+
self.port_handle, mask, message_count, xl_event_array
549+
)
552550
return message_count.value
553551

554552
@staticmethod
@@ -580,7 +578,7 @@ def _send_can_fd_msg_sequence(self, msgs: Sequence[Message]) -> int:
580578
)
581579

582580
msg_count_sent = ctypes.c_uint(0)
583-
xldriver.xlCanTransmitEx(
581+
self.xldriver.xlCanTransmitEx(
584582
self.port_handle, mask, message_count, msg_count_sent, xl_can_tx_event_array
585583
)
586584
return msg_count_sent.value
@@ -611,17 +609,17 @@ def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent:
611609
return xl_can_tx_event
612610

613611
def flush_tx_buffer(self) -> None:
614-
xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask)
612+
self.xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask)
615613

616614
def shutdown(self) -> None:
617615
super().shutdown()
618-
xldriver.xlDeactivateChannel(self.port_handle, self.mask)
619-
xldriver.xlClosePort(self.port_handle)
620-
xldriver.xlCloseDriver()
616+
self.xldriver.xlDeactivateChannel(self.port_handle, self.mask)
617+
self.xldriver.xlClosePort(self.port_handle)
618+
self.xldriver.xlCloseDriver()
621619

622620
def reset(self) -> None:
623-
xldriver.xlDeactivateChannel(self.port_handle, self.mask)
624-
xldriver.xlActivateChannel(
621+
self.xldriver.xlDeactivateChannel(self.port_handle, self.mask)
622+
self.xldriver.xlActivateChannel(
625623
self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0
626624
)
627625

@@ -657,7 +655,7 @@ def _detect_available_configs() -> List[AutoDetectedConfig]:
657655
"vector_channel_config": channel_config,
658656
}
659657
)
660-
return configs
658+
return configs # type: ignore
661659

662660
@staticmethod
663661
def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None:
@@ -666,6 +664,9 @@ def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None:
666664
:param wait_for_finish:
667665
Time to wait for user input in milliseconds.
668666
"""
667+
if xldriver is None:
668+
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
669+
669670
xldriver.xlPopupHwConfig(ctypes.c_char_p(), ctypes.c_uint(wait_for_finish))
670671

671672
@staticmethod
@@ -685,14 +686,17 @@ def get_application_config(
685686
:raises can.interfaces.vector.VectorInitializationError:
686687
If the application name does not exist in the Vector hardware configuration.
687688
"""
689+
if xldriver is None:
690+
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
691+
688692
hw_type = ctypes.c_uint()
689693
hw_index = ctypes.c_uint()
690694
hw_channel = ctypes.c_uint()
691-
app_channel = ctypes.c_uint(app_channel)
695+
_app_channel = ctypes.c_uint(app_channel)
692696

693697
xldriver.xlGetApplConfig(
694698
app_name.encode(),
695-
app_channel,
699+
_app_channel,
696700
hw_type,
697701
hw_index,
698702
hw_channel,
@@ -707,7 +711,7 @@ def set_application_config(
707711
hw_type: xldefine.XL_HardwareType,
708712
hw_index: int,
709713
hw_channel: int,
710-
**kwargs,
714+
**kwargs: Any,
711715
) -> None:
712716
"""Modify the application settings in Vector Hardware Configuration.
713717
@@ -737,6 +741,9 @@ def set_application_config(
737741
:raises can.interfaces.vector.VectorInitializationError:
738742
If the application name does not exist in the Vector hardware configuration.
739743
"""
744+
if xldriver is None:
745+
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
746+
740747
xldriver.xlSetApplConfig(
741748
app_name.encode(),
742749
app_channel,
@@ -758,7 +765,7 @@ def set_timer_rate(self, timer_rate_ms: int) -> None:
758765
the timer events.
759766
"""
760767
timer_rate_10us = timer_rate_ms * 100
761-
xldriver.xlSetTimerRate(self.port_handle, timer_rate_10us)
768+
self.xldriver.xlSetTimerRate(self.port_handle, timer_rate_10us)
762769

763770

764771
class VectorChannelConfig(NamedTuple):

can/interfaces/vector/xldriver.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# type: ignore
12
"""
23
Ctypes wrapper module for Vector CAN Interface on win32/win64 systems.
34

0 commit comments

Comments
 (0)