diff --git a/can/bus.py b/can/bus.py index b9ccfcfad..2f8c1d7ee 100644 --- a/can/bus.py +++ b/can/bus.py @@ -450,6 +450,14 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]: def fileno(self) -> int: raise NotImplementedError("fileno is not implemented using current CAN bus") + @classmethod + def list_adapters(cls) -> List[Any]: + """Lists all adapters for this interface. The adapter identifier can be used to open a specific adapter. + + MAY NOT BE IMPLEMENTED BY ALL INTERFACES + """ + raise NotImplementedError() + class _SelfRemovingCyclicTask(CyclicSendTaskABC, ABC): """Removes itself from a bus. diff --git a/can/ctypesutil.py b/can/ctypesutil.py index 7c1e1f573..8d52f8603 100644 --- a/can/ctypesutil.py +++ b/can/ctypesutil.py @@ -79,7 +79,7 @@ def map_symbol( if sys.platform == "win32": HRESULT = ctypes.HRESULT -elif sys.platform == "cygwin": +else: class HRESULT(ctypes.c_long): pass diff --git a/can/interfaces/ixxat/__init__.py b/can/interfaces/ixxat/__init__.py index fc2cae0f3..be55c0d49 100644 --- a/can/interfaces/ixxat/__init__.py +++ b/can/interfaces/ixxat/__init__.py @@ -1,7 +1,8 @@ """ -Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems +Ctypes wrapper module for IXXAT Virtual CAN Interface on win32 systems Copyright (C) 2016 Giuseppe Corbelli +Copyright (c) 2021 Marcel Kanter """ -from can.interfaces.ixxat.canlib import IXXATBus, get_ixxat_hwids +from can.interfaces.ixxat.canlib import IXXATBus diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index ced840ff3..e33abf5b8 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -21,7 +21,7 @@ LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC, ) -from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT as ctypes_HRESULT +from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT from . import constants, structures from .exceptions import * @@ -136,7 +136,7 @@ def __check_status(result, function, arguments): # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize); _canlib.map_symbol( - "vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + "vciFormatError", None, (HRESULT, ctypes.c_char_p, ctypes.c_uint32) ) # Hack to have vciFormatError as a free function vciFormatError = functools.partial(__vciFormatError, _canlib) @@ -408,7 +408,7 @@ class IXXATBus(BusABC): }, } - def __init__(self, channel, can_filters=None, **kwargs): + def __init__(self, channel=0, can_filters=None, **kwargs): """ :param int channel: The Channel id to create this bus with. @@ -416,14 +416,20 @@ def __init__(self, channel, can_filters=None, **kwargs): :param list can_filters: See :meth:`can.BusABC.set_filters`. - :param bool receive_own_messages: - Enable self-reception of sent messages. - - :param int UniqueHardwareId: - UniqueHardwareId to connect (optional, will use the first found if not supplied) - :param int bitrate: Channel bitrate in bit/s + + :param String adapter: + adapter to connect (optional, will use the first found if not supplied) + + :param int rxFifoSize: + Set the receive FIFO size to this value. + + :param int txFifoSize: + Set the transmit FIFO size to this value. + + :param bool receive_own_messages: + Enable self-reception of sent messages. """ if _canlib is None: raise CanInterfaceNotImplementedError( @@ -433,7 +439,7 @@ def __init__(self, channel, can_filters=None, **kwargs): log.info("Got configuration of: %s", kwargs) # Configuration options bitrate = kwargs.get("bitrate", 500000) - UniqueHardwareId = kwargs.get("UniqueHardwareId", None) + adapter = kwargs.get("adapter", None) rxFifoSize = kwargs.get("rxFifoSize", 16) txFifoSize = kwargs.get("txFifoSize", 16) self._receive_own_messages = kwargs.get("receive_own_messages", False) @@ -461,10 +467,10 @@ def __init__(self, channel, can_filters=None, **kwargs): self._payload = (ctypes.c_byte * 8)() # Search for supplied device - if UniqueHardwareId is None: - log.info("Searching for first available device") + if adapter is None: + log.info("Searching for first available adapter") else: - log.info("Searching for unique HW ID %s", UniqueHardwareId) + log.info("Searching for adapter %s", adapter) _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle)) while True: try: @@ -472,20 +478,19 @@ def __init__(self, channel, can_filters=None, **kwargs): self._device_handle, ctypes.byref(self._device_info) ) except StopIteration: - if UniqueHardwareId is None: + if adapter is None: raise VCIDeviceNotFoundError( "No IXXAT device(s) connected or device(s) in use by other process(es)." ) else: raise VCIDeviceNotFoundError( "Unique HW ID {} not connected or not available.".format( - UniqueHardwareId + adapter ) ) else: - if (UniqueHardwareId is None) or ( - self._device_info.UniqueHardwareId.AsChar - == bytes(UniqueHardwareId, "ascii") + if (adapter is None) or ( + self._device_info.UniqueHardwareId.AsChar == bytes(adapter, "ascii") ): break else: @@ -762,6 +767,30 @@ def shutdown(self): _canlib.canControlClose(self._control_handle) _canlib.vciDeviceClose(self._device_handle) + @classmethod + def list_adapters(cls): + """Get a list of hardware ids of all available IXXAT adapters.""" + adapters = [] + device_handle = HANDLE() + device_info = structures.VCIDEVICEINFO() + + if _canlib is None: + raise CanInterfaceNotImplementedError( + "The IXXAT VCI library has not been initialized. Check the logs for more details." + ) + + _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) + except StopIteration: + break + else: + adapters.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) + _canlib.vciEnumDeviceClose(device_handle) + + return adapters + class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC): """A message in the cyclic transmit list.""" @@ -827,22 +856,3 @@ def _format_can_status(status_flags: int): return "CAN status message: {}".format(", ".join(states)) else: return "Empty CAN status message" - - -def get_ixxat_hwids(): - """Get a list of hardware ids of all available IXXAT devices.""" - hwids = [] - device_handle = HANDLE() - device_info = structures.VCIDEVICEINFO() - - _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) - while True: - try: - _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) - except StopIteration: - break - else: - hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii")) - _canlib.vciEnumDeviceClose(device_handle) - - return hwids diff --git a/doc/interfaces/ixxat.rst b/doc/interfaces/ixxat.rst index 929221733..8ff1765e7 100644 --- a/doc/interfaces/ixxat.rst +++ b/doc/interfaces/ixxat.rst @@ -3,7 +3,7 @@ IXXAT Virtual CAN Interface =========================== -Interface to `IXXAT `__ Virtual CAN Interface V3 SDK. Works on Windows. +Interface to `IXXAT `__ Virtual CAN Interface SDK. Works on Windows. The Linux ECI SDK is currently unsupported, however on Linux some devices are supported with :doc:`socketcan`. @@ -26,21 +26,21 @@ Bus Configuration file ------------------ + The simplest configuration file would be:: [default] interface = ixxat channel = 0 -Python-can will search for the first IXXAT device available and open the first channel. +Python-can will search for the first IXXAT adapter available and open the first channel. ``interface`` and ``channel`` parameters are interpreted by frontend ``can.interfaces.interface`` module, while the following parameters are optional and are interpreted by IXXAT implementation. * ``bitrate`` (default 500000) Channel bitrate -* ``UniqueHardwareId`` (default first device) Unique hardware ID of the IXXAT device +* ``adapter`` (default first adapter) Unique hardware ID of the IXXAT device * ``rxFifoSize`` (default 16) Number of RX mailboxes * ``txFifoSize`` (default 16) Number of TX mailboxes -* ``extended`` (default False) Allow usage of extended IDs Filtering @@ -55,14 +55,15 @@ VCI documentation, section "Message filters" for more info. List available devices ---------------------- -In case you have connected multiple IXXAT devices, you have to select them by using their unique hardware id. -To get a list of all connected IXXAT you can use the function ``get_ixxat_hwids()`` as demonstrated below: - - >>> from can.interfaces.ixxat import get_ixxat_hwids - >>> for hwid in get_ixxat_hwids(): - ... print("Found IXXAT with hardware id '%s'." % hwid) - Found IXXAT with hardware id 'HW441489'. - Found IXXAT with hardware id 'HW107422'. + +In case you have connected multiple IXXAT adapters, you have to select them by using their unique hardware id. +To get a list of all connected IXXAT adapters you can use the function ``list_adapters()`` as demonstrated below: + + >>> from can.interfaces.ixxat import IXXATBus + >>> for hwid in IXXATBus.list_adapters(): + ... print("Found IXXAT adapter with hardware id '%s'." % hwid) + Found IXXAT adapter with hardware id 'HW441489'. + Found IXXAT adapter with hardware id 'HW107422'. Internals diff --git a/test/test_interface_ixxat.py b/test/test_interface_ixxat.py index 55618c769..34f09ff93 100644 --- a/test/test_interface_ixxat.py +++ b/test/test_interface_ixxat.py @@ -1,12 +1,15 @@ """ Unittest for ixxat interface. -Run only this test: -python setup.py test --addopts "--verbose -s test/test_interface_ixxat.py" +Copyright (c) 2020, 2021 Marcel Kanter """ import unittest import can +import sys + +from can.interfaces.ixxat import IXXATBus +from can.exceptions import CanInterfaceNotImplementedError class SoftwareTestCase(unittest.TestCase): @@ -15,10 +18,7 @@ class SoftwareTestCase(unittest.TestCase): """ def setUp(self): - try: - bus = can.Bus(interface="ixxat", channel=0) - bus.shutdown() - except can.CanInterfaceNotImplementedError: + if sys.platform != "win32" and sys.platform != "cygwin": raise unittest.SkipTest("not available on this platform") def test_bus_creation(self): @@ -34,28 +34,60 @@ def test_bus_creation(self): with self.assertRaises(ValueError): can.Bus(interface="ixxat", channel=0, txFifoSize=0) + # non-existent channel (use arbitrary high value) + with self.assertRaises(can.CanInitializationError): + can.Bus(interface="ixxat", channel=0xFFFF) + + def test_adapter_enumeration(self): + # Enumeration of adapters should always work (if the driver is installed) and the result should support len and be iterable + try: + adapters = IXXATBus.list_adapters() + except CanInterfaceNotImplementedError: + raise unittest.SkipTest("Maybe the driver is not installed.") + + n = 0 + for adapter in adapters: + n += 1 + self.assertEqual(len(adapters), n) + class HardwareTestCase(unittest.TestCase): """ Test cases that rely on an existing/connected hardware. + THEY NEED TO BE EXECUTED WITH AT LEAST ONE CONNECTED ADAPTER! """ def setUp(self): - try: - bus = can.Bus(interface="ixxat", channel=0) - bus.shutdown() - except can.CanInterfaceNotImplementedError: + if sys.platform != "win32" and sys.platform != "cygwin": raise unittest.SkipTest("not available on this platform") def test_bus_creation(self): - # non-existent channel -> use arbitrary high value - with self.assertRaises(can.CanInitializationError): - can.Bus(interface="ixxat", channel=0xFFFF) + # Test the enumeration of all adapters by opening and closing each adapter + try: + adapters = IXXATBus.list_adapters() + except CanInterfaceNotImplementedError: + raise unittest.SkipTest("Maybe the driver is not installed.") + + for adapter in adapters: + bus = can.Bus(interface="ixxat", adapter=adapter) + bus.shutdown() def test_send_after_shutdown(self): - with can.Bus(interface="ixxat", channel=0) as bus: - with self.assertRaises(can.CanOperationError): - bus.send(can.Message(arbitration_id=0x3FF, dlc=0)) + # At least one adapter is needed, skip the test if none can be found + try: + adapters = IXXATBus.list_adapters() + except CanInterfaceNotImplementedError: + raise unittest.SkipTest("Maybe the driver is not installed.") + + if len(adapters) == 0: + raise unittest.SkipTest("No adapters found") + + bus = can.Bus(interface="ixxat", channel=0) + msg = can.Message(arbitration_id=0x3FF, dlc=0) + # Intentionally close the bus now and try to send afterwards. This should lead to an CanOperationError + bus.shutdown() + with self.assertRaises(can.CanOperationError): + bus.send(msg) if __name__ == "__main__":