Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 61 additions & 34 deletions can/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
CyclicSendTasks.
"""

import concurrent.futures.thread
import importlib
import logging
from collections.abc import Iterable
from collections.abc import Callable, Iterable
from typing import Any, Optional, Union, cast

from . import util
Expand Down Expand Up @@ -140,6 +141,7 @@ def Bus( # noqa: N802

def detect_available_configs(
interfaces: Union[None, str, Iterable[str]] = None,
timeout: float = 5.0,
) -> list[AutoDetectedConfig]:
"""Detect all configurations/channels that the interfaces could
currently connect with.
Expand All @@ -148,59 +150,84 @@ def detect_available_configs(

Automated configuration detection may not be implemented by
every interface on every platform. This method will not raise
an error in that case, but with rather return an empty list
an error in that case, but will rather return an empty list
for that interface.

:param interfaces: either
- the name of an interface to be searched in as a string,
- an iterable of interface names to search in, or
- `None` to search in all known interfaces.
:param timeout: maximum number of seconds to wait for all interface
detection tasks to complete. If exceeded, any pending tasks
will be cancelled, a warning will be logged, and the method
will return results gathered so far.
:rtype: list[dict]
:return: an iterable of dicts, each suitable for usage in
the constructor of :class:`can.BusABC`.
the constructor of :class:`can.BusABC`. Interfaces that
timed out will be logged as warnings and excluded.
"""

# Figure out where to search
# Determine which interfaces to search
if interfaces is None:
interfaces = BACKENDS
elif isinstance(interfaces, str):
interfaces = (interfaces,)
# else it is supposed to be an iterable of strings
# otherwise assume iterable of strings

result = []
for interface in interfaces:
# Collect detection callbacks
callbacks: dict[str, Callable[[], list[AutoDetectedConfig]]] = {}
for interface_keyword in interfaces:
try:
bus_class = _get_class_for_interface(interface)
bus_class = _get_class_for_interface(interface_keyword)
callbacks[interface_keyword] = (
bus_class._detect_available_configs # pylint: disable=protected-access
)
except CanInterfaceNotImplementedError:
log_autodetect.debug(
'interface "%s" cannot be loaded for detection of available configurations',
interface,
interface_keyword,
)
continue

# get available channels
try:
available = list(
bus_class._detect_available_configs() # pylint: disable=protected-access
)
except NotImplementedError:
log_autodetect.debug(
'interface "%s" does not support detection of available configurations',
interface,
)
else:
log_autodetect.debug(
'interface "%s" detected %i available configurations',
interface,
len(available),
)

# add the interface name to the configs if it is not already present
for config in available:
if "interface" not in config:
config["interface"] = interface

# append to result
result += available
result: list[AutoDetectedConfig] = []

# Use manual executor to allow shutdown without waiting
executor = concurrent.futures.ThreadPoolExecutor()
try:
futures_to_keyword = {
executor.submit(func): kw for kw, func in callbacks.items()
}
done, not_done = concurrent.futures.wait(
futures_to_keyword,
timeout=timeout,
return_when=concurrent.futures.ALL_COMPLETED,
)
# Log timed-out tasks
if not_done:
log_autodetect.warning(
"Timeout (%.2fs) reached for interfaces: %s",
timeout,
", ".join(sorted(futures_to_keyword[fut] for fut in not_done)),
)
# Process completed futures
for future in done:
keyword = futures_to_keyword[future]
try:
available = future.result()
except NotImplementedError:
log_autodetect.debug(
'interface "%s" does not support detection of available configurations',
keyword,
)
else:
log_autodetect.debug(
'interface "%s" detected %i available configurations',
keyword,
len(available),
)
for config in available:
config.setdefault("interface", keyword)
result.extend(available)
finally:
# shutdown immediately, do not wait for pending threads
executor.shutdown(wait=False, cancel_futures=True)
return result
2 changes: 2 additions & 0 deletions can/interfaces/usb2can/serial_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
log = logging.getLogger("can.usb2can")

try:
import pythoncom
import win32com.client
except ImportError:
log.warning(
Expand Down Expand Up @@ -50,6 +51,7 @@ def find_serial_devices(serial_matcher: str = "") -> list[str]:
only device IDs starting with this string are returned
"""
serial_numbers = []
pythoncom.CoInitialize()
wmi = win32com.client.GetObject("winmgmts:")
for usb_controller in wmi.InstancesOf("Win32_USBControllerDevice"):
usb_device = wmi.Get(usb_controller.Dependent)
Expand Down
2 changes: 1 addition & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
]

# mock windows specific attributes
autodoc_mock_imports = ["win32com"]
autodoc_mock_imports = ["win32com", "pythoncom"]
ctypes.windll = MagicMock()
ctypesutil.HRESULT = ctypes.c_long

Expand Down
2 changes: 1 addition & 1 deletion doc/interfaces/gs_usb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Geschwister Schneider and candleLight
Windows/Linux/Mac CAN driver based on usbfs or WinUSB WCID for Geschwister Schneider USB/CAN devices
and candleLight USB CAN interfaces.

Install: ``pip install "python-can[gs_usb]"``
Install: ``pip install "python-can[gs-usb]"``

Usage: pass device ``index`` or ``channel`` (starting from 0) if using automatic device detection:

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ docs = [
"furo",
]
lint = [
"pylint==3.2.*",
"pylint==3.3.*",
"ruff==0.11.12",
"black==25.1.*",
"mypy==1.16.*",
Expand Down Expand Up @@ -205,6 +205,7 @@ disable = [
"too-many-branches",
"too-many-instance-attributes",
"too-many-locals",
"too-many-positional-arguments",
"too-many-public-methods",
"too-many-statements",
]