diff --git a/examples/nidaqmx_warnings.py b/examples/nidaqmx_warnings.py index 987e6b1e3..23ef56cf8 100644 --- a/examples/nidaqmx_warnings.py +++ b/examples/nidaqmx_warnings.py @@ -29,7 +29,7 @@ assert issubclass(w[-1].category, nidaqmx.DaqWarning) if w: - print("DaqWarning caught: {0}\n".format(w[-1].message)) + print("DaqWarning caught: {}\n".format(w[-1].message)) # Raising warnings as exceptions. @@ -43,7 +43,7 @@ task.write([1.1, 2.2, 3.3, 4.4, 5.5], auto_start=True) task.stop() except nidaqmx.DaqWarning as e: - print("DaqWarning caught as exception: {0}\n".format(e)) + print("DaqWarning caught as exception: {}\n".format(e)) assert e.error_code == DAQmxWarnings.STOPPED_BEFORE_DONE diff --git a/examples/system_properties.py b/examples/system_properties.py index 6f2521090..3eaa2d2b8 100644 --- a/examples/system_properties.py +++ b/examples/system_properties.py @@ -6,7 +6,7 @@ driver_version = local_system.driver_version print( - "DAQmx {0}.{1}.{2}".format( + "DAQmx {}.{}.{}".format( driver_version.major_version, driver_version.minor_version, driver_version.update_version, @@ -15,7 +15,7 @@ for device in local_system.devices: print( - "Device Name: {0}, Product Category: {1}, Product Type: {2}".format( + "Device Name: {}, Product Category: {}, Product Type: {}".format( device.name, device.product_category, device.product_type ) ) diff --git a/generated/nidaqmx/__main__.py b/generated/nidaqmx/__main__.py index c30da9448..e0baada58 100644 --- a/generated/nidaqmx/__main__.py +++ b/generated/nidaqmx/__main__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from typing import Optional diff --git a/generated/nidaqmx/_grpc_interpreter.py b/generated/nidaqmx/_grpc_interpreter.py index 1b4c051e7..6973d38f9 100644 --- a/generated/nidaqmx/_grpc_interpreter.py +++ b/generated/nidaqmx/_grpc_interpreter.py @@ -5,7 +5,7 @@ import threading import typing import warnings -from typing import Callable, Generic, Optional, TypeVar +from typing import Callable, Generic, TypeVar import google.protobuf.message from google.protobuf.timestamp_pb2 import Timestamp as GrpcTimestamp @@ -52,7 +52,7 @@ def __init__( self._interpreter = interpreter self._event_stream = event_stream self._event_callback = event_callback - self._event_stream_exception: Optional[Exception] = None + self._event_stream_exception: Exception | None = None self._thread = threading.Thread(target=self._thread_main, name=f"nidaqmx {event_name} thread") self._thread.start() diff --git a/generated/nidaqmx/_grpc_time.py b/generated/nidaqmx/_grpc_time.py index ef3350718..77a818d43 100644 --- a/generated/nidaqmx/_grpc_time.py +++ b/generated/nidaqmx/_grpc_time.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from datetime import timezone from datetime import datetime as std_datetime from datetime import tzinfo as dt_tzinfo @@ -20,7 +22,7 @@ _EPOCH_1970 = ht_datetime(1970, 1, 1, tzinfo=timezone.utc) -def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional[GrpcTimestamp] = None) -> GrpcTimestamp: +def convert_time_to_timestamp(dt: std_datetime | ht_datetime, ts: GrpcTimestamp | None = None) -> GrpcTimestamp: seconds_since_1970 = 0 if ts is None: @@ -42,7 +44,7 @@ def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional ts.FromNanoseconds(seconds_since_1970 * _NS_PER_S + nanos) return ts -def convert_timestamp_to_time(ts: GrpcTimestamp, tzinfo: Optional[dt_tzinfo] = None) -> ht_datetime: +def convert_timestamp_to_time(ts: GrpcTimestamp, tzinfo: dt_tzinfo | None = None) -> ht_datetime: total_nanos = ts.ToNanoseconds() seconds, nanos = divmod(total_nanos, _NS_PER_S) # Convert the nanoseconds to yoctoseconds. diff --git a/generated/nidaqmx/_install_daqmx.py b/generated/nidaqmx/_install_daqmx.py index 18d127146..d63971ec1 100644 --- a/generated/nidaqmx/_install_daqmx.py +++ b/generated/nidaqmx/_install_daqmx.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import contextlib import errno import importlib.resources as pkg_resources @@ -34,7 +36,7 @@ _NETWORK_TIMEOUT_IN_SECONDS = 60 -def _parse_version(version: str) -> Tuple[int, ...]: +def _parse_version(version: str) -> tuple[int, ...]: """ Split the version string into a tuple of integers. @@ -54,7 +56,7 @@ def _parse_version(version: str) -> Tuple[int, ...]: raise click.ClickException(f"Invalid version number '{version}'.") from e -def _get_daqmx_installed_version() -> Optional[str]: +def _get_daqmx_installed_version() -> str | None: """ Check for existing installation of NI-DAQmx. @@ -127,7 +129,7 @@ def _get_daqmx_installed_version() -> Optional[str]: @contextlib.contextmanager def _multi_access_temp_file( *, suffix: str = ".exe", delete: bool = True -) -> Generator[str, None, None]: +) -> Generator[str]: """ Context manager for creating a temporary file. @@ -158,7 +160,7 @@ def _multi_access_temp_file( def _load_data( json_data: str, platform: str -) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[List[str]]]: +) -> tuple[str | None, str | None, str | None, list[str] | None]: """ Load data from JSON string and extract Windows metadata. @@ -199,10 +201,10 @@ def _load_data( raise click.ClickException(f"Failed to parse the driver metadata.\nDetails: {e}") from e for metadata_entry in metadata: - location: Optional[str] = metadata_entry.get("Location") - version: Optional[str] = metadata_entry.get("Version") - release: Optional[str] = metadata_entry.get("Release") - supported_os: Optional[List[str]] = metadata_entry.get("supportedOS") + location: str | None = metadata_entry.get("Location") + version: str | None = metadata_entry.get("Version") + release: str | None = metadata_entry.get("Release") + supported_os: list[str] | None = metadata_entry.get("supportedOS") _logger.debug("From metadata file found location %s and version %s.", location, version) if location and version: return location, version, release, supported_os @@ -211,7 +213,7 @@ def _load_data( def _get_driver_details( platform: str, -) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[List[str]]]: +) -> tuple[str | None, str | None, str | None, list[str] | None]: """ Parse the JSON data and retrieve the download link and version information. diff --git a/generated/nidaqmx/_lib.py b/generated/nidaqmx/_lib.py index c6ee3daa1..b68489678 100644 --- a/generated/nidaqmx/_lib.py +++ b/generated/nidaqmx/_lib.py @@ -199,18 +199,18 @@ def _load_lib(libname: str): encoding = 'utf-8' else: raise ValueError(f"Unsupported NIDAQMX_C_LIBRARY value: {nidaqmx_c_library}") - except (OSError, WindowsError) as e: + except OSError as e: raise DaqNotFoundError(_DAQ_NOT_FOUND_MESSAGE) from e else: try: windll, cdll = _load_lib("nicai_utf8") encoding = 'utf-8' - except (OSError, WindowsError): + except OSError: # Fallback to nicaiu.dll if nicai_utf8.dll cannot be loaded try: windll, cdll = _load_lib("nicaiu") encoding = get_encoding_from_locale() - except (OSError, WindowsError) as e: + except OSError as e: raise DaqNotFoundError(_DAQ_NOT_FOUND_MESSAGE) from e elif sys.platform.startswith('linux'): library_path = find_library('nidaqmx') diff --git a/generated/nidaqmx/_lib_time.py b/generated/nidaqmx/_lib_time.py index 703ec8d57..69444a3fb 100644 --- a/generated/nidaqmx/_lib_time.py +++ b/generated/nidaqmx/_lib_time.py @@ -35,7 +35,7 @@ class AbsoluteTime(ctypes.Structure): @classmethod - def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime: + def from_datetime(cls, dt: std_datetime | ht_datetime) -> AbsoluteTime: seconds_since_1904 = 0 # Convert the subseconds. @@ -55,7 +55,7 @@ def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime: return AbsoluteTime(lsb=lsb, msb=seconds_since_1904) - def to_datetime(self, tzinfo: Optional[dt_tzinfo] = None) -> ht_datetime: + def to_datetime(self, tzinfo: dt_tzinfo | None = None) -> ht_datetime: total_yoctoseconds = int( round(AbsoluteTime._YS_PER_S * self.lsb / AbsoluteTime._NUM_SUBSECONDS) ) diff --git a/generated/nidaqmx/_linux_installation_commands.py b/generated/nidaqmx/_linux_installation_commands.py index 3d43186a1..8e26cd743 100644 --- a/generated/nidaqmx/_linux_installation_commands.py +++ b/generated/nidaqmx/_linux_installation_commands.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from typing import Callable, Dict, List, Tuple @@ -58,8 +60,8 @@ def _get_version_rhel(dist_version: str) -> str: @dataclass class DistroInfo: get_distro_version: Callable[[str], str] - get_daqmx_version: List[str] - install_commands: List[List[str]] + get_daqmx_version: list[str] + install_commands: list[list[str]] # Mapping of distros to their command templates and version handlers @@ -74,7 +76,7 @@ class DistroInfo: def get_linux_installation_commands( _directory_to_extract_to: str, dist_name: str, dist_version: str, _release_string: str -) -> List[List[str]]: +) -> list[list[str]]: """ Get the installation commands for Linux based on the distribution. diff --git a/generated/nidaqmx/_time.py b/generated/nidaqmx/_time.py index fd0601968..06edadd50 100644 --- a/generated/nidaqmx/_time.py +++ b/generated/nidaqmx/_time.py @@ -9,7 +9,7 @@ from zoneinfo import ZoneInfo # theoretically the same as astimezone(), but with support for dates before 1970 -def _convert_to_desired_timezone(expected_time_utc: Union[std_datetime, ht_datetime], tzinfo: Optional[dt_tzinfo] = None) -> Union[std_datetime, ht_datetime]: +def _convert_to_desired_timezone(expected_time_utc: std_datetime | ht_datetime, tzinfo: dt_tzinfo | None = None) -> std_datetime | ht_datetime: # if timezone matches, no need to do conversion if expected_time_utc.tzinfo is tzinfo: return expected_time_utc diff --git a/generated/nidaqmx/stream_readers.py b/generated/nidaqmx/stream_readers.py index c6f759184..a1fbfd7d6 100644 --- a/generated/nidaqmx/stream_readers.py +++ b/generated/nidaqmx/stream_readers.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Optional, Tuple import numpy from nidaqmx import DaqError @@ -69,7 +71,7 @@ def _verify_array(self, data, number_of_samples_per_channel, channels_to_read = self._in_stream.channels_to_read number_of_channels = len(channels_to_read.channel_names) - array_shape: Optional[Tuple[int, ...]] = None + array_shape: tuple[int, ...] | None = None if is_many_chan: if is_many_samp: array_shape = (number_of_channels, @@ -114,7 +116,7 @@ def _verify_array_digital_lines( number_of_channels = len(channels_to_read.channel_names) number_of_lines = self._in_stream.di_num_booleans_per_chan - array_shape: Optional[Tuple[int, ...]] = None + array_shape: tuple[int, ...] | None = None if is_many_chan: if is_many_line: array_shape = (number_of_channels, number_of_lines) diff --git a/generated/nidaqmx/system/storage/persisted_channel.py b/generated/nidaqmx/system/storage/persisted_channel.py index 2af85926d..fafa6aede 100644 --- a/generated/nidaqmx/system/storage/persisted_channel.py +++ b/generated/nidaqmx/system/storage/persisted_channel.py @@ -1,4 +1,3 @@ - from nidaqmx import utils __all__ = ['PersistedChannel'] diff --git a/generated/nidaqmx/system/storage/persisted_scale.py b/generated/nidaqmx/system/storage/persisted_scale.py index df16d4a7a..784f94244 100644 --- a/generated/nidaqmx/system/storage/persisted_scale.py +++ b/generated/nidaqmx/system/storage/persisted_scale.py @@ -1,4 +1,3 @@ - from nidaqmx import utils from nidaqmx.scale import _ScaleAlternateConstructor diff --git a/generated/nidaqmx/system/storage/persisted_task.py b/generated/nidaqmx/system/storage/persisted_task.py index 01396f273..cf49bbdad 100644 --- a/generated/nidaqmx/system/storage/persisted_task.py +++ b/generated/nidaqmx/system/storage/persisted_task.py @@ -1,4 +1,3 @@ - from nidaqmx import task from nidaqmx import utils diff --git a/generated/nidaqmx/task/__init__.py b/generated/nidaqmx/task/__init__.py index dd5583a7b..a5a3eef9b 100644 --- a/generated/nidaqmx/task/__init__.py +++ b/generated/nidaqmx/task/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task._task import ( Task, _TaskEventType, _TaskAlternateConstructor ) diff --git a/generated/nidaqmx/task/_task.py b/generated/nidaqmx/task/_task.py index af3fb1e1f..e60b501bd 100644 --- a/generated/nidaqmx/task/_task.py +++ b/generated/nidaqmx/task/_task.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import threading import warnings from enum import Enum @@ -601,7 +603,7 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, # Determine the array shape and size to create if number_of_channels > 1: if not num_samples_not_set: - array_shape: Tuple[int, ...] = ( + array_shape: tuple[int, ...] = ( number_of_channels, number_of_samples_per_channel ) else: @@ -672,13 +674,13 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, def _read_ctr_pulse( self, - array_shape: Tuple[int, ...], + array_shape: tuple[int, ...], meas_type: UsageTypeCI, number_of_channels: int, number_of_samples_per_channel: int, num_samples_not_set: bool, timeout: float, - ) -> Union[CtrFreq, CtrTick, CtrTime, List[CtrFreq], List[CtrTick], List[CtrTime]]: + ) -> CtrFreq | CtrTick | CtrTime | list[CtrFreq] | list[CtrTick] | list[CtrTime]: if meas_type == UsageTypeCI.PULSE_FREQ: frequencies = numpy.zeros(array_shape, dtype=numpy.float64) duty_cycles = numpy.zeros(array_shape, dtype=numpy.float64) @@ -687,7 +689,7 @@ def _read_ctr_pulse( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, frequencies, duty_cycles) - data: Union[List[CtrFreq], List[CtrTick], List[CtrTime]] = [ + data: list[CtrFreq] | list[CtrTick] | list[CtrTime] = [ CtrFreq(freq=f, duty_cycle=d) for f, d in zip(frequencies, duty_cycles) ] @@ -727,11 +729,11 @@ def _read_ctr_pulse( def _read_power( self, - array_shape: Tuple[int, ...], + array_shape: tuple[int, ...], number_of_channels: int, number_of_samples_per_channel: int, timeout: float, - ) -> Union[PowerMeasurement, List[PowerMeasurement], List[List[PowerMeasurement]]]: + ) -> PowerMeasurement | list[PowerMeasurement] | list[list[PowerMeasurement]]: voltages = numpy.zeros(array_shape, dtype=numpy.float64) currents = numpy.zeros(array_shape, dtype=numpy.float64) diff --git a/generated/nidaqmx/task/channels/__init__.py b/generated/nidaqmx/task/channels/__init__.py index 65c224aef..1427c46d1 100644 --- a/generated/nidaqmx/task/channels/__init__.py +++ b/generated/nidaqmx/task/channels/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task.channels._channel import Channel from nidaqmx.task.channels._ai_channel import AIChannel from nidaqmx.task.channels._ao_channel import AOChannel diff --git a/generated/nidaqmx/task/collections/__init__.py b/generated/nidaqmx/task/collections/__init__.py index 555c246ac..2c93309e5 100644 --- a/generated/nidaqmx/task/collections/__init__.py +++ b/generated/nidaqmx/task/collections/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task.collections._channel_collection import ChannelCollection from nidaqmx.task.collections._ai_channel_collection import AIChannelCollection from nidaqmx.task.collections._ao_channel_collection import AOChannelCollection diff --git a/generated/nidaqmx/task/triggering/__init__.py b/generated/nidaqmx/task/triggering/__init__.py index 155ec697b..b1d4de025 100644 --- a/generated/nidaqmx/task/triggering/__init__.py +++ b/generated/nidaqmx/task/triggering/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task.triggering._triggers import Triggers from nidaqmx.task.triggering._arm_start_trigger import ArmStartTrigger from nidaqmx.task.triggering._handshake_trigger import HandshakeTrigger diff --git a/generated/nidaqmx/utils.py b/generated/nidaqmx/utils.py index a003efe69..72de56a1a 100644 --- a/generated/nidaqmx/utils.py +++ b/generated/nidaqmx/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import re from dataclasses import dataclass from typing import List, Optional @@ -37,7 +39,7 @@ def to_flattened_name(self) -> str: return f"{self.base_name}{self.start_index_str}:{self.end_index_str}" -def flatten_channel_string(channel_names: List[str]) -> str: +def flatten_channel_string(channel_names: list[str]) -> str: """ Converts a list of channel names to a comma-delimited list of names. @@ -109,7 +111,7 @@ def flatten_channel_string(channel_names: List[str]) -> str: return ','.join([_f for _f in flattened_channel_list if _f]).strip() -def unflatten_channel_string(channel_names: str) -> List[str]: +def unflatten_channel_string(channel_names: str) -> list[str]: """ Converts a comma-delimited list of channel names to a list of names. @@ -197,8 +199,8 @@ def unflatten_channel_string(channel_names: str) -> List[str]: def _select_interpreter( - grpc_options: Optional[GrpcSessionOptions] = None, - interpreter: Optional[BaseInterpreter] = None + grpc_options: GrpcSessionOptions | None = None, + interpreter: BaseInterpreter | None = None ) -> BaseInterpreter: if interpreter: return interpreter diff --git a/src/codegen/templates/_grpc_interpreter.py.mako b/src/codegen/templates/_grpc_interpreter.py.mako index b38586b9f..cf8beb877 100644 --- a/src/codegen/templates/_grpc_interpreter.py.mako +++ b/src/codegen/templates/_grpc_interpreter.py.mako @@ -22,7 +22,7 @@ import logging import threading import typing import warnings -from typing import Callable, Generic, Optional, TypeVar +from typing import Callable, Generic, TypeVar import google.protobuf.message from google.protobuf.timestamp_pb2 import Timestamp as GrpcTimestamp @@ -69,7 +69,7 @@ class GrpcEventHandler(BaseEventHandler, Generic[TEventResponse]): self._interpreter = interpreter self._event_stream = event_stream self._event_callback = event_callback - self._event_stream_exception: Optional[Exception] = None + self._event_stream_exception: Exception | None = None self._thread = threading.Thread(target=self._thread_main, name=f"nidaqmx {event_name} thread") self._thread.start() diff --git a/src/handwritten/__main__.py b/src/handwritten/__main__.py index c30da9448..e0baada58 100644 --- a/src/handwritten/__main__.py +++ b/src/handwritten/__main__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from typing import Optional diff --git a/src/handwritten/_grpc_time.py b/src/handwritten/_grpc_time.py index ef3350718..77a818d43 100644 --- a/src/handwritten/_grpc_time.py +++ b/src/handwritten/_grpc_time.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from datetime import timezone from datetime import datetime as std_datetime from datetime import tzinfo as dt_tzinfo @@ -20,7 +22,7 @@ _EPOCH_1970 = ht_datetime(1970, 1, 1, tzinfo=timezone.utc) -def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional[GrpcTimestamp] = None) -> GrpcTimestamp: +def convert_time_to_timestamp(dt: std_datetime | ht_datetime, ts: GrpcTimestamp | None = None) -> GrpcTimestamp: seconds_since_1970 = 0 if ts is None: @@ -42,7 +44,7 @@ def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional ts.FromNanoseconds(seconds_since_1970 * _NS_PER_S + nanos) return ts -def convert_timestamp_to_time(ts: GrpcTimestamp, tzinfo: Optional[dt_tzinfo] = None) -> ht_datetime: +def convert_timestamp_to_time(ts: GrpcTimestamp, tzinfo: dt_tzinfo | None = None) -> ht_datetime: total_nanos = ts.ToNanoseconds() seconds, nanos = divmod(total_nanos, _NS_PER_S) # Convert the nanoseconds to yoctoseconds. diff --git a/src/handwritten/_install_daqmx.py b/src/handwritten/_install_daqmx.py index 18d127146..d63971ec1 100644 --- a/src/handwritten/_install_daqmx.py +++ b/src/handwritten/_install_daqmx.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import contextlib import errno import importlib.resources as pkg_resources @@ -34,7 +36,7 @@ _NETWORK_TIMEOUT_IN_SECONDS = 60 -def _parse_version(version: str) -> Tuple[int, ...]: +def _parse_version(version: str) -> tuple[int, ...]: """ Split the version string into a tuple of integers. @@ -54,7 +56,7 @@ def _parse_version(version: str) -> Tuple[int, ...]: raise click.ClickException(f"Invalid version number '{version}'.") from e -def _get_daqmx_installed_version() -> Optional[str]: +def _get_daqmx_installed_version() -> str | None: """ Check for existing installation of NI-DAQmx. @@ -127,7 +129,7 @@ def _get_daqmx_installed_version() -> Optional[str]: @contextlib.contextmanager def _multi_access_temp_file( *, suffix: str = ".exe", delete: bool = True -) -> Generator[str, None, None]: +) -> Generator[str]: """ Context manager for creating a temporary file. @@ -158,7 +160,7 @@ def _multi_access_temp_file( def _load_data( json_data: str, platform: str -) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[List[str]]]: +) -> tuple[str | None, str | None, str | None, list[str] | None]: """ Load data from JSON string and extract Windows metadata. @@ -199,10 +201,10 @@ def _load_data( raise click.ClickException(f"Failed to parse the driver metadata.\nDetails: {e}") from e for metadata_entry in metadata: - location: Optional[str] = metadata_entry.get("Location") - version: Optional[str] = metadata_entry.get("Version") - release: Optional[str] = metadata_entry.get("Release") - supported_os: Optional[List[str]] = metadata_entry.get("supportedOS") + location: str | None = metadata_entry.get("Location") + version: str | None = metadata_entry.get("Version") + release: str | None = metadata_entry.get("Release") + supported_os: list[str] | None = metadata_entry.get("supportedOS") _logger.debug("From metadata file found location %s and version %s.", location, version) if location and version: return location, version, release, supported_os @@ -211,7 +213,7 @@ def _load_data( def _get_driver_details( platform: str, -) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[List[str]]]: +) -> tuple[str | None, str | None, str | None, list[str] | None]: """ Parse the JSON data and retrieve the download link and version information. diff --git a/src/handwritten/_lib.py b/src/handwritten/_lib.py index c6ee3daa1..b68489678 100644 --- a/src/handwritten/_lib.py +++ b/src/handwritten/_lib.py @@ -199,18 +199,18 @@ def _load_lib(libname: str): encoding = 'utf-8' else: raise ValueError(f"Unsupported NIDAQMX_C_LIBRARY value: {nidaqmx_c_library}") - except (OSError, WindowsError) as e: + except OSError as e: raise DaqNotFoundError(_DAQ_NOT_FOUND_MESSAGE) from e else: try: windll, cdll = _load_lib("nicai_utf8") encoding = 'utf-8' - except (OSError, WindowsError): + except OSError: # Fallback to nicaiu.dll if nicai_utf8.dll cannot be loaded try: windll, cdll = _load_lib("nicaiu") encoding = get_encoding_from_locale() - except (OSError, WindowsError) as e: + except OSError as e: raise DaqNotFoundError(_DAQ_NOT_FOUND_MESSAGE) from e elif sys.platform.startswith('linux'): library_path = find_library('nidaqmx') diff --git a/src/handwritten/_lib_time.py b/src/handwritten/_lib_time.py index 703ec8d57..69444a3fb 100644 --- a/src/handwritten/_lib_time.py +++ b/src/handwritten/_lib_time.py @@ -35,7 +35,7 @@ class AbsoluteTime(ctypes.Structure): @classmethod - def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime: + def from_datetime(cls, dt: std_datetime | ht_datetime) -> AbsoluteTime: seconds_since_1904 = 0 # Convert the subseconds. @@ -55,7 +55,7 @@ def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime: return AbsoluteTime(lsb=lsb, msb=seconds_since_1904) - def to_datetime(self, tzinfo: Optional[dt_tzinfo] = None) -> ht_datetime: + def to_datetime(self, tzinfo: dt_tzinfo | None = None) -> ht_datetime: total_yoctoseconds = int( round(AbsoluteTime._YS_PER_S * self.lsb / AbsoluteTime._NUM_SUBSECONDS) ) diff --git a/src/handwritten/_linux_installation_commands.py b/src/handwritten/_linux_installation_commands.py index 3d43186a1..8e26cd743 100644 --- a/src/handwritten/_linux_installation_commands.py +++ b/src/handwritten/_linux_installation_commands.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from typing import Callable, Dict, List, Tuple @@ -58,8 +60,8 @@ def _get_version_rhel(dist_version: str) -> str: @dataclass class DistroInfo: get_distro_version: Callable[[str], str] - get_daqmx_version: List[str] - install_commands: List[List[str]] + get_daqmx_version: list[str] + install_commands: list[list[str]] # Mapping of distros to their command templates and version handlers @@ -74,7 +76,7 @@ class DistroInfo: def get_linux_installation_commands( _directory_to_extract_to: str, dist_name: str, dist_version: str, _release_string: str -) -> List[List[str]]: +) -> list[list[str]]: """ Get the installation commands for Linux based on the distribution. diff --git a/src/handwritten/_time.py b/src/handwritten/_time.py index fd0601968..06edadd50 100644 --- a/src/handwritten/_time.py +++ b/src/handwritten/_time.py @@ -9,7 +9,7 @@ from zoneinfo import ZoneInfo # theoretically the same as astimezone(), but with support for dates before 1970 -def _convert_to_desired_timezone(expected_time_utc: Union[std_datetime, ht_datetime], tzinfo: Optional[dt_tzinfo] = None) -> Union[std_datetime, ht_datetime]: +def _convert_to_desired_timezone(expected_time_utc: std_datetime | ht_datetime, tzinfo: dt_tzinfo | None = None) -> std_datetime | ht_datetime: # if timezone matches, no need to do conversion if expected_time_utc.tzinfo is tzinfo: return expected_time_utc diff --git a/src/handwritten/stream_readers.py b/src/handwritten/stream_readers.py index c6f759184..a1fbfd7d6 100644 --- a/src/handwritten/stream_readers.py +++ b/src/handwritten/stream_readers.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Optional, Tuple import numpy from nidaqmx import DaqError @@ -69,7 +71,7 @@ def _verify_array(self, data, number_of_samples_per_channel, channels_to_read = self._in_stream.channels_to_read number_of_channels = len(channels_to_read.channel_names) - array_shape: Optional[Tuple[int, ...]] = None + array_shape: tuple[int, ...] | None = None if is_many_chan: if is_many_samp: array_shape = (number_of_channels, @@ -114,7 +116,7 @@ def _verify_array_digital_lines( number_of_channels = len(channels_to_read.channel_names) number_of_lines = self._in_stream.di_num_booleans_per_chan - array_shape: Optional[Tuple[int, ...]] = None + array_shape: tuple[int, ...] | None = None if is_many_chan: if is_many_line: array_shape = (number_of_channels, number_of_lines) diff --git a/src/handwritten/system/storage/persisted_channel.py b/src/handwritten/system/storage/persisted_channel.py index 2af85926d..fafa6aede 100644 --- a/src/handwritten/system/storage/persisted_channel.py +++ b/src/handwritten/system/storage/persisted_channel.py @@ -1,4 +1,3 @@ - from nidaqmx import utils __all__ = ['PersistedChannel'] diff --git a/src/handwritten/system/storage/persisted_scale.py b/src/handwritten/system/storage/persisted_scale.py index df16d4a7a..784f94244 100644 --- a/src/handwritten/system/storage/persisted_scale.py +++ b/src/handwritten/system/storage/persisted_scale.py @@ -1,4 +1,3 @@ - from nidaqmx import utils from nidaqmx.scale import _ScaleAlternateConstructor diff --git a/src/handwritten/system/storage/persisted_task.py b/src/handwritten/system/storage/persisted_task.py index 01396f273..cf49bbdad 100644 --- a/src/handwritten/system/storage/persisted_task.py +++ b/src/handwritten/system/storage/persisted_task.py @@ -1,4 +1,3 @@ - from nidaqmx import task from nidaqmx import utils diff --git a/src/handwritten/task/__init__.py b/src/handwritten/task/__init__.py index dd5583a7b..a5a3eef9b 100644 --- a/src/handwritten/task/__init__.py +++ b/src/handwritten/task/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task._task import ( Task, _TaskEventType, _TaskAlternateConstructor ) diff --git a/src/handwritten/task/_task.py b/src/handwritten/task/_task.py index af3fb1e1f..e60b501bd 100644 --- a/src/handwritten/task/_task.py +++ b/src/handwritten/task/_task.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import threading import warnings from enum import Enum @@ -601,7 +603,7 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, # Determine the array shape and size to create if number_of_channels > 1: if not num_samples_not_set: - array_shape: Tuple[int, ...] = ( + array_shape: tuple[int, ...] = ( number_of_channels, number_of_samples_per_channel ) else: @@ -672,13 +674,13 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, def _read_ctr_pulse( self, - array_shape: Tuple[int, ...], + array_shape: tuple[int, ...], meas_type: UsageTypeCI, number_of_channels: int, number_of_samples_per_channel: int, num_samples_not_set: bool, timeout: float, - ) -> Union[CtrFreq, CtrTick, CtrTime, List[CtrFreq], List[CtrTick], List[CtrTime]]: + ) -> CtrFreq | CtrTick | CtrTime | list[CtrFreq] | list[CtrTick] | list[CtrTime]: if meas_type == UsageTypeCI.PULSE_FREQ: frequencies = numpy.zeros(array_shape, dtype=numpy.float64) duty_cycles = numpy.zeros(array_shape, dtype=numpy.float64) @@ -687,7 +689,7 @@ def _read_ctr_pulse( self._handle, number_of_samples_per_channel, timeout, FillMode.GROUP_BY_CHANNEL.value, frequencies, duty_cycles) - data: Union[List[CtrFreq], List[CtrTick], List[CtrTime]] = [ + data: list[CtrFreq] | list[CtrTick] | list[CtrTime] = [ CtrFreq(freq=f, duty_cycle=d) for f, d in zip(frequencies, duty_cycles) ] @@ -727,11 +729,11 @@ def _read_ctr_pulse( def _read_power( self, - array_shape: Tuple[int, ...], + array_shape: tuple[int, ...], number_of_channels: int, number_of_samples_per_channel: int, timeout: float, - ) -> Union[PowerMeasurement, List[PowerMeasurement], List[List[PowerMeasurement]]]: + ) -> PowerMeasurement | list[PowerMeasurement] | list[list[PowerMeasurement]]: voltages = numpy.zeros(array_shape, dtype=numpy.float64) currents = numpy.zeros(array_shape, dtype=numpy.float64) diff --git a/src/handwritten/task/channels/__init__.py b/src/handwritten/task/channels/__init__.py index 65c224aef..1427c46d1 100644 --- a/src/handwritten/task/channels/__init__.py +++ b/src/handwritten/task/channels/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task.channels._channel import Channel from nidaqmx.task.channels._ai_channel import AIChannel from nidaqmx.task.channels._ao_channel import AOChannel diff --git a/src/handwritten/task/collections/__init__.py b/src/handwritten/task/collections/__init__.py index 555c246ac..2c93309e5 100644 --- a/src/handwritten/task/collections/__init__.py +++ b/src/handwritten/task/collections/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task.collections._channel_collection import ChannelCollection from nidaqmx.task.collections._ai_channel_collection import AIChannelCollection from nidaqmx.task.collections._ao_channel_collection import AOChannelCollection diff --git a/src/handwritten/task/triggering/__init__.py b/src/handwritten/task/triggering/__init__.py index 155ec697b..b1d4de025 100644 --- a/src/handwritten/task/triggering/__init__.py +++ b/src/handwritten/task/triggering/__init__.py @@ -1,4 +1,3 @@ - from nidaqmx.task.triggering._triggers import Triggers from nidaqmx.task.triggering._arm_start_trigger import ArmStartTrigger from nidaqmx.task.triggering._handshake_trigger import HandshakeTrigger diff --git a/src/handwritten/utils.py b/src/handwritten/utils.py index a003efe69..72de56a1a 100644 --- a/src/handwritten/utils.py +++ b/src/handwritten/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import re from dataclasses import dataclass from typing import List, Optional @@ -37,7 +39,7 @@ def to_flattened_name(self) -> str: return f"{self.base_name}{self.start_index_str}:{self.end_index_str}" -def flatten_channel_string(channel_names: List[str]) -> str: +def flatten_channel_string(channel_names: list[str]) -> str: """ Converts a list of channel names to a comma-delimited list of names. @@ -109,7 +111,7 @@ def flatten_channel_string(channel_names: List[str]) -> str: return ','.join([_f for _f in flattened_channel_list if _f]).strip() -def unflatten_channel_string(channel_names: str) -> List[str]: +def unflatten_channel_string(channel_names: str) -> list[str]: """ Converts a comma-delimited list of channel names to a list of names. @@ -197,8 +199,8 @@ def unflatten_channel_string(channel_names: str) -> List[str]: def _select_interpreter( - grpc_options: Optional[GrpcSessionOptions] = None, - interpreter: Optional[BaseInterpreter] = None + grpc_options: GrpcSessionOptions | None = None, + interpreter: BaseInterpreter | None = None ) -> BaseInterpreter: if interpreter: return interpreter diff --git a/tests/_event_utils.py b/tests/_event_utils.py index f66741626..6f9d1c7bd 100644 --- a/tests/_event_utils.py +++ b/tests/_event_utils.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import threading import time -from typing import Callable, Generic, List, NamedTuple, Optional, TypeVar, Union +from typing import Callable, Generic, NamedTuple, TypeVar, Union class DoneEvent(NamedTuple): @@ -30,15 +32,15 @@ class SignalEvent(NamedTuple): class BaseEventObserver(Generic[TEvent]): """Base class for event observers.""" - def __init__(self, side_effect: Optional[SideEffect] = None): + def __init__(self, side_effect: SideEffect | None = None): """Initializes the BaseEventObserver.""" self._lock = threading.Lock() self._event_semaphore = threading.Semaphore(value=0) - self._events: List[TEvent] = [] + self._events: list[TEvent] = [] self._side_effect = side_effect @property - def events(self) -> List[TEvent]: + def events(self) -> list[TEvent]: """Returns the list of observed events.""" with self._lock: return self._events[:] diff --git a/tests/acceptance/test_internationalization.py b/tests/acceptance/test_internationalization.py index 7bb6a26a6..8c7037709 100644 --- a/tests/acceptance/test_internationalization.py +++ b/tests/acceptance/test_internationalization.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import pathlib -from typing import Any, Dict, List, Optional, Union +from typing import Any import pytest @@ -16,7 +18,7 @@ def ai_task(task, sim_6363_device): return task -def _get_encoding(obj: Union[Task, Dict[str, Any]]) -> Optional[str]: +def _get_encoding(obj: Task | dict[str, Any]) -> str | None: if getattr(obj, "_grpc_options", None) or (isinstance(obj, dict) and "grpc_options" in obj): # gRPC server limited to MBCS encoding return get_encoding_from_locale() @@ -35,7 +37,7 @@ def _get_encoding(obj: Union[Task, Dict[str, Any]]) -> Optional[str]: ], ) def test___supported_encoding___reset_nonexistent_device___returns_error_with_device_name( - init_kwargs: Dict[str, Any], device_name: str, supported_encodings: List[str] + init_kwargs: dict[str, Any], device_name: str, supported_encodings: list[str] ): if _get_encoding(init_kwargs) not in supported_encodings: pytest.skip("requires compatible encoding") @@ -57,7 +59,7 @@ def test___supported_encoding___reset_nonexistent_device___returns_error_with_de ], ) def test___supported_encoding___logging_file_path___returns_assigned_value( - ai_task: Task, file_path: str, supported_encodings: List[str] + ai_task: Task, file_path: str, supported_encodings: list[str] ): if _get_encoding(ai_task) not in supported_encodings: pytest.skip("requires compatible encoding") @@ -77,7 +79,7 @@ def test___supported_encoding___logging_file_path___returns_assigned_value( ], ) def test___supported_encoding___configure_logging___returns_assigned_values( - ai_task: Task, file_path: str, supported_encodings: List[str] + ai_task: Task, file_path: str, supported_encodings: list[str] ): if _get_encoding(ai_task) not in supported_encodings: pytest.skip("requires compatible encoding") @@ -98,7 +100,7 @@ def test___supported_encoding___configure_logging___returns_assigned_values( ], ) def test___supported_encoding___start_new_file___returns_assigned_value( - ai_task: Task, file_path: str, supported_encodings: List[str] + ai_task: Task, file_path: str, supported_encodings: list[str] ): if _get_encoding(ai_task) not in supported_encodings: pytest.skip("requires compatible encoding") diff --git a/tests/acceptance/test_multi_threading.py b/tests/acceptance/test_multi_threading.py index 76d419f3f..17be16d2c 100644 --- a/tests/acceptance/test_multi_threading.py +++ b/tests/acceptance/test_multi_threading.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import concurrent.futures import functools import random @@ -6,7 +8,7 @@ from concurrent.futures import Future, ThreadPoolExecutor from contextlib import ExitStack from threading import Barrier, Semaphore -from typing import Any, Callable, List, Sequence, Tuple +from typing import Any, Callable, Sequence import pytest @@ -149,7 +151,7 @@ def _get_set_property_thread_main( stop_semaphore: Semaphore, channel: AIChannel, property_name: str, - property_values: List[Any], + property_values: list[Any], ) -> None: start_barrier.wait(timeout=TIMEOUT) while not stop_semaphore.acquire(timeout=0.0): @@ -238,7 +240,7 @@ def _create_ai_task_with_shared_interpreter( return task -def _configure_timing(tasks: List[Task], samples_per_chan: List[int]) -> None: +def _configure_timing(tasks: list[Task], samples_per_chan: list[int]) -> None: assert len(tasks) == len(samples_per_chan) for i in range(len(tasks)): @@ -248,8 +250,8 @@ def _configure_timing(tasks: List[Task], samples_per_chan: List[int]) -> None: def _configure_events( - tasks: List[Task], samples_per_chan: List[int] -) -> Tuple[List[int], List[threading.Event], List[int]]: + tasks: list[Task], samples_per_chan: list[int] +) -> tuple[list[int], list[threading.Event], list[int]]: assert len(tasks) == len(samples_per_chan) samples_acquired = [0 for _ in tasks] diff --git a/tests/component/task/channels/test_ai_channel.py b/tests/component/task/channels/test_ai_channel.py index dc951b792..6f4a3d1f8 100644 --- a/tests/component/task/channels/test_ai_channel.py +++ b/tests/component/task/channels/test_ai_channel.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import pathlib -from typing import List import pytest @@ -380,8 +381,8 @@ def test___task___add_ai_force_bridge_polynomial_chan___sets_channel_attributes( sim_bridge_device: Device, bridge_config: BridgeConfiguration, nominal_bridge_resistance: float, - forward_coeffs: List[float], - reverse_coeffs: List[float], + forward_coeffs: list[float], + reverse_coeffs: list[float], ) -> None: chan: AIChannel = task.ai_channels.add_ai_force_bridge_polynomial_chan( sim_bridge_device.ai_physical_chans[0].name, @@ -410,8 +411,8 @@ def test___task___add_ai_force_bridge_table_chan___sets_channel_attributes( sim_bridge_device: Device, bridge_config: BridgeConfiguration, nominal_bridge_resistance: float, - electrical_vals: List[float], - physical_vals: List[float], + electrical_vals: list[float], + physical_vals: list[float], ) -> None: chan: AIChannel = task.ai_channels.add_ai_force_bridge_table_chan( sim_bridge_device.ai_physical_chans[0].name, diff --git a/tests/component/task/test_triggers.py b/tests/component/task/test_triggers.py index e4f3d8b59..258e98f0c 100644 --- a/tests/component/task/test_triggers.py +++ b/tests/component/task/test_triggers.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from datetime import timezone -from typing import List import pytest from hightime import datetime as ht_datetime, timedelta as ht_timedelta @@ -302,8 +303,8 @@ def test___reference_trigger___cfg_anlg_window_ref_trig___no_errors( ) def test___start_trigger___cfg_anlg_multi_edge_start_trig___no_errors( sim_9775_ai_voltage_multi_edge_task: Task, - trig_slopes: List[Slope], - trig_levels: List[float], + trig_slopes: list[Slope], + trig_levels: list[float], ): trigger_sources = ["cdaqTesterMod3/ai0", "cdaqTesterMod3/ai1"] flatten_trigger_sources = flatten_channel_string([s for s in trigger_sources]) @@ -408,8 +409,8 @@ def test___reference_trigger___cfg_anlg_edge_ref_trig___no_errors( def test___reference_trigger___cfg_anlg_multi_edge_ref_trig___no_errors( sim_9775_ai_voltage_multi_edge_task: Task, pretrig_samples: int, - trig_slopes: List[Slope], - trig_levels: List[float], + trig_slopes: list[Slope], + trig_levels: list[float], ): trigger_sources = ["cdaqTesterMod3/ai0", "cdaqTesterMod3/ai1"] flatten_trigger_sources = flatten_channel_string([s for s in trigger_sources]) diff --git a/tests/component/test_stream_readers_ai.py b/tests/component/test_stream_readers_ai.py index a5216d3e0..52a613cff 100644 --- a/tests/component/test_stream_readers_ai.py +++ b/tests/component/test_stream_readers_ai.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import ctypes import math -from typing import List, Union import numpy import numpy.typing @@ -341,7 +342,7 @@ def pwr_single_channel_task( @pytest.fixture def pwr_multi_channel_task( - task: nidaqmx.Task, sim_ts_power_devices: List[nidaqmx.system.Device] + task: nidaqmx.Task, sim_ts_power_devices: list[nidaqmx.system.Device] ) -> nidaqmx.Task: for chan_index, sim_ts_power_device in enumerate(sim_ts_power_devices): task.ai_channels.add_ai_power_chan( @@ -537,9 +538,9 @@ def test___power_binary_reader___read_many_sample___returns_valid_samples( def test___power_binary_reader___read_many_sample_with_wrong_dtype___raises_error_with_correct_dtype( pwr_multi_channel_task: nidaqmx.Task, voltage_dtype: numpy.typing.DTypeLike, - voltage_default: Union[float, int], + voltage_default: float | int, current_dtype: numpy.typing.DTypeLike, - current_default: Union[float, int], + current_default: float | int, ) -> None: reader = PowerBinaryReader(pwr_multi_channel_task.in_stream) num_channels = pwr_multi_channel_task.number_of_channels diff --git a/tests/component/test_stream_readers_ci.py b/tests/component/test_stream_readers_ci.py index bb709b949..1db2ca0a6 100644 --- a/tests/component/test_stream_readers_ci.py +++ b/tests/component/test_stream_readers_ci.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import ctypes import math -from typing import Union import numpy import numpy.typing @@ -342,9 +343,9 @@ def test___counter_reader___read_many_sample_pulse_ticks___returns_valid_samples def test___counter_reader___read_many_sample_pulse_ticks_with_wrong_dtype___raises_error_with_correct_dtype( ci_pulse_ticks_task: nidaqmx.Task, high_ticks_dtype: numpy.typing.DTypeLike, - high_ticks_default: Union[float, int], + high_ticks_default: float | int, low_ticks_dtype: numpy.typing.DTypeLike, - low_ticks_default: Union[float, int], + low_ticks_default: float | int, ) -> None: reader = CounterReader(ci_pulse_ticks_task.in_stream) samples_to_read = 10 diff --git a/tests/component/test_stream_readers_di.py b/tests/component/test_stream_readers_di.py index 007e78368..b60537a74 100644 --- a/tests/component/test_stream_readers_di.py +++ b/tests/component/test_stream_readers_di.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import ctypes import math -from typing import Callable, List, TypeVar +from typing import Callable, TypeVar import numpy import numpy.typing @@ -149,7 +151,7 @@ def _get_expected_digital_data_for_sample(num_lines: int, sample_number: int) -> return result & line_mask -def _get_expected_digital_data(num_lines: int, num_samples: int) -> List[int]: +def _get_expected_digital_data(num_lines: int, num_samples: int) -> list[int]: return [ _get_expected_digital_data_for_sample(num_lines, sample_number) for sample_number in range(num_samples) @@ -158,13 +160,13 @@ def _get_expected_digital_data(num_lines: int, num_samples: int) -> List[int]: def _get_expected_digital_port_data_port_major( task: nidaqmx.Task, num_samples: int -) -> List[List[int]]: +) -> list[list[int]]: return [_get_expected_digital_data(chan.di_num_lines, num_samples) for chan in task.channels] def _get_expected_digital_port_data_sample_major( task: nidaqmx.Task, num_samples: int -) -> List[List[int]]: +) -> list[list[int]]: result = _get_expected_digital_port_data_port_major(task, num_samples) return numpy.transpose(result).tolist() diff --git a/tests/component/test_stream_writers_ao.py b/tests/component/test_stream_writers_ao.py index 63fccef4e..bef2e3e49 100644 --- a/tests/component/test_stream_writers_ao.py +++ b/tests/component/test_stream_writers_ao.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import ctypes from typing import Callable diff --git a/tests/component/test_stream_writers_co.py b/tests/component/test_stream_writers_co.py index 558a49d78..f6fb3dd9a 100644 --- a/tests/component/test_stream_writers_co.py +++ b/tests/component/test_stream_writers_co.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import ctypes import math -from typing import Callable, List +from typing import Callable import numpy import pytest @@ -77,7 +79,7 @@ def ci_freq_loopback_task( return task -def _get_counter_freq_data(num_samples: int) -> List[CtrFreq]: +def _get_counter_freq_data(num_samples: int) -> list[CtrFreq]: frequencies = numpy.linspace(START_FREQUENCY, END_FREQUENCY, num_samples) duty_cycles = numpy.linspace(START_DUTY_CYCLE, END_DUTY_CYCLE, num_samples) diff --git a/tests/component/test_stream_writers_do.py b/tests/component/test_stream_writers_do.py index a4a2d952a..37d013c2a 100644 --- a/tests/component/test_stream_writers_do.py +++ b/tests/component/test_stream_writers_do.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import ctypes import math -from typing import Callable, List +from typing import Callable import numpy import pytest @@ -215,24 +217,24 @@ def _get_digital_data_for_sample(num_lines: int, sample_number: int) -> int: return result & line_mask -def _get_digital_data(num_lines: int, num_samples: int) -> List[int]: +def _get_digital_data(num_lines: int, num_samples: int) -> list[int]: return [ _get_digital_data_for_sample(num_lines, sample_number) for sample_number in range(num_samples) ] -def _get_digital_port_data_for_sample(task: nidaqmx.Task, sample_number: int) -> List[int]: +def _get_digital_port_data_for_sample(task: nidaqmx.Task, sample_number: int) -> list[int]: return [ _get_digital_data_for_sample(chan.do_num_lines, sample_number) for chan in task.channels ] -def _get_digital_port_data_port_major(task: nidaqmx.Task, num_samples: int) -> List[List[int]]: +def _get_digital_port_data_port_major(task: nidaqmx.Task, num_samples: int) -> list[list[int]]: return [_get_digital_data(chan.do_num_lines, num_samples) for chan in task.channels] -def _get_digital_port_data_sample_major(task: nidaqmx.Task, num_samples: int) -> List[List[int]]: +def _get_digital_port_data_sample_major(task: nidaqmx.Task, num_samples: int) -> list[list[int]]: result = _get_digital_port_data_port_major(task, num_samples) return numpy.transpose(result).tolist() diff --git a/tests/component/test_task_events.py b/tests/component/test_task_events.py index aa365d100..6f1fe7390 100644 --- a/tests/component/test_task_events.py +++ b/tests/component/test_task_events.py @@ -1,8 +1,9 @@ +from __future__ import annotations + import threading import time import traceback from logging import LogRecord -from typing import List import pytest @@ -623,7 +624,7 @@ def _get_exception(record: LogRecord) -> BaseException: def _wait_for_log_records( caplog: pytest.LogCaptureFixture, message_substring: str, expected_count: int, timeout=10.0 -) -> List[LogRecord]: +) -> list[LogRecord]: start_time = time.time() while time.time() - start_time < timeout: matching_records = [ diff --git a/tests/component/test_task_read_ai.py b/tests/component/test_task_read_ai.py index 07f340d1b..ccddb0990 100644 --- a/tests/component/test_task_read_ai.py +++ b/tests/component/test_task_read_ai.py @@ -1,4 +1,4 @@ -from typing import List +from __future__ import annotations import pytest @@ -153,7 +153,7 @@ def test___analog_multi_channel_finite___read_too_many_sample___returns_valid_2d _assert_equal_2d(data, expected, abs=VOLTAGE_EPSILON) -def _assert_equal_2d(data: List[List[float]], expected: List[List[float]], abs: float) -> None: +def _assert_equal_2d(data: list[list[float]], expected: list[list[float]], abs: float) -> None: # pytest.approx() does not support nested data structures. assert len(data) == len(expected) for i in range(len(data)): @@ -186,7 +186,7 @@ def pwr_single_channel_task( @pytest.fixture def pwr_multi_channel_task( - task: nidaqmx.Task, sim_ts_power_devices: List[nidaqmx.system.Device] + task: nidaqmx.Task, sim_ts_power_devices: list[nidaqmx.system.Device] ) -> nidaqmx.Task: for chan_index, sim_ts_power_device in enumerate(sim_ts_power_devices): task.ai_channels.add_ai_power_chan( diff --git a/tests/component/test_watchdog.py b/tests/component/test_watchdog.py index 72efd0cfd..0dd8dda8f 100644 --- a/tests/component/test_watchdog.py +++ b/tests/component/test_watchdog.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import weakref from typing import Callable diff --git a/tests/conftest.py b/tests/conftest.py index 7e8842e11..3f69cbb66 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ import pathlib from concurrent.futures import ThreadPoolExecutor from enum import Enum -from typing import TYPE_CHECKING, Callable, Generator, List +from typing import TYPE_CHECKING, Callable, Generator import pytest @@ -66,10 +66,10 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: grpc_only = metafunc.definition.get_closest_marker("grpc_only") library_only = metafunc.definition.get_closest_marker("library_only") - params: List[_pytest.mark.structures.ParameterSet] = [] + params: list[_pytest.mark.structures.ParameterSet] = [] if not grpc_only: - library_marks: List[pytest.MarkDecorator] = [] + library_marks: list[pytest.MarkDecorator] = [] library_skip = metafunc.definition.get_closest_marker("library_skip") if library_skip: library_marks.append(pytest.mark.skip(*library_skip.args, **library_skip.kwargs)) @@ -79,7 +79,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: params.append(pytest.param("library_init_kwargs", marks=library_marks)) if not library_only: - grpc_marks: List[pytest.MarkDecorator] = [] + grpc_marks: list[pytest.MarkDecorator] = [] grpc_skip = metafunc.definition.get_closest_marker("grpc_skip") if grpc_skip: grpc_marks.append(pytest.mark.skip(*grpc_skip.args, **grpc_skip.kwargs)) @@ -284,7 +284,7 @@ def sim_ts_voltage_device(sim_ts_chassis: nidaqmx.system.Device) -> nidaqmx.syst @pytest.fixture(scope="function") -def sim_ts_power_devices(sim_ts_chassis: nidaqmx.system.Device) -> List[nidaqmx.system.Device]: +def sim_ts_power_devices(sim_ts_chassis: nidaqmx.system.Device) -> list[nidaqmx.system.Device]: """Gets simulated power devices information.""" devices = [] for device in sim_ts_chassis.chassis_module_devices: @@ -347,7 +347,7 @@ def sim_velocity_device(system: nidaqmx.system.System) -> nidaqmx.system.Device: @pytest.fixture(scope="function") -def multi_threading_test_devices(system: nidaqmx.system.System) -> List[nidaqmx.system.Device]: +def multi_threading_test_devices(system: nidaqmx.system.System) -> list[nidaqmx.system.Device]: """Gets multi threading test devices information.""" devices = [] for device in system.devices: @@ -389,7 +389,7 @@ def test_assets_directory() -> pathlib.Path: @pytest.fixture(scope="session") -def grpc_server_process() -> Generator[GrpcServerProcess, None, None]: +def grpc_server_process() -> Generator[GrpcServerProcess]: """Gets the grpc server process.""" if grpc is None: pytest.skip("The grpc module is not available.") @@ -409,7 +409,7 @@ def grpc_channel(request: pytest.FixtureRequest) -> grpc.Channel: @pytest.fixture(scope="session") def shared_grpc_channel( grpc_server_process: GrpcServerProcess, -) -> Generator[grpc.Channel, None, None]: +) -> Generator[grpc.Channel]: """Gets the shared gRPC channel.""" with grpc.insecure_channel(f"localhost:{grpc_server_process.server_port}") as channel: yield channel @@ -418,7 +418,7 @@ def shared_grpc_channel( @pytest.fixture(scope="function") def temporary_grpc_channel( request: pytest.FixtureRequest, grpc_server_process: GrpcServerProcess -) -> Generator[grpc.Channel, None, None]: +) -> Generator[grpc.Channel]: """Gets a temporary gRPC channel (not shared with other tests).""" marker = request.node.get_closest_marker("temporary_grpc_channel") options = marker.kwargs.get("options", None) @@ -466,7 +466,7 @@ def task(request, generate_task) -> nidaqmx.Task: @pytest.fixture(scope="function") -def generate_task(init_kwargs) -> Generator[Callable[..., nidaqmx.Task], None, None]: +def generate_task(init_kwargs) -> Generator[Callable[..., nidaqmx.Task]]: """Gets a factory function which can be used to generate new tasks. The closure of task objects will be done by this fixture once the test is complete. @@ -541,7 +541,7 @@ def watchdog_task(request, sim_6363_device, generate_watchdog_task) -> nidaqmx.s @pytest.fixture(scope="function") def generate_watchdog_task( init_kwargs, -) -> Generator[Callable[..., nidaqmx.system.WatchdogTask], None, None]: +) -> Generator[Callable[..., nidaqmx.system.WatchdogTask]]: """Gets a factory function which can be used to generate new watchdog tasks. The closure of task objects will be done by this fixture once the test is complete. @@ -569,7 +569,7 @@ def _get_marker_value(request, marker_name, default=None): @pytest.fixture(scope="function") -def thread_pool_executor() -> Generator[ThreadPoolExecutor, None, None]: +def thread_pool_executor() -> Generator[ThreadPoolExecutor]: """Creates a thread pool executor. When the test completes, this fixture shuts down the thread pool executor. If any futures are diff --git a/tests/helpers.py b/tests/helpers.py index 7c7e49e3b..a2d0fd368 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,8 +1,10 @@ """This contains the helpers methods used in the DAQmx tests.""" +from __future__ import annotations + import contextlib import pathlib -from typing import Generator, Optional, Union +from typing import Generator from nidaqmx.system.physical_channel import PhysicalChannel @@ -20,8 +22,8 @@ def generate_random_seed(): @contextlib.contextmanager def configure_teds( - phys_chan: PhysicalChannel, teds_file_path: Optional[Union[str, pathlib.PurePath]] = None -) -> Generator[PhysicalChannel, None, None]: + phys_chan: PhysicalChannel, teds_file_path: str | pathlib.PurePath | None = None +) -> Generator[PhysicalChannel]: """Yields a physical channel with TEDS configured and then clears it after the test is done.""" phys_chan.configure_teds(teds_file_path) try: diff --git a/tests/legacy/test_utils.py b/tests/legacy/test_utils.py index 352522e6c..21b885d2e 100644 --- a/tests/legacy/test_utils.py +++ b/tests/legacy/test_utils.py @@ -1,6 +1,6 @@ """Tests for validating utilities functionality.""" -from typing import List +from __future__ import annotations from nidaqmx.utils import flatten_channel_string, unflatten_channel_string @@ -27,7 +27,7 @@ def test_backwards_flatten_flatten_and_unflatten(self): def test_empty_flatten_flatten_and_unflatten(self): """Test to validate flatten and unflatten empty channel.""" - unflattened_channels: List[str] = [] + unflattened_channels: list[str] = [] flattened_channels = "" assert flatten_channel_string(unflattened_channels) == flattened_channels assert unflatten_channel_string(flattened_channels) == unflattened_channels diff --git a/tests/unit/_task_utils.py b/tests/unit/_task_utils.py index 68f0a86a8..08b4fbd3f 100644 --- a/tests/unit/_task_utils.py +++ b/tests/unit/_task_utils.py @@ -1,6 +1,8 @@ """Task helper functions.""" -from typing import Dict, Iterable +from __future__ import annotations + +from typing import Iterable from unittest.mock import Mock from pytest_mock import MockerFixture @@ -43,7 +45,7 @@ def register_event_handler(mocker: MockerFixture, task: Task, event_type: _TaskE def register_event_handlers( mocker: MockerFixture, task: Task, event_types: Iterable[_TaskEventType] -) -> Dict[_TaskEventType, Mock]: +) -> dict[_TaskEventType, Mock]: """Register mock event handlers and return a dictionary mapping event name -> handler.""" return { event_type: register_event_handler(mocker, task, event_type) for event_type in event_types diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index d7cdc9fde..8a3ef36ef 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,5 +1,7 @@ """Fixtures used in the DAQmx unit tests.""" +from __future__ import annotations + from typing import Generator from unittest.mock import Mock @@ -21,7 +23,7 @@ def interpreter(mocker: MockerFixture) -> Mock: @pytest.fixture -def task(interpreter: Mock) -> Generator[Task, None, None]: +def task(interpreter: Mock) -> Generator[Task]: """Create a DAQmx task. This fixture owns the task. Do not use it for test cases that destroy the task, or else you diff --git a/tests/unit/test_task_events.py b/tests/unit/test_task_events.py index 0c97621fe..b6bc59b41 100644 --- a/tests/unit/test_task_events.py +++ b/tests/unit/test_task_events.py @@ -1,4 +1,5 @@ -from typing import Dict +from __future__ import annotations + from unittest.mock import Mock import pytest @@ -80,7 +81,7 @@ def test___events_registered_and_multiple_errors_raised___close___task_resources def _assert_task_resources_cleaned_up( - task: Task, interpreter: Mock, event_handlers: Dict[_TaskEventType, Mock] + task: Task, interpreter: Mock, event_handlers: dict[_TaskEventType, Mock] ) -> None: interpreter.clear_task.assert_called_once_with("MyTaskHandle") assert task._handle is None