diff --git a/can/bit_timing.py b/can/bit_timing.py index 4b0074472..2bb04bfbe 100644 --- a/can/bit_timing.py +++ b/can/bit_timing.py @@ -7,7 +7,7 @@ from can.typechecking import BitTimingDict, BitTimingFdDict -class BitTiming(Mapping): +class BitTiming(Mapping[str, int]): """Representation of a bit timing configuration for a CAN 2.0 bus. The class can be constructed in multiple ways, depending on the information @@ -477,7 +477,7 @@ def __hash__(self) -> int: return tuple(self._data.values()).__hash__() -class BitTimingFd(Mapping): +class BitTimingFd(Mapping[str, int]): """Representation of a bit timing configuration for a CAN FD bus. The class can be constructed in multiple ways, depending on the information diff --git a/can/bus.py b/can/bus.py index f053c4aa7..ec9eb09b7 100644 --- a/can/bus.py +++ b/can/bus.py @@ -5,7 +5,7 @@ import contextlib import logging import threading -from abc import ABC, ABCMeta, abstractmethod +from abc import ABC, abstractmethod from collections.abc import Iterator, Sequence from enum import Enum, auto from time import time @@ -19,7 +19,6 @@ from typing_extensions import Self -import can import can.typechecking from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask from can.message import Message @@ -44,7 +43,7 @@ class CanProtocol(Enum): CAN_XL = auto() -class BusABC(metaclass=ABCMeta): +class BusABC(ABC): """The CAN Bus Abstract Base Class that serves as the basis for all concrete interfaces. @@ -68,7 +67,7 @@ class BusABC(metaclass=ABCMeta): @abstractmethod def __init__( self, - channel: Optional[can.typechecking.Channel], + channel: can.typechecking.Channel, can_filters: Optional[can.typechecking.CanFilters] = None, **kwargs: object, ): @@ -447,7 +446,6 @@ def _matches_filters(self, msg: Message) -> bool: for _filter in self._filters: # check if this filter even applies to the message if "extended" in _filter: - _filter = cast("can.typechecking.CanFilterExtended", _filter) if _filter["extended"] != msg.is_extended_id: continue @@ -524,7 +522,7 @@ def protocol(self) -> CanProtocol: return self._can_protocol @staticmethod - def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: + def _detect_available_configs() -> Sequence[can.typechecking.AutoDetectedConfig]: """Detect all configurations/channels that this interface could currently connect with. diff --git a/can/interface.py b/can/interface.py index eee58ff41..4aa010a36 100644 --- a/can/interface.py +++ b/can/interface.py @@ -7,7 +7,7 @@ import concurrent.futures.thread import importlib import logging -from collections.abc import Callable, Iterable +from collections.abc import Callable, Iterable, Sequence from typing import Any, Optional, Union, cast from . import util @@ -142,7 +142,7 @@ def Bus( # noqa: N802 def detect_available_configs( interfaces: Union[None, str, Iterable[str]] = None, timeout: float = 5.0, -) -> list[AutoDetectedConfig]: +) -> Sequence[AutoDetectedConfig]: """Detect all configurations/channels that the interfaces could currently connect with. @@ -175,7 +175,7 @@ def detect_available_configs( # otherwise assume iterable of strings # Collect detection callbacks - callbacks: dict[str, Callable[[], list[AutoDetectedConfig]]] = {} + callbacks: dict[str, Callable[[], Sequence[AutoDetectedConfig]]] = {} for interface_keyword in interfaces: try: bus_class = _get_class_for_interface(interface_keyword) diff --git a/can/interfaces/cantact.py b/can/interfaces/cantact.py index 963a9ee3b..08fe15b72 100644 --- a/can/interfaces/cantact.py +++ b/can/interfaces/cantact.py @@ -4,6 +4,7 @@ import logging import time +from collections.abc import Sequence from typing import Any, Optional, Union from unittest.mock import Mock @@ -14,6 +15,7 @@ CanInterfaceNotImplementedError, error_check, ) +from ..typechecking import AutoDetectedConfig from ..util import check_or_adjust_timing_clock, deprecated_args_alias logger = logging.getLogger(__name__) @@ -31,7 +33,7 @@ class CantactBus(BusABC): """CANtact interface""" @staticmethod - def _detect_available_configs(): + def _detect_available_configs() -> Sequence[AutoDetectedConfig]: try: interface = cantact.Interface() except (NameError, SystemError, AttributeError): @@ -40,7 +42,7 @@ def _detect_available_configs(): ) return [] - channels = [] + channels: list[AutoDetectedConfig] = [] for i in range(0, interface.channel_count()): channels.append({"interface": "cantact", "channel": f"ch:{i}"}) return channels @@ -121,7 +123,14 @@ def __init__( **kwargs, ) - def _recv_internal(self, timeout): + def _recv_internal( + self, timeout: Optional[float] + ) -> tuple[Optional[Message], bool]: + if timeout is None: + raise TypeError( + f"{self.__class__.__name__} expects a numeric `timeout` value." + ) + with error_check("Cannot receive message"): frame = self.interface.recv(int(timeout * 1000)) if frame is None: @@ -140,7 +149,7 @@ def _recv_internal(self, timeout): ) return msg, False - def send(self, msg, timeout=None): + def send(self, msg: Message, timeout: Optional[float] = None) -> None: with error_check("Cannot send message"): self.interface.send( self.channel, @@ -151,13 +160,13 @@ def send(self, msg, timeout=None): msg.data, ) - def shutdown(self): + def shutdown(self) -> None: super().shutdown() with error_check("Cannot shutdown interface"): self.interface.stop() -def mock_recv(timeout): +def mock_recv(timeout: int) -> Optional[dict[str, Any]]: if timeout > 0: return { "id": 0x123, diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index e6ad25d57..6192367c4 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -9,7 +9,6 @@ CyclicSendTaskABC, Message, ) -from can.typechecking import AutoDetectedConfig class IXXATBus(BusABC): @@ -175,5 +174,5 @@ def state(self) -> BusState: return self.bus.state @staticmethod - def _detect_available_configs() -> list[AutoDetectedConfig]: + def _detect_available_configs() -> Sequence[vcinpl.AutoDetectedIxxatConfig]: return vcinpl._detect_available_configs() diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py index 098b022bb..59f98417d 100644 --- a/can/interfaces/ixxat/canlib_vcinpl.py +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -976,8 +976,8 @@ def get_ixxat_hwids(): return hwids -def _detect_available_configs() -> list[AutoDetectedConfig]: - config_list = [] # list in wich to store the resulting bus kwargs +def _detect_available_configs() -> Sequence["AutoDetectedIxxatConfig"]: + config_list = [] # list in which to store the resulting bus kwargs # used to detect HWID device_handle = HANDLE() @@ -1026,3 +1026,7 @@ def _detect_available_configs() -> list[AutoDetectedConfig]: pass # _canlib is None in the CI tests -> return a blank list return config_list + + +class AutoDetectedIxxatConfig(AutoDetectedConfig): + unique_hardware_id: int diff --git a/can/interfaces/serial/serial_can.py b/can/interfaces/serial/serial_can.py index 12ce5aff1..680cf10f6 100644 --- a/can/interfaces/serial/serial_can.py +++ b/can/interfaces/serial/serial_can.py @@ -10,7 +10,8 @@ import io import logging import struct -from typing import Any, Optional +from collections.abc import Sequence +from typing import Any, Optional, cast from can import ( BusABC, @@ -26,21 +27,14 @@ logger = logging.getLogger("can.serial") try: - import serial + import serial.tools.list_ports except ImportError: logger.warning( "You won't be able to use the serial can backend without " - "the serial module installed!" + "the `pyserial` package installed!" ) serial = None -try: - from serial.tools.list_ports import comports as list_comports -except ImportError: - # If unavailable on some platform, just return nothing - def list_comports() -> list[Any]: - return [] - CAN_ERR_FLAG = 0x20000000 CAN_RTR_FLAG = 0x40000000 @@ -63,8 +57,7 @@ def __init__( baudrate: int = 115200, timeout: float = 0.1, rtscts: bool = False, - *args, - **kwargs, + **kwargs: Any, ) -> None: """ :param channel: @@ -107,7 +100,7 @@ def __init__( "could not create the serial device" ) from error - super().__init__(channel, *args, **kwargs) + super().__init__(channel, **kwargs) def shutdown(self) -> None: """ @@ -232,7 +225,7 @@ def _recv_internal( def fileno(self) -> int: try: - return self._ser.fileno() + return cast("int", self._ser.fileno()) except io.UnsupportedOperation: raise NotImplementedError( "fileno is not implemented using current CAN bus on this platform" @@ -241,7 +234,11 @@ def fileno(self) -> int: raise CanOperationError("Cannot fetch fileno") from exception @staticmethod - def _detect_available_configs() -> list[AutoDetectedConfig]: - return [ - {"interface": "serial", "channel": port.device} for port in list_comports() - ] + def _detect_available_configs() -> Sequence[AutoDetectedConfig]: + configs: list[AutoDetectedConfig] = [] + if serial is None: + return configs + + for port in serial.tools.list_ports.comports(): + configs.append({"interface": "serial", "channel": port.device}) + return configs diff --git a/can/interfaces/socketcan/utils.py b/can/interfaces/socketcan/utils.py index 80dcb203f..1c096f66e 100644 --- a/can/interfaces/socketcan/utils.py +++ b/can/interfaces/socketcan/utils.py @@ -9,7 +9,7 @@ import struct import subprocess import sys -from typing import Optional, cast +from typing import Optional from can import typechecking from can.interfaces.socketcan.constants import CAN_EFF_FLAG @@ -28,7 +28,6 @@ def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes can_id = can_filter["can_id"] can_mask = can_filter["can_mask"] if "extended" in can_filter: - can_filter = cast("typechecking.CanFilterExtended", can_filter) # Match on either 11-bit OR 29-bit messages instead of both can_mask |= CAN_EFF_FLAG if can_filter["extended"]: diff --git a/can/interfaces/udp_multicast/bus.py b/can/interfaces/udp_multicast/bus.py index 45882ec07..9e0187ea2 100644 --- a/can/interfaces/udp_multicast/bus.py +++ b/can/interfaces/udp_multicast/bus.py @@ -6,10 +6,10 @@ import struct import time import warnings -from typing import Optional, Union +from typing import Any, Optional, Union import can -from can import BusABC, CanProtocol +from can import BusABC, CanProtocol, Message from can.typechecking import AutoDetectedConfig from .utils import is_msgpack_installed, pack_message, unpack_message @@ -98,7 +98,7 @@ def __init__( hop_limit: int = 1, receive_own_messages: bool = False, fd: bool = True, - **kwargs, + **kwargs: Any, ) -> None: is_msgpack_installed() @@ -126,7 +126,9 @@ def is_fd(self) -> bool: ) return self._can_protocol is CanProtocol.CAN_FD - def _recv_internal(self, timeout: Optional[float]): + def _recv_internal( + self, timeout: Optional[float] + ) -> tuple[Optional[Message], bool]: result = self._multicast.recv(timeout) if not result: return None, False @@ -204,7 +206,7 @@ def __init__( # Look up multicast group address in name server and find out IP version of the first suitable target # and then get the address family of it (socket.AF_INET or socket.AF_INET6) - connection_candidates = socket.getaddrinfo( # type: ignore + connection_candidates = socket.getaddrinfo( group, self.port, type=socket.SOCK_DGRAM ) sock = None diff --git a/can/interfaces/udp_multicast/utils.py b/can/interfaces/udp_multicast/utils.py index c6b2630a5..de39833a3 100644 --- a/can/interfaces/udp_multicast/utils.py +++ b/can/interfaces/udp_multicast/utils.py @@ -2,7 +2,7 @@ Defines common functions. """ -from typing import Any, Optional +from typing import Any, Optional, cast from can import CanInterfaceNotImplementedError, Message from can.typechecking import ReadableBytesLike @@ -51,7 +51,7 @@ def pack_message(message: Message) -> bytes: "bitrate_switch": message.bitrate_switch, "error_state_indicator": message.error_state_indicator, } - return msgpack.packb(as_dict, use_bin_type=True) + return cast("bytes", msgpack.packb(as_dict, use_bin_type=True)) def unpack_message( diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 986f52002..d15b89803 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -982,8 +982,8 @@ def reset(self) -> None: ) @staticmethod - def _detect_available_configs() -> list[AutoDetectedConfig]: - configs = [] + def _detect_available_configs() -> Sequence["AutoDetectedVectorConfig"]: + configs: list[AutoDetectedVectorConfig] = [] channel_configs = get_channel_configs() LOG.info("Found %d channels", len(channel_configs)) for channel_config in channel_configs: @@ -999,16 +999,13 @@ def _detect_available_configs() -> list[AutoDetectedConfig]: ) configs.append( { - # data for use in VectorBus.__init__(): "interface": "vector", "channel": channel_config.hw_channel, "serial": channel_config.serial_number, "channel_index": channel_config.channel_index, - # data for use in VectorBus.set_application_config(): "hw_type": channel_config.hw_type, "hw_index": channel_config.hw_index, "hw_channel": channel_config.hw_channel, - # additional information: "supports_fd": bool( channel_config.channel_capabilities & xldefine.XL_ChannelCapabilities.XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT @@ -1016,7 +1013,7 @@ def _detect_available_configs() -> list[AutoDetectedConfig]: "vector_channel_config": channel_config, } ) - return configs # type: ignore + return configs @staticmethod def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None: @@ -1188,6 +1185,19 @@ class VectorChannelConfig(NamedTuple): transceiver_name: str +class AutoDetectedVectorConfig(AutoDetectedConfig): + # data for use in VectorBus.__init__(): + serial: int + channel_index: int + # data for use in VectorBus.set_application_config(): + hw_type: int + hw_index: int + hw_channel: int + # additional information: + supports_fd: bool + vector_channel_config: VectorChannelConfig + + def _get_xl_driver_config() -> xlclass.XLdriverConfig: if xldriver is None: raise VectorError( diff --git a/can/interfaces/vector/exceptions.py b/can/interfaces/vector/exceptions.py index 53c774e6f..b43df5e6c 100644 --- a/can/interfaces/vector/exceptions.py +++ b/can/interfaces/vector/exceptions.py @@ -1,10 +1,14 @@ """Exception/error declarations for the vector interface.""" +from typing import Any, Optional, Union + from can import CanError, CanInitializationError, CanOperationError class VectorError(CanError): - def __init__(self, error_code, error_string, function): + def __init__( + self, error_code: Optional[int], error_string: str, function: str + ) -> None: super().__init__( message=f"{function} failed ({error_string})", error_code=error_code ) @@ -12,7 +16,7 @@ def __init__(self, error_code, error_string, function): # keep reference to args for pickling self._args = error_code, error_string, function - def __reduce__(self): + def __reduce__(self) -> Union[str, tuple[Any, ...]]: return type(self), self._args, {} diff --git a/can/interfaces/virtual.py b/can/interfaces/virtual.py index 6b62e5ceb..e4f68b0c4 100644 --- a/can/interfaces/virtual.py +++ b/can/interfaces/virtual.py @@ -12,7 +12,7 @@ from copy import deepcopy from random import randint from threading import RLock -from typing import Any, Optional +from typing import Any, Final, Optional from can import CanOperationError from can.bus import BusABC, CanProtocol @@ -21,10 +21,9 @@ logger = logging.getLogger(__name__) - # Channels are lists of queues, one for each connection -channels: dict[Optional[Channel], list[queue.Queue[Message]]] = {} -channels_lock = RLock() +channels: Final[dict[Channel, list[queue.Queue[Message]]]] = {} +channels_lock: Final = RLock() class VirtualBus(BusABC): @@ -54,7 +53,7 @@ class VirtualBus(BusABC): def __init__( self, - channel: Optional[Channel] = None, + channel: Channel = "channel-0", receive_own_messages: bool = False, rx_queue_size: int = 0, preserve_timestamps: bool = False, @@ -67,9 +66,9 @@ def __init__( bus by virtual instances constructed with the same channel identifier. :param channel: The channel identifier. This parameter can be an - arbitrary value. The bus instance will be able to see messages - from other virtual bus instances that were created with the same - value. + arbitrary hashable value. The bus instance will be able to see + messages from other virtual bus instances that were created with + the same value. :param receive_own_messages: If set to True, sent messages will be reflected back on the input queue. :param rx_queue_size: The size of the reception queue. The reception @@ -179,7 +178,7 @@ def _detect_available_configs() -> list[AutoDetectedConfig]: available_channels = list(channels.keys()) # find a currently unused channel - def get_extra(): + def get_extra() -> str: return f"channel-{randint(0, 9999)}" extra = get_extra() diff --git a/can/io/mf4.py b/can/io/mf4.py index 7007c3627..68aff87ae 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -274,7 +274,7 @@ def on_message_received(self, msg: Message) -> None: self._rtr_buffer = np.zeros(1, dtype=RTR_DTYPE) -class FrameIterator(metaclass=abc.ABCMeta): +class FrameIterator(abc.ABC): """ Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames. """ diff --git a/can/listener.py b/can/listener.py index 6256d33b6..7f8f436a0 100644 --- a/can/listener.py +++ b/can/listener.py @@ -5,7 +5,7 @@ import asyncio import sys import warnings -from abc import ABCMeta, abstractmethod +from abc import ABC, abstractmethod from collections.abc import AsyncIterator from queue import Empty, SimpleQueue from typing import Any, Optional @@ -14,7 +14,7 @@ from can.message import Message -class Listener(metaclass=ABCMeta): +class Listener(ABC): """The basic listener that can be called directly to handle some CAN message:: diff --git a/can/logconvert.py b/can/logconvert.py index 49cdaf4bb..4527dc23e 100644 --- a/can/logconvert.py +++ b/can/logconvert.py @@ -5,17 +5,21 @@ import argparse import errno import sys +from typing import TYPE_CHECKING, NoReturn from can import Logger, LogReader, SizedRotatingLogger +if TYPE_CHECKING: + from can.io.generic import MessageWriter + class ArgumentParser(argparse.ArgumentParser): - def error(self, message): + def error(self, message: str) -> NoReturn: self.print_help(sys.stderr) self.exit(errno.EINVAL, f"{self.prog}: error: {message}\n") -def main(): +def main() -> None: parser = ArgumentParser( description="Convert a log file from one format to another.", ) @@ -47,7 +51,7 @@ def main(): with LogReader(args.input) as reader: if args.file_size: - logger = SizedRotatingLogger( + logger: MessageWriter = SizedRotatingLogger( base_filename=args.output, max_bytes=args.file_size ) else: diff --git a/can/message.py b/can/message.py index d8d94ea84..c1fbffd21 100644 --- a/can/message.py +++ b/can/message.py @@ -8,7 +8,7 @@ from copy import deepcopy from math import isinf, isnan -from typing import Optional +from typing import Any, Optional from . import typechecking @@ -210,7 +210,7 @@ def __copy__(self) -> "Message": error_state_indicator=self.error_state_indicator, ) - def __deepcopy__(self, memo: dict) -> "Message": + def __deepcopy__(self, memo: Optional[dict[int, Any]]) -> "Message": return Message( timestamp=self.timestamp, arbitration_id=self.arbitration_id, diff --git a/can/notifier.py b/can/notifier.py index 2b9944450..b2f550df7 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -103,7 +103,7 @@ def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]: return tuple(instance_list) -class Notifier(AbstractContextManager): +class Notifier(AbstractContextManager["Notifier"]): _registry: Final = _NotifierRegistry() diff --git a/can/thread_safe_bus.py b/can/thread_safe_bus.py index 9b008667f..35a4f400c 100644 --- a/can/thread_safe_bus.py +++ b/can/thread_safe_bus.py @@ -1,4 +1,12 @@ +from contextlib import nullcontext from threading import RLock +from typing import Any, Optional + +from can import typechecking +from can.bus import BusABC, BusState, CanProtocol +from can.message import Message + +from .interface import Bus try: # Only raise an exception on instantiation but allow module @@ -10,10 +18,6 @@ ObjectProxy = object import_exc = exc -from contextlib import nullcontext - -from .interface import Bus - class ThreadSafeBus(ObjectProxy): # pylint: disable=abstract-method """ @@ -32,65 +36,81 @@ class ThreadSafeBus(ObjectProxy): # pylint: disable=abstract-method instead of :meth:`~can.BusABC.recv` directly. """ - def __init__(self, *args, **kwargs): + __wrapped__: BusABC + + def __init__( + self, + channel: Optional[typechecking.Channel] = None, + interface: Optional[str] = None, + config_context: Optional[str] = None, + ignore_config: bool = False, + **kwargs: Any, + ) -> None: if import_exc is not None: raise import_exc - super().__init__(Bus(*args, **kwargs)) + super().__init__( + Bus( + channel=channel, + interface=interface, + config_context=config_context, + ignore_config=ignore_config, + **kwargs, + ) + ) # now, BusABC.send_periodic() does not need a lock anymore, but the # implementation still requires a context manager - self.__wrapped__._lock_send_periodic = nullcontext() + self.__wrapped__._lock_send_periodic = nullcontext() # type: ignore[assignment] # init locks for sending and receiving separately self._lock_send = RLock() self._lock_recv = RLock() - def recv( - self, timeout=None, *args, **kwargs - ): # pylint: disable=keyword-arg-before-vararg + def recv(self, timeout: Optional[float] = None) -> Optional[Message]: with self._lock_recv: - return self.__wrapped__.recv(timeout=timeout, *args, **kwargs) + return self.__wrapped__.recv(timeout=timeout) - def send( - self, msg, timeout=None, *args, **kwargs - ): # pylint: disable=keyword-arg-before-vararg + def send(self, msg: Message, timeout: Optional[float] = None) -> None: with self._lock_send: - return self.__wrapped__.send(msg, timeout=timeout, *args, **kwargs) + return self.__wrapped__.send(msg=msg, timeout=timeout) # send_periodic does not need a lock, since the underlying # `send` method is already synchronized @property - def filters(self): + def filters(self) -> Optional[typechecking.CanFilters]: with self._lock_recv: return self.__wrapped__.filters @filters.setter - def filters(self, filters): + def filters(self, filters: Optional[typechecking.CanFilters]) -> None: with self._lock_recv: self.__wrapped__.filters = filters - def set_filters( - self, filters=None, *args, **kwargs - ): # pylint: disable=keyword-arg-before-vararg + def set_filters(self, filters: Optional[typechecking.CanFilters] = None) -> None: with self._lock_recv: - return self.__wrapped__.set_filters(filters=filters, *args, **kwargs) + return self.__wrapped__.set_filters(filters=filters) - def flush_tx_buffer(self, *args, **kwargs): + def flush_tx_buffer(self) -> None: with self._lock_send: - return self.__wrapped__.flush_tx_buffer(*args, **kwargs) + return self.__wrapped__.flush_tx_buffer() - def shutdown(self, *args, **kwargs): + def shutdown(self) -> None: with self._lock_send, self._lock_recv: - return self.__wrapped__.shutdown(*args, **kwargs) + return self.__wrapped__.shutdown() @property - def state(self): + def state(self) -> BusState: with self._lock_send, self._lock_recv: return self.__wrapped__.state @state.setter - def state(self, new_state): + def state(self, new_state: BusState) -> None: with self._lock_send, self._lock_recv: self.__wrapped__.state = new_state + + @property + def protocol(self) -> CanProtocol: + with self._lock_send, self._lock_recv: + return self.__wrapped__.protocol diff --git a/can/typechecking.py b/can/typechecking.py index fc0c87c0d..8c25e8b57 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -21,18 +21,16 @@ import struct -class CanFilter(TypedDict): +class _CanFilterBase(TypedDict): can_id: int can_mask: int -class CanFilterExtended(TypedDict): - can_id: int - can_mask: int +class CanFilter(_CanFilterBase, total=False): extended: bool -CanFilters = Sequence[Union[CanFilter, CanFilterExtended]] +CanFilters = Sequence[CanFilter] # TODO: Once buffer protocol support lands in typing, we should switch to that, # since can.message.Message attempts to call bytearray() on the given data, so diff --git a/can/viewer.py b/can/viewer.py index 81e8942a4..97bda1676 100644 --- a/can/viewer.py +++ b/can/viewer.py @@ -159,7 +159,7 @@ def run(self): # Unpack the data and then convert it into SI-units @staticmethod - def unpack_data(cmd: int, cmd_to_struct: dict, data: bytes) -> list[float]: + def unpack_data(cmd: int, cmd_to_struct: TDataStructs, data: bytes) -> list[float]: if not cmd_to_struct or not data: # These messages do not contain a data package return [] diff --git a/pyproject.toml b/pyproject.toml index 9eb98e37b..47d8e7a6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -130,11 +130,8 @@ disallow_incomplete_defs = true warn_redundant_casts = true warn_unused_ignores = true exclude = [ - "venv", "^doc/conf.py$", - "^build", "^test", - "^can/interfaces/__init__.py", "^can/interfaces/etas", "^can/interfaces/gs_usb", "^can/interfaces/ics_neovi", @@ -144,12 +141,9 @@ exclude = [ "^can/interfaces/nican", "^can/interfaces/neousys", "^can/interfaces/pcan", - "^can/interfaces/serial", "^can/interfaces/socketcan", "^can/interfaces/systec", - "^can/interfaces/udp_multicast", "^can/interfaces/usb2can", - "^can/interfaces/virtual", ] [tool.ruff]