diff --git a/can/__init__.py b/can/__init__.py index e8d7850ca..a0d1aab9e 100644 --- a/can/__init__.py +++ b/can/__init__.py @@ -25,6 +25,7 @@ "CanInitializationError", "CanInterfaceNotImplementedError", "CanOperationError", + "CanProtocol", "CanTimeoutError", "CanutilsLogReader", "CanutilsLogWriter", @@ -88,7 +89,7 @@ ModifiableCyclicTaskABC, RestartableCyclicTaskABC, ) -from .bus import BusABC, BusState +from .bus import BusABC, BusState, CanProtocol from .exceptions import ( CanError, CanInitializationError, diff --git a/can/bus.py b/can/bus.py index 5bd2d4e30..b7a54dbb1 100644 --- a/can/bus.py +++ b/can/bus.py @@ -26,6 +26,14 @@ class BusState(Enum): ERROR = auto() +class CanProtocol(Enum): + """The CAN protocol type supported by a :class:`can.BusABC` instance""" + + CAN_20 = auto() + CAN_FD = auto() + CAN_XL = auto() + + class BusABC(metaclass=ABCMeta): """The CAN Bus Abstract Base Class that serves as the basis for all concrete interfaces. @@ -44,6 +52,7 @@ class BusABC(metaclass=ABCMeta): RECV_LOGGING_LEVEL = 9 _is_shutdown: bool = False + _can_protocol: CanProtocol = CanProtocol.CAN_20 @abstractmethod def __init__( @@ -459,6 +468,15 @@ def state(self, new_state: BusState) -> None: """ raise NotImplementedError("Property is not implemented.") + @property + def protocol(self) -> CanProtocol: + """Return the CAN protocol used by this bus instance. + + This value is set at initialization time and does not change + during the lifetime of a bus instance. + """ + return self._can_protocol + @staticmethod def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: """Detect all configurations/channels that this interface could diff --git a/can/interfaces/canalystii.py b/can/interfaces/canalystii.py index dd65f5ba0..2fef19497 100644 --- a/can/interfaces/canalystii.py +++ b/can/interfaces/canalystii.py @@ -6,7 +6,7 @@ import canalystii as driver -from can import BitTiming, BitTimingFd, BusABC, Message +from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message from can.exceptions import CanTimeoutError from can.typechecking import CanFilters from can.util import check_or_adjust_timing_clock, deprecated_args_alias @@ -54,8 +54,11 @@ def __init__( raise ValueError("Either bitrate or timing argument is required") # Do this after the error handling - super().__init__(channel=channel, can_filters=can_filters, **kwargs) - + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) if isinstance(channel, str): # Assume comma separated string of channels self.channels = [int(ch.strip()) for ch in channel.split(",")] @@ -64,11 +67,11 @@ def __init__( else: # Sequence[int] self.channels = list(channel) - self.rx_queue: Deque[Tuple[int, driver.Message]] = deque(maxlen=rx_queue_size) - self.channel_info = f"CANalyst-II: device {device}, channels {self.channels}" - + self.rx_queue: Deque[Tuple[int, driver.Message]] = deque(maxlen=rx_queue_size) self.device = driver.CanalystDevice(device_index=device) + self._can_protocol = CanProtocol.CAN_20 + for single_channel in self.channels: if isinstance(timing, BitTiming): timing = check_or_adjust_timing_clock(timing, valid_clocks=[8_000_000]) diff --git a/can/interfaces/cantact.py b/can/interfaces/cantact.py index 75e1adbaa..963a9ee3b 100644 --- a/can/interfaces/cantact.py +++ b/can/interfaces/cantact.py @@ -7,7 +7,7 @@ from typing import Any, Optional, Union from unittest.mock import Mock -from can import BitTiming, BitTimingFd, BusABC, Message +from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message from ..exceptions import ( CanInitializationError, @@ -87,6 +87,7 @@ def __init__( self.channel = int(channel) self.channel_info = f"CANtact: ch:{channel}" + self._can_protocol = CanProtocol.CAN_20 # Configure the interface with error_check("Cannot setup the cantact.Interface", CanInitializationError): @@ -114,7 +115,10 @@ def __init__( self.interface.start() super().__init__( - channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs + channel=channel, + bitrate=bitrate, + poll_interval=poll_interval, + **kwargs, ) def _recv_internal(self, timeout): diff --git a/can/interfaces/etas/__init__.py b/can/interfaces/etas/__init__.py index 5f768b3e5..62e060f48 100644 --- a/can/interfaces/etas/__init__.py +++ b/can/interfaces/etas/__init__.py @@ -1,4 +1,3 @@ -import ctypes import time from typing import Dict, List, Optional, Tuple @@ -17,9 +16,12 @@ def __init__( bitrate: int = 1000000, fd: bool = True, data_bitrate: int = 2000000, - **kwargs: object, + **kwargs: Dict[str, any], ): + super().__init__(channel=channel, **kwargs) + self.receive_own_messages = receive_own_messages + self._can_protocol = can.CanProtocol.CAN_FD if fd else can.CanProtocol.CAN_20 nodeRange = CSI_NodeRange(CSI_NODE_MIN, CSI_NODE_MAX) self.tree = ctypes.POINTER(CSI_Tree)() @@ -297,7 +299,7 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: tree = ctypes.POINTER(CSI_Tree)() CSI_CreateProtocolTree(ctypes.c_char_p(b""), nodeRange, ctypes.byref(tree)) - nodes: Dict[str, str] = [] + nodes: List[Dict[str, str]] = [] def _findNodes(tree, prefix): uri = f"{prefix}/{tree.contents.item.uriName.decode()}" diff --git a/can/interfaces/gs_usb.py b/can/interfaces/gs_usb.py index fb5ce1d80..32ad54e75 100644 --- a/can/interfaces/gs_usb.py +++ b/can/interfaces/gs_usb.py @@ -52,11 +52,16 @@ def __init__( self.gs_usb = gs_usb self.channel_info = channel + self._can_protocol = can.CanProtocol.CAN_20 self.gs_usb.set_bitrate(bitrate) self.gs_usb.start() - super().__init__(channel=channel, can_filters=can_filters, **kwargs) + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) def send(self, msg: can.Message, timeout: Optional[float] = None): """Transmit a message to the CAN bus. diff --git a/can/interfaces/ics_neovi/neovi_bus.py b/can/interfaces/ics_neovi/neovi_bus.py index 848cefcc8..f2dffe0a6 100644 --- a/can/interfaces/ics_neovi/neovi_bus.py +++ b/can/interfaces/ics_neovi/neovi_bus.py @@ -16,7 +16,7 @@ from threading import Event from warnings import warn -from can import BusABC, Message +from can import BusABC, CanProtocol, Message from ...exceptions import ( CanError, @@ -169,7 +169,11 @@ def __init__(self, channel, can_filters=None, **kwargs): if ics is None: raise ImportError("Please install python-ics") - super().__init__(channel=channel, can_filters=can_filters, **kwargs) + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) logger.info(f"CAN Filters: {can_filters}") logger.info(f"Got configuration of: {kwargs}") @@ -190,6 +194,9 @@ def __init__(self, channel, can_filters=None, **kwargs): serial = kwargs.get("serial") self.dev = self._find_device(type_filter, serial) + is_fd = kwargs.get("fd", False) + self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 + with open_lock: ics.open_device(self.dev) @@ -198,7 +205,7 @@ def __init__(self, channel, can_filters=None, **kwargs): for channel in self.channels: ics.set_bit_rate(self.dev, kwargs.get("bitrate"), channel) - if kwargs.get("fd", False): + if is_fd: if "data_bitrate" in kwargs: for channel in self.channels: ics.set_fd_bit_rate( diff --git a/can/interfaces/iscan.py b/can/interfaces/iscan.py index ef3a48215..be0b0dae8 100644 --- a/can/interfaces/iscan.py +++ b/can/interfaces/iscan.py @@ -13,6 +13,7 @@ CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, + CanProtocol, Message, ) @@ -99,6 +100,7 @@ def __init__( self.channel = ctypes.c_ubyte(int(channel)) self.channel_info = f"IS-CAN: {self.channel}" + self._can_protocol = CanProtocol.CAN_20 if bitrate not in self.BAUDRATES: raise ValueError(f"Invalid bitrate, choose one of {set(self.BAUDRATES)}") @@ -107,7 +109,10 @@ def __init__( iscan.isCAN_DeviceInitEx(self.channel, self.BAUDRATES[bitrate]) super().__init__( - channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs + channel=channel, + bitrate=bitrate, + poll_interval=poll_interval, + **kwargs, ) def _recv_internal( diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index 3db719f96..b28c93541 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -131,6 +131,9 @@ def __init__( **kwargs ) + super().__init__(channel=channel, **kwargs) + self._can_protocol = self.bus.protocol + def flush_tx_buffer(self): """Flushes the transmit buffer on the IXXAT""" return self.bus.flush_tx_buffer() diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py index 550484f3e..5a366cc30 100644 --- a/can/interfaces/ixxat/canlib_vcinpl.py +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -15,7 +15,7 @@ import sys from typing import Callable, Optional, Tuple -from can import BusABC, Message +from can import BusABC, CanProtocol, Message from can.broadcastmanager import ( LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC, @@ -490,6 +490,7 @@ def __init__( self._channel_capabilities = structures.CANCAPABILITIES() self._message = structures.CANMSG() self._payload = (ctypes.c_byte * 8)() + self._can_protocol = CanProtocol.CAN_20 # Search for supplied device if unique_hardware_id is None: diff --git a/can/interfaces/ixxat/canlib_vcinpl2.py b/can/interfaces/ixxat/canlib_vcinpl2.py index 3e7f2ff91..446b3e35c 100644 --- a/can/interfaces/ixxat/canlib_vcinpl2.py +++ b/can/interfaces/ixxat/canlib_vcinpl2.py @@ -15,8 +15,7 @@ import sys from typing import Callable, Optional, Tuple -import can.util -from can import BusABC, Message +from can import BusABC, CanProtocol, Message from can.broadcastmanager import ( LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC, @@ -24,7 +23,7 @@ from can.ctypesutil import HANDLE, PHANDLE, CLibrary from can.ctypesutil import HRESULT as ctypes_HRESULT from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError -from can.util import deprecated_args_alias +from can.util import deprecated_args_alias, dlc2len, len2dlc from . import constants, structures from .exceptions import * @@ -536,6 +535,7 @@ def __init__( self._channel_capabilities = structures.CANCAPABILITIES2() self._message = structures.CANMSG2() self._payload = (ctypes.c_byte * 64)() + self._can_protocol = CanProtocol.CAN_FD # Search for supplied device if unique_hardware_id is None: @@ -865,7 +865,7 @@ def _recv_internal(self, timeout): # Timed out / can message type is not DATA return None, True - data_len = can.util.dlc2len(self._message.uMsgInfo.Bits.dlc) + data_len = dlc2len(self._message.uMsgInfo.Bits.dlc) # The _message.dwTime is a 32bit tick value and will overrun, # so expect to see the value restarting from 0 rx_msg = Message( @@ -915,7 +915,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: message.uMsgInfo.Bits.edl = 1 if msg.is_fd else 0 message.dwMsgId = msg.arbitration_id if msg.dlc: # this dlc means number of bytes of payload - message.uMsgInfo.Bits.dlc = can.util.len2dlc(msg.dlc) + message.uMsgInfo.Bits.dlc = len2dlc(msg.dlc) data_len_dif = msg.dlc - len(msg.data) data = msg.data + bytearray( [0] * data_len_dif diff --git a/can/interfaces/kvaser/canlib.py b/can/interfaces/kvaser/canlib.py index f6b92ccef..32d28059a 100644 --- a/can/interfaces/kvaser/canlib.py +++ b/can/interfaces/kvaser/canlib.py @@ -11,7 +11,7 @@ import sys import time -from can import BusABC, Message +from can import BusABC, CanProtocol, Message from can.util import time_perfcounter_correlation from ...exceptions import CanError, CanInitializationError, CanOperationError @@ -428,11 +428,12 @@ def __init__(self, channel, can_filters=None, **kwargs): channel = int(channel) except ValueError: raise ValueError("channel must be an integer") - self.channel = channel - log.debug("Initialising bus instance") + self.channel = channel self.single_handle = single_handle + self._can_protocol = CanProtocol.CAN_FD if fd else CanProtocol.CAN_20 + log.debug("Initialising bus instance") num_channels = ctypes.c_int(0) canGetNumberOfChannels(ctypes.byref(num_channels)) num_channels = int(num_channels.value) @@ -520,7 +521,11 @@ def __init__(self, channel, can_filters=None, **kwargs): self._timestamp_offset = time.time() - (timer.value * TIMESTAMP_FACTOR) self._is_filtered = False - super().__init__(channel=channel, can_filters=can_filters, **kwargs) + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) def _apply_filters(self, filters): if filters and len(filters) == 1: diff --git a/can/interfaces/neousys/neousys.py b/can/interfaces/neousys/neousys.py index d57234ddd..b7dd2117c 100644 --- a/can/interfaces/neousys/neousys.py +++ b/can/interfaces/neousys/neousys.py @@ -34,12 +34,13 @@ except ImportError: from ctypes import CDLL -from can import BusABC, Message - -from ...exceptions import ( +from can import ( + BusABC, CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, + CanProtocol, + Message, ) logger = logging.getLogger(__name__) @@ -150,6 +151,7 @@ def __init__(self, channel, device=0, bitrate=500000, **kwargs): self.channel = channel self.device = device self.channel_info = f"Neousys Can: device {self.device}, channel {self.channel}" + self._can_protocol = CanProtocol.CAN_20 self.queue = queue.Queue() diff --git a/can/interfaces/nican.py b/can/interfaces/nican.py index 8457a1a38..f4b1a37f0 100644 --- a/can/interfaces/nican.py +++ b/can/interfaces/nican.py @@ -19,13 +19,14 @@ from typing import Optional, Tuple, Type import can.typechecking -from can import BusABC, Message - -from ..exceptions import ( +from can import ( + BusABC, CanError, CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, + CanProtocol, + Message, ) logger = logging.getLogger(__name__) @@ -219,6 +220,7 @@ def __init__( self.channel = channel self.channel_info = f"NI-CAN: {channel}" + self._can_protocol = CanProtocol.CAN_20 channel_bytes = channel.encode("ascii") config = [(NC_ATTR_START_ON_OPEN, True), (NC_ATTR_LOG_COMM_ERRS, log_errors)] diff --git a/can/interfaces/nixnet.py b/can/interfaces/nixnet.py index 4ddb52455..ba665442e 100644 --- a/can/interfaces/nixnet.py +++ b/can/interfaces/nixnet.py @@ -11,12 +11,13 @@ import logging import os import time +import warnings from queue import SimpleQueue from types import ModuleType from typing import Any, List, Optional, Tuple, Union import can.typechecking -from can import BitTiming, BitTimingFd, BusABC, Message +from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message from can.exceptions import ( CanInitializationError, CanInterfaceNotImplementedError, @@ -103,10 +104,11 @@ def __init__( self.poll_interval = poll_interval - self.fd = isinstance(timing, BitTimingFd) if timing else fd + is_fd = isinstance(timing, BitTimingFd) if timing else fd + self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 # Set database for the initialization - database_name = ":can_fd_brs:" if self.fd else ":memory:" + database_name = ":can_fd_brs:" if is_fd else ":memory:" try: # We need two sessions for this application, @@ -158,7 +160,7 @@ def __init__( if bitrate: self._interface.baud_rate = bitrate - if self.fd: + if is_fd: # See page 951 of NI-XNET Hardware and Software Manual # to set custom can configuration self._interface.can_fd_baud_rate = fd_bitrate or bitrate @@ -188,6 +190,15 @@ def __init__( **kwargs, ) + @property + def fd(self) -> bool: + warnings.warn( + "The NiXNETcanBus.fd property is deprecated and superseded by " + "BusABC.protocol. It is scheduled for removal in version 5.0.", + DeprecationWarning, + ) + return self._can_protocol is CanProtocol.CAN_FD + def _recv_internal( self, timeout: Optional[float] ) -> Tuple[Optional[Message], bool]: diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index 61d19d1b4..a9b2c016b 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -4,6 +4,7 @@ import logging import platform import time +import warnings from datetime import datetime from typing import Any, List, Optional, Tuple, Union @@ -17,6 +18,7 @@ CanError, CanInitializationError, CanOperationError, + CanProtocol, Message, ) from can.util import check_or_adjust_timing_clock, dlc2len, len2dlc @@ -244,8 +246,9 @@ def __init__( err_msg = f"Cannot find a channel with ID {device_id:08x}" raise ValueError(err_msg) + is_fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False) + self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 self.channel_info = str(channel) - self.fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False) hwtype = PCAN_TYPE_ISA ioport = 0x02A0 @@ -269,7 +272,7 @@ def __init__( result = self.m_objPCANBasic.Initialize( self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt ) - elif self.fd: + elif is_fd: if isinstance(timing, BitTimingFd): timing = check_or_adjust_timing_clock( timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True) @@ -336,7 +339,12 @@ def __init__( if result != PCAN_ERROR_OK: raise PcanCanInitializationError(self._get_formatted_error(result)) - super().__init__(channel=channel, state=state, bitrate=bitrate, **kwargs) + super().__init__( + channel=channel, + state=state, + bitrate=bitrate, + **kwargs, + ) def _find_channel_by_dev_id(self, device_id): """ @@ -482,7 +490,7 @@ def _recv_internal( end_time = time.time() + timeout if timeout is not None else None while True: - if self.fd: + if self._can_protocol is CanProtocol.CAN_FD: result, pcan_msg, pcan_timestamp = self.m_objPCANBasic.ReadFD( self.m_PcanHandle ) @@ -544,7 +552,7 @@ def _recv_internal( error_state_indicator = bool(pcan_msg.MSGTYPE & PCAN_MESSAGE_ESI.value) is_error_frame = bool(pcan_msg.MSGTYPE & PCAN_MESSAGE_ERRFRAME.value) - if self.fd: + if self._can_protocol is CanProtocol.CAN_FD: dlc = dlc2len(pcan_msg.DLC) timestamp = boottimeEpoch + (pcan_timestamp.value / (1000.0 * 1000.0)) else: @@ -590,7 +598,7 @@ def send(self, msg, timeout=None): if msg.error_state_indicator: msgType |= PCAN_MESSAGE_ESI.value - if self.fd: + if self._can_protocol is CanProtocol.CAN_FD: # create a TPCANMsg message structure CANMsg = TPCANMsgFD() @@ -649,6 +657,15 @@ def shutdown(self): self.m_objPCANBasic.Uninitialize(self.m_PcanHandle) + @property + def fd(self) -> bool: + warnings.warn( + "The PcanBus.fd property is deprecated and superseded by BusABC.protocol. " + "It is scheduled for removal in version 5.0.", + DeprecationWarning, + ) + return self._can_protocol is CanProtocol.CAN_FD + @property def state(self): return self._state diff --git a/can/interfaces/robotell.py b/can/interfaces/robotell.py index 4d038a38a..bfe8f5774 100644 --- a/can/interfaces/robotell.py +++ b/can/interfaces/robotell.py @@ -7,7 +7,7 @@ import time from typing import Optional -from can import BusABC, Message +from can import BusABC, CanProtocol, Message from ..exceptions import CanInterfaceNotImplementedError, CanOperationError @@ -92,6 +92,7 @@ def __init__( if bitrate is not None: self.set_bitrate(bitrate) + self._can_protocol = CanProtocol.CAN_20 self.channel_info = ( f"Robotell USB-CAN s/n {self.get_serial_number(1)} on {channel}" ) diff --git a/can/interfaces/seeedstudio/seeedstudio.py b/can/interfaces/seeedstudio/seeedstudio.py index 7d7a1e687..0540c78be 100644 --- a/can/interfaces/seeedstudio/seeedstudio.py +++ b/can/interfaces/seeedstudio/seeedstudio.py @@ -12,7 +12,7 @@ from time import time import can -from can import BusABC, Message +from can import BusABC, CanProtocol, Message logger = logging.getLogger("seeedbus") @@ -100,6 +100,8 @@ def __init__( self.op_mode = operation_mode self.filter_id = bytearray([0x00, 0x00, 0x00, 0x00]) self.mask_id = bytearray([0x00, 0x00, 0x00, 0x00]) + self._can_protocol = CanProtocol.CAN_20 + if not channel: raise can.CanInitializationError("Must specify a serial port.") diff --git a/can/interfaces/serial/serial_can.py b/can/interfaces/serial/serial_can.py index eb336feba..9de2da99c 100644 --- a/can/interfaces/serial/serial_can.py +++ b/can/interfaces/serial/serial_can.py @@ -17,6 +17,7 @@ CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, + CanProtocol, CanTimeoutError, Message, ) @@ -88,6 +89,7 @@ def __init__( raise TypeError("Must specify a serial port.") self.channel_info = f"Serial interface: {channel}" + self._can_protocol = CanProtocol.CAN_20 try: self._ser = serial.serial_for_url( diff --git a/can/interfaces/slcan.py b/can/interfaces/slcan.py index 21306f28f..7ff12ce44 100644 --- a/can/interfaces/slcan.py +++ b/can/interfaces/slcan.py @@ -7,7 +7,7 @@ import time from typing import Any, Optional, Tuple -from can import BusABC, Message, typechecking +from can import BusABC, CanProtocol, Message, typechecking from ..exceptions import ( CanInitializationError, @@ -105,6 +105,7 @@ def __init__( ) self._buffer = bytearray() + self._can_protocol = CanProtocol.CAN_20 time.sleep(sleep_after_open) @@ -118,7 +119,11 @@ def __init__( self.open() super().__init__( - channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs + channel, + ttyBaudrate=115200, + bitrate=None, + rtscts=False, + **kwargs, ) def set_bitrate(self, bitrate: int) -> None: diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index bdf39f0ab..a3f74bb82 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -30,7 +30,7 @@ import can -from can import BusABC, Message +from can import BusABC, CanProtocol, Message from can.broadcastmanager import ( LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, @@ -656,6 +656,7 @@ def __init__( self._is_filtered = False self._task_id = 0 self._task_id_guard = threading.Lock() + self._can_protocol = CanProtocol.CAN_FD if fd else CanProtocol.CAN_20 # set the local_loopback parameter try: @@ -710,7 +711,11 @@ def __init__( "local_loopback": local_loopback, } ) - super().__init__(channel=channel, can_filters=can_filters, **kwargs) + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) def shutdown(self) -> None: """Stops all active periodic tasks and closes the socket.""" diff --git a/can/interfaces/socketcand/socketcand.py b/can/interfaces/socketcand/socketcand.py index 0c6c06ccf..183a9ba12 100644 --- a/can/interfaces/socketcand/socketcand.py +++ b/can/interfaces/socketcand/socketcand.py @@ -93,7 +93,7 @@ def __init__(self, channel, host, port, can_filters=None, **kwargs): self._expect_msg("< ok >") self._tcp_send(f"< rawmode >") self._expect_msg("< ok >") - super().__init__(channel=channel, can_filters=can_filters) + super().__init__(channel=channel, can_filters=can_filters, **kwargs) def _recv_internal(self, timeout): if len(self.__message_buffer) != 0: diff --git a/can/interfaces/systec/ucanbus.py b/can/interfaces/systec/ucanbus.py index da05b38b1..cb0dcc39e 100644 --- a/can/interfaces/systec/ucanbus.py +++ b/can/interfaces/systec/ucanbus.py @@ -1,9 +1,16 @@ import logging from threading import Event -from can import BusABC, BusState, Message +from can import ( + BusABC, + BusState, + CanError, + CanInitializationError, + CanOperationError, + CanProtocol, + Message, +) -from ...exceptions import CanError, CanInitializationError, CanOperationError from .constants import * from .exceptions import UcanException from .structures import * @@ -104,6 +111,8 @@ def __init__(self, channel, can_filters=None, **kwargs): ) from exception self.channel = int(channel) + self._can_protocol = CanProtocol.CAN_20 + device_number = int(kwargs.get("device_number", ANY_MODULE)) # configuration options @@ -145,7 +154,11 @@ def __init__(self, channel, can_filters=None, **kwargs): self._is_filtered = False - super().__init__(channel=channel, can_filters=can_filters, **kwargs) + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) def _recv_internal(self, timeout): try: diff --git a/can/interfaces/udp_multicast/bus.py b/can/interfaces/udp_multicast/bus.py index 00cbd32c8..089b8182f 100644 --- a/can/interfaces/udp_multicast/bus.py +++ b/can/interfaces/udp_multicast/bus.py @@ -3,21 +3,23 @@ import select import socket import struct +import warnings +from typing import List, Optional, Tuple, Union + +import can +from can import BusABC, CanProtocol +from can.typechecking import AutoDetectedConfig + +from .utils import check_msgpack_installed, pack_message, unpack_message try: from fcntl import ioctl except ModuleNotFoundError: # Missing on Windows pass -from typing import List, Optional, Tuple, Union log = logging.getLogger(__name__) -import can -from can import BusABC -from can.typechecking import AutoDetectedConfig - -from .utils import check_msgpack_installed, pack_message, unpack_message # see socket.getaddrinfo() IPv4_ADDRESS_INFO = Tuple[str, int] # address, port @@ -102,10 +104,22 @@ def __init__( "receiving own messages is not yet implemented" ) - super().__init__(channel, **kwargs) + super().__init__( + channel, + **kwargs, + ) - self.is_fd = fd self._multicast = GeneralPurposeUdpMulticastBus(channel, port, hop_limit) + self._can_protocol = CanProtocol.CAN_FD if fd else CanProtocol.CAN_20 + + @property + def is_fd(self) -> bool: + warnings.warn( + "The UdpMulticastBus.is_fd property is deprecated and superseded by " + "BusABC.protocol. It is scheduled for removal in version 5.0.", + DeprecationWarning, + ) + return self._can_protocol is CanProtocol.CAN_FD def _recv_internal(self, timeout: Optional[float]): result = self._multicast.recv(timeout) @@ -122,13 +136,13 @@ def _recv_internal(self, timeout: Optional[float]): "could not unpack received message" ) from exception - if not self.is_fd and can_message.is_fd: + if self._can_protocol is not CanProtocol.CAN_FD and can_message.is_fd: return None, False return can_message, False def send(self, msg: can.Message, timeout: Optional[float] = None) -> None: - if not self.is_fd and msg.is_fd: + if self._can_protocol is not CanProtocol.CAN_FD and msg.is_fd: raise can.CanOperationError( "cannot send FD message over bus with CAN FD disabled" ) diff --git a/can/interfaces/usb2can/usb2canInterface.py b/can/interfaces/usb2can/usb2canInterface.py index 2c0a0d00f..c89e394df 100644 --- a/can/interfaces/usb2can/usb2canInterface.py +++ b/can/interfaces/usb2can/usb2canInterface.py @@ -6,7 +6,7 @@ from ctypes import byref from typing import Optional -from can import BusABC, CanInitializationError, CanOperationError, Message +from can import BusABC, CanInitializationError, CanOperationError, CanProtocol, Message from .serial_selector import find_serial_devices from .usb2canabstractionlayer import ( @@ -118,6 +118,7 @@ def __init__( baudrate = min(int(bitrate // 1000), 1000) self.channel_info = f"USB2CAN device {device_id}" + self._can_protocol = CanProtocol.CAN_20 connector = f"{device_id}; {baudrate}" self.handle = self.can.open(connector, flags) diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 03a3d9b1f..f852fb0ec 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -11,6 +11,7 @@ import logging import os import time +import warnings from types import ModuleType from typing import ( Any, @@ -44,6 +45,7 @@ BusABC, CanInitializationError, CanInterfaceNotImplementedError, + CanProtocol, Message, ) from can.typechecking import AutoDetectedConfig, CanFilters @@ -202,11 +204,12 @@ def __init__( ) channel_configs = get_channel_configs() + is_fd = isinstance(timing, BitTimingFd) if timing else fd self.mask = 0 - self.fd = isinstance(timing, BitTimingFd) if timing else fd self.channel_masks: Dict[int, int] = {} self.index_to_channel: Dict[int, int] = {} + self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 for channel in self.channels: channel_index = self._find_global_channel_idx( @@ -229,7 +232,7 @@ def __init__( interface_version = ( xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4 - if self.fd + if is_fd else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION ) @@ -324,7 +327,20 @@ def __init__( self._time_offset = 0.0 self._is_filtered = False - super().__init__(channel=channel, can_filters=can_filters, **kwargs) + super().__init__( + channel=channel, + can_filters=can_filters, + **kwargs, + ) + + @property + def fd(self) -> bool: + warnings.warn( + "The VectorBus.fd property is deprecated and superseded by " + "BusABC.protocol. It is scheduled for removal in version 5.0.", + DeprecationWarning, + ) + return self._can_protocol is CanProtocol.CAN_FD def _find_global_channel_idx( self, @@ -647,7 +663,7 @@ def _recv_internal( while True: try: - if self.fd: + if self._can_protocol is CanProtocol.CAN_FD: msg = self._recv_canfd() else: msg = self._recv_can() @@ -781,7 +797,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: def _send_sequence(self, msgs: Sequence[Message]) -> int: """Send messages and return number of successful transmissions.""" - if self.fd: + if self._can_protocol is CanProtocol.CAN_FD: return self._send_can_fd_msg_sequence(msgs) else: return self._send_can_msg_sequence(msgs) diff --git a/can/interfaces/virtual.py b/can/interfaces/virtual.py index 3eaefc230..62ad0cfe3 100644 --- a/can/interfaces/virtual.py +++ b/can/interfaces/virtual.py @@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from can import CanOperationError -from can.bus import BusABC +from can.bus import BusABC, CanProtocol from can.message import Message from can.typechecking import AutoDetectedConfig @@ -33,7 +33,8 @@ class VirtualBus(BusABC): """ - A virtual CAN bus using an internal message queue. It can be used for example for testing. + A virtual CAN bus using an internal message queue. It can be used for + example for testing. In this interface, a channel is an arbitrary object used as an identifier for connected buses. @@ -48,9 +49,11 @@ class VirtualBus(BusABC): if a message is sent to 5 receivers with the timeout set to 1.0. .. warning:: - This interface guarantees reliable delivery and message ordering, but does *not* implement rate - limiting or ID arbitration/prioritization under high loads. Please refer to the section - :ref:`virtual_interfaces_doc` for more information on this and a comparison to alternatives. + This interface guarantees reliable delivery and message ordering, but + does *not* implement rate limiting or ID arbitration/prioritization + under high loads. Please refer to the section + :ref:`virtual_interfaces_doc` for more information on this and a + comparison to alternatives. """ def __init__( @@ -59,14 +62,45 @@ def __init__( receive_own_messages: bool = False, rx_queue_size: int = 0, preserve_timestamps: bool = False, + protocol: CanProtocol = CanProtocol.CAN_20, **kwargs: Any, ) -> None: + """ + The constructed instance has access to the bus identified by the + channel parameter. It is able to see all messages transmitted on the + bus by virtual instances constructed with the same channel identifier. + + :param channel: The channel identifier. This parameter can be an + arbitrary value. The bus instance will be able to see messages + from other virtual bus instances that were created with the same + value. + :param receive_own_messages: If set to True, sent messages will be + reflected back on the input queue. + :param rx_queue_size: The size of the reception queue. The reception + queue stores messages until they are read. If the queue reaches + its capacity, it will start dropping the oldest messages to make + room for new ones. If set to 0, the queue has an infinite capacity. + Be aware that this can cause memory leaks if messages are read + with a lower frequency than they arrive on the bus. + :param preserve_timestamps: If set to True, messages transmitted via + :func:`~can.BusABC.send` will keep the timestamp set in the + :class:`~can.Message` instance. Otherwise, the timestamp value + will be replaced with the current system time. + :param protocol: The protocol implemented by this bus instance. The + value does not affect the operation of the bus instance and can + be set to an arbitrary value for testing purposes. + :param kwargs: Additional keyword arguments passed to the parent + constructor. + """ super().__init__( - channel=channel, receive_own_messages=receive_own_messages, **kwargs + channel=channel, + receive_own_messages=receive_own_messages, + **kwargs, ) # the channel identifier may be an arbitrary object self.channel_id = channel + self._can_protocol = protocol self.channel_info = f"Virtual bus channel {self.channel_id}" self.receive_own_messages = receive_own_messages self.preserve_timestamps = preserve_timestamps diff --git a/doc/bus.rst b/doc/bus.rst index 51ed0220b..822397140 100644 --- a/doc/bus.rst +++ b/doc/bus.rst @@ -87,6 +87,10 @@ Bus API :members: :undoc-members: +.. autoclass:: can.bus.CanProtocol + :members: + :undoc-members: + Thread safe bus ''''''''''''''' diff --git a/test/serial_test.py b/test/serial_test.py index d020d7232..5fa90704b 100644 --- a/test/serial_test.py +++ b/test/serial_test.py @@ -50,6 +50,9 @@ def __init__(self): self, allowed_timestamp_delta=None, preserves_channel=True ) + def test_can_protocol(self): + self.assertEqual(self.bus.protocol, can.CanProtocol.CAN_20) + def test_rx_tx_min_max_data(self): """ Tests the transfer from 0x00 to 0xFF for a 1 byte payload diff --git a/test/test_cantact.py b/test/test_cantact.py index 2cc3e479c..f90655ae5 100644 --- a/test/test_cantact.py +++ b/test/test_cantact.py @@ -14,6 +14,8 @@ class CantactTest(unittest.TestCase): def test_bus_creation(self): bus = can.Bus(channel=0, interface="cantact", _testing=True) self.assertIsInstance(bus, cantact.CantactBus) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_20) + cantact.MockInterface.set_bitrate.assert_called() cantact.MockInterface.set_bit_timing.assert_not_called() cantact.MockInterface.set_enabled.assert_called() @@ -25,7 +27,10 @@ def test_bus_creation_bittiming(self): bt = can.BitTiming(f_clock=24_000_000, brp=3, tseg1=13, tseg2=2, sjw=1) bus = can.Bus(channel=0, interface="cantact", timing=bt, _testing=True) + self.assertIsInstance(bus, cantact.CantactBus) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_20) + cantact.MockInterface.set_bitrate.assert_not_called() cantact.MockInterface.set_bit_timing.assert_called() cantact.MockInterface.set_enabled.assert_called() diff --git a/test/test_interface_canalystii.py b/test/test_interface_canalystii.py index 0a87f40f9..65d9ee74b 100755 --- a/test/test_interface_canalystii.py +++ b/test/test_interface_canalystii.py @@ -22,6 +22,9 @@ def test_initialize_from_constructor(self): with create_mock_device() as mock_device: instance = mock_device.return_value bus = CANalystIIBus(bitrate=1000000) + + self.assertEqual(bus.protocol, can.CanProtocol.CAN_20) + instance.init.assert_has_calls( [ call(0, bitrate=1000000), @@ -34,6 +37,8 @@ def test_initialize_single_channel_only(self): with create_mock_device() as mock_device: instance = mock_device.return_value bus = CANalystIIBus(channel, bitrate=1000000) + + self.assertEqual(bus.protocol, can.CanProtocol.CAN_20) instance.init.assert_called_once_with(channel, bitrate=1000000) def test_initialize_with_timing_registers(self): @@ -43,6 +48,8 @@ def test_initialize_with_timing_registers(self): f_clock=8_000_000, btr0=0x03, btr1=0x6F ) bus = CANalystIIBus(bitrate=None, timing=timing) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_20) + instance.init.assert_has_calls( [ call(0, timing0=0x03, timing1=0x6F), diff --git a/test/test_kvaser.py b/test/test_kvaser.py index 6e7ccea38..043f86f8c 100644 --- a/test/test_kvaser.py +++ b/test/test_kvaser.py @@ -45,6 +45,7 @@ def tearDown(self): def test_bus_creation(self): self.assertIsInstance(self.bus, canlib.KvaserBus) + self.assertEqual(self.bus.protocol, can.CanProtocol.CAN_20) self.assertTrue(canlib.canOpenChannel.called) self.assertTrue(canlib.canBusOn.called) @@ -148,7 +149,8 @@ def test_available_configs(self): def test_canfd_default_data_bitrate(self): canlib.canSetBusParams.reset_mock() canlib.canSetBusParamsFd.reset_mock() - can.Bus(channel=0, interface="kvaser", fd=True) + bus = can.Bus(channel=0, interface="kvaser", fd=True) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_FD) canlib.canSetBusParams.assert_called_once_with( 0, constants.canFD_BITRATE_500K_80P, 0, 0, 0, 0, 0 ) @@ -160,7 +162,8 @@ def test_canfd_nondefault_data_bitrate(self): canlib.canSetBusParams.reset_mock() canlib.canSetBusParamsFd.reset_mock() data_bitrate = 2000000 - can.Bus(channel=0, interface="kvaser", fd=True, data_bitrate=data_bitrate) + bus = can.Bus(channel=0, interface="kvaser", fd=True, data_bitrate=data_bitrate) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_FD) bitrate_constant = canlib.BITRATE_FD[data_bitrate] canlib.canSetBusParams.assert_called_once_with( 0, constants.canFD_BITRATE_500K_80P, 0, 0, 0, 0, 0 diff --git a/test/test_neousys.py b/test/test_neousys.py index 080278d13..69c818869 100644 --- a/test/test_neousys.py +++ b/test/test_neousys.py @@ -36,6 +36,7 @@ def tearDown(self) -> None: def test_bus_creation(self) -> None: self.assertIsInstance(self.bus, neousys.NeousysBus) + self.assertEqual(self.bus.protocol, can.CanProtocol.CAN_20) neousys.NEOUSYS_CANLIB.CAN_Setup.assert_called() neousys.NEOUSYS_CANLIB.CAN_Start.assert_called() neousys.NEOUSYS_CANLIB.CAN_RegisterReceived.assert_called() @@ -62,6 +63,8 @@ def test_bus_creation(self) -> None: def test_bus_creation_bitrate(self) -> None: self.bus = can.Bus(channel=0, interface="neousys", bitrate=200000) self.assertIsInstance(self.bus, neousys.NeousysBus) + self.assertEqual(self.bus.protocol, can.CanProtocol.CAN_20) + CAN_Start_args = ( can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Setup.call_args[0] ) diff --git a/test/test_pcan.py b/test/test_pcan.py index 93a5f5ff4..19fa44dc7 100644 --- a/test/test_pcan.py +++ b/test/test_pcan.py @@ -11,7 +11,7 @@ from parameterized import parameterized import can -from can.bus import BusState +from can import BusState, CanProtocol from can.exceptions import CanInitializationError from can.interfaces.pcan import PcanBus, PcanError from can.interfaces.pcan.basic import * @@ -52,8 +52,10 @@ def test_bus_creation(self) -> None: self.bus = can.Bus(interface="pcan") self.assertIsInstance(self.bus, PcanBus) - self.MockPCANBasic.assert_called_once() + self.assertEqual(self.bus.protocol, CanProtocol.CAN_20) + self.assertFalse(self.bus.fd) + self.MockPCANBasic.assert_called_once() self.mock_pcan.Initialize.assert_called_once() self.mock_pcan.InitializeFD.assert_not_called() @@ -79,6 +81,9 @@ def test_bus_creation_fd(self, clock_param: str, clock_val: int) -> None: ) self.assertIsInstance(self.bus, PcanBus) + self.assertEqual(self.bus.protocol, CanProtocol.CAN_FD) + self.assertTrue(self.bus.fd) + self.MockPCANBasic.assert_called_once() self.mock_pcan.Initialize.assert_not_called() self.mock_pcan.InitializeFD.assert_called_once() @@ -451,10 +456,11 @@ def test_peak_fd_bus_constructor_regression(self): def test_constructor_bit_timing(self): timing = can.BitTiming.from_registers(f_clock=8_000_000, btr0=0x47, btr1=0x2F) - can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + bus = can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) bitrate_arg = self.mock_pcan.Initialize.call_args[0][1] self.assertEqual(bitrate_arg.value, 0x472F) + self.assertEqual(bus.protocol, CanProtocol.CAN_20) def test_constructor_bit_timing_fd(self): timing = can.BitTimingFd( @@ -468,7 +474,8 @@ def test_constructor_bit_timing_fd(self): data_tseg2=6, data_sjw=1, ) - can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + bus = can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing) + self.assertEqual(bus.protocol, CanProtocol.CAN_FD) bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1] diff --git a/test/test_robotell.py b/test/test_robotell.py index c0658ef2c..f95139917 100644 --- a/test/test_robotell.py +++ b/test/test_robotell.py @@ -15,6 +15,9 @@ def setUp(self): def tearDown(self): self.bus.shutdown() + def test_protocol(self): + self.assertEqual(self.bus.protocol, can.CanProtocol.CAN_20) + def test_recv_extended(self): self.serial.write( bytearray( diff --git a/test/test_socketcan.py b/test/test_socketcan.py index 90a143a36..f756cb93a 100644 --- a/test/test_socketcan.py +++ b/test/test_socketcan.py @@ -9,6 +9,8 @@ import warnings from unittest.mock import patch +from .config import TEST_INTERFACE_SOCKETCAN + import can from can.interfaces.socketcan.constants import ( CAN_BCM_TX_DELETE, @@ -357,6 +359,16 @@ def test_build_bcm_update_header(self): self.assertEqual(can_id, result.can_id) self.assertEqual(1, result.nframes) + @unittest.skipUnless(TEST_INTERFACE_SOCKETCAN, "Only run when vcan0 is available") + def test_bus_creation_can(self): + bus = can.Bus(interface="socketcan", channel="vcan0", fd=False) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_20) + + @unittest.skipUnless(TEST_INTERFACE_SOCKETCAN, "Only run when vcan0 is available") + def test_bus_creation_can_fd(self): + bus = can.Bus(interface="socketcan", channel="vcan0", fd=True) + self.assertEqual(bus.protocol, can.CanProtocol.CAN_FD) + @unittest.skipUnless(IS_LINUX and IS_PYPY, "Only test when run on Linux with PyPy") def test_pypy_socketcan_support(self): """Wait for PyPy raw CAN socket support diff --git a/test/test_systec.py b/test/test_systec.py index 7495f75eb..86ed31362 100644 --- a/test/test_systec.py +++ b/test/test_systec.py @@ -36,6 +36,8 @@ def setUp(self): def test_bus_creation(self): self.assertIsInstance(self.bus, ucanbus.UcanBus) + self.assertEqual(self.bus.protocol, can.CanProtocol.CAN_20) + self.assertTrue(ucan.UcanInitHwConnectControlEx.called) self.assertTrue( ucan.UcanInitHardwareEx.called or ucan.UcanInitHardwareEx2.called diff --git a/test/test_vector.py b/test/test_vector.py index b6a0632a8..93aba9c7b 100644 --- a/test/test_vector.py +++ b/test/test_vector.py @@ -81,6 +81,8 @@ def mock_xldriver() -> None: def test_bus_creation_mocked(mock_xldriver) -> None: bus = can.Bus(channel=0, interface="vector", _testing=True) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() @@ -97,6 +99,8 @@ def test_bus_creation_mocked(mock_xldriver) -> None: def test_bus_creation() -> None: bus = can.Bus(channel=0, serial=_find_virtual_can_serial(), interface="vector") assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 + bus.shutdown() xl_channel_config = _find_xl_channel_config( @@ -110,12 +114,15 @@ def test_bus_creation() -> None: bus = canlib.VectorBus(channel=0, serial=_find_virtual_can_serial()) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 bus.shutdown() def test_bus_creation_bitrate_mocked(mock_xldriver) -> None: bus = can.Bus(channel=0, interface="vector", bitrate=200_000, _testing=True) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() @@ -141,6 +148,7 @@ def test_bus_creation_bitrate() -> None: bitrate=200_000, ) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 xl_channel_config = _find_xl_channel_config( serial=_find_virtual_can_serial(), channel=0 @@ -153,6 +161,8 @@ def test_bus_creation_bitrate() -> None: def test_bus_creation_fd_mocked(mock_xldriver) -> None: bus = can.Bus(channel=0, interface="vector", fd=True, _testing=True) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_FD + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() @@ -173,6 +183,7 @@ def test_bus_creation_fd() -> None: channel=0, serial=_find_virtual_can_serial(), interface="vector", fd=True ) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_FD xl_channel_config = _find_xl_channel_config( serial=_find_virtual_can_serial(), channel=0 @@ -204,6 +215,8 @@ def test_bus_creation_fd_bitrate_timings_mocked(mock_xldriver) -> None: _testing=True, ) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_FD + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() @@ -346,6 +359,7 @@ def test_bus_creation_timing() -> None: timing=timing, ) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_20 xl_channel_config = _find_xl_channel_config( serial=_find_virtual_can_serial(), channel=0 @@ -377,6 +391,8 @@ def test_bus_creation_timingfd_mocked(mock_xldriver) -> None: _testing=True, ) assert isinstance(bus, canlib.VectorBus) + assert bus.protocol == can.CanProtocol.CAN_FD + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() @@ -425,6 +441,8 @@ def test_bus_creation_timingfd() -> None: timing=timing, ) + assert bus.protocol == can.CanProtocol.CAN_FD + xl_channel_config = _find_xl_channel_config( serial=_find_virtual_can_serial(), channel=0 )