diff --git a/examples/custom_msg.py b/examples/custom_msg.py index cb8381c20..963363758 100755 --- a/examples/custom_msg.py +++ b/examples/custom_msg.py @@ -43,7 +43,7 @@ class CustomModbusResponse(ModbusPDU): function_code = 55 rtu_byte_count_pos = 2 - def __init__(self, values=None, device_id=1, transaction=0): + def __init__(self, values: list[int] | None = None, device_id=1, transaction=0): """Initialize.""" super().__init__(dev_id=device_id, transaction_id=transaction) self.values = values or [] diff --git a/examples/server_async.py b/examples/server_async.py index e89789170..a9df0fb18 100755 --- a/examples/server_async.py +++ b/examples/server_async.py @@ -85,7 +85,7 @@ def setup_server(description=None, context=None, cmdline=None): elif args.store == "sparse": # pragma: no cover # Continuing, or use a sparse DataBlock which can have gaps datablock = lambda : ModbusSparseDataBlock({0x00: 0, 0x05: 1}) # pylint: disable=unnecessary-lambda-assignment - elif args.store == "factory": # pragma: no cover + else: # args.store == "factory" # pragma: no cover # Alternately, use the factory methods to initialize the DataBlocks # or simply do not pass them to have them initialized to 0x00 on the # full address range:: @@ -98,15 +98,15 @@ def setup_server(description=None, context=None, cmdline=None): # (broadcast mode). # However, this can be overloaded by setting the single flag to False and # then supplying a dictionary of device id to context mapping:: - context = {} - - for device_id in range(args.device_ids): - context[device_id] = ModbusDeviceContext( + context = { + device_id : ModbusDeviceContext( di=datablock(), co=datablock(), hr=datablock(), ir=datablock(), ) + for device_id in range(args.device_ids) + } single = False else: diff --git a/pymodbus/client/mixin.py b/pymodbus/client/mixin.py index 48c709588..ee7ba3fb5 100644 --- a/pymodbus/client/mixin.py +++ b/pymodbus/client/mixin.py @@ -760,9 +760,10 @@ def convert_to_registers( # noqa: C901 if data_type == cls.DATATYPE.BITS: if not isinstance(value, list): raise TypeError(f"Value should be list of bool but is {type(value)}.") + value = cast(list[bool], value) if (missing := len(value) % 16): value = value + [False] * (16 - missing) - byte_list = pack_bitstring(cast(list[bool], value)) + byte_list = pack_bitstring(value) elif data_type == cls.DATATYPE.STRING: if not isinstance(value, str): raise TypeError(f"Value should be string but is {type(value)}.") diff --git a/pymodbus/datastore/context.py b/pymodbus/datastore/context.py index 38793c37d..e91821d9c 100644 --- a/pymodbus/datastore/context.py +++ b/pymodbus/datastore/context.py @@ -161,7 +161,7 @@ def __init__(self, devices=None, single=True): :param single: Set to true to treat this as a single context """ self.single = single - self._devices = devices or {} + self._devices = devices or {} # type: ignore[var-annotated, unused-ignore] if self.single: self._devices = {0: self._devices} diff --git a/pymodbus/pdu/device.py b/pymodbus/pdu/device.py index 921be1a04..6fc0acf1c 100644 --- a/pymodbus/pdu/device.py +++ b/pymodbus/pdu/device.py @@ -93,7 +93,7 @@ def __iter__(self): """ return iter(self.stat_data.items()) - def reset(self): + def reset(self) -> None: """Clear all of the modbus plus statistics.""" for key in self.stat_data: self.stat_data[key] = [0x00] * len(self.stat_data[key]) @@ -105,14 +105,14 @@ def summary(self): """ return iter(self.stat_data.values()) - def encode(self): + def encode(self) -> list[int]: """Return a summary of the modbus plus statistics. - :returns: 54 16-bit words representing the status + :returns: An iterator over lists of 8-bit integers representing each statistic """ - total, values = [], sum(self.stat_data.values(), []) # noqa: RUF017 - for i in range(0, len(values), 2): - total.append((values[i] << 8) | values[i + 1]) + values = [v for sublist in self.stat_data.values() for v in sublist] + total = [(values[i] << 8) | values[i + 1] + for i in range(0, len(values), 2)] return total @@ -442,6 +442,8 @@ class ModbusControlBlock: _plus = ModbusPlusStatistics() _events: list[ModbusEvent] = [] + _inst: ModbusControlBlock | None = None + # -------------------------------------------------------------------------# # Magic # -------------------------------------------------------------------------# @@ -461,7 +463,7 @@ def __iter__(self): def __new__(cls): """Create a new instance.""" - if "_inst" not in vars(cls): + if cls._inst is None: cls._inst = object.__new__(cls) return cls._inst diff --git a/pymodbus/pdu/file_message.py b/pymodbus/pdu/file_message.py index ead142654..4c381763b 100644 --- a/pymodbus/pdu/file_message.py +++ b/pymodbus/pdu/file_message.py @@ -105,7 +105,8 @@ def encode(self) -> bytes: def decode(self, data: bytes) -> None: """Decode the response.""" - count, self.records = 1, [] + count = 1 + self.records.clear() byte_count = int(data[0]) while count < byte_count: calc_length, _ = struct.unpack( @@ -151,7 +152,8 @@ def encode(self) -> bytes: def decode(self, data: bytes) -> None: """Decode the incoming request.""" byte_count = int(data[0]) - count, self.records = 1, [] + count = 1 + self.records.clear() while count < byte_count: decoded = struct.unpack(">BHHH", data[count : count + 7]) calc_length = decoded[3] * 2 @@ -204,7 +206,8 @@ def encode(self) -> bytes: def decode(self, data: bytes) -> None: """Decode the incoming request.""" - count, self.records = 1, [] + count = 1 + self.records.clear() byte_count = int(data[0]) while count < byte_count: decoded = struct.unpack(">BHHH", data[count : count + 7]) diff --git a/pymodbus/pdu/mei_message.py b/pymodbus/pdu/mei_message.py index 916bdd8ec..5b0d6f960 100644 --- a/pymodbus/pdu/mei_message.py +++ b/pymodbus/pdu/mei_message.py @@ -2,6 +2,7 @@ from __future__ import annotations import struct +from typing import Any from pymodbus.constants import DeviceInformation, ExcCodes, MoreData from pymodbus.datastore import ModbusDeviceContext @@ -92,7 +93,7 @@ def calculateRtuFrameSize(cls, data: bytes) -> int: count -= 1 return size + 2 - def __init__(self, read_code: int | None = None, information: dict | None = None, dev_id: int = 1, transaction_id: int = 0) -> None: + def __init__(self, read_code: int | None = None, information: dict[int, Any] | None = None, dev_id: int = 1, transaction_id: int = 0) -> None: """Initialize a new instance.""" super().__init__(transaction_id=transaction_id, dev_id=dev_id) self.read_code = read_code or DeviceInformation.BASIC @@ -150,7 +151,8 @@ def decode(self, data: bytes) -> None: self.sub_function_code, self.read_code = params[0:2] self.conformity, self.more_follows = params[2:4] self.next_object_id, self.number_of_objects = params[4:6] - self.information, count = {}, 6 # skip the header information + count = 6 # skip the header information + self.information.clear() while count < len(data): object_id, object_length = struct.unpack(">BB", data[count : count + 2]) diff --git a/pymodbus/server/simulator/http_server.py b/pymodbus/server/simulator/http_server.py index 62e13f820..a0a50e505 100644 --- a/pymodbus/server/simulator/http_server.py +++ b/pymodbus/server/simulator/http_server.py @@ -7,6 +7,7 @@ import importlib import json import os +from typing import TYPE_CHECKING with contextlib.suppress(ImportError): @@ -25,6 +26,9 @@ ) +if TYPE_CHECKING: + from aiohttp import web + MAX_FILTER = 1000 RESPONSE_INACTIVE = -1 @@ -137,7 +141,7 @@ def __init__( del server["port"] device = setup["device_list"][modbus_device] self.datastore_context = ModbusSimulatorContext( - device, custom_actions_dict or {} + device, custom_actions_dict or None ) datastore = None if "device_id" in server: @@ -251,7 +255,7 @@ async def stop(self): self.serving.set_result(True) await asyncio.sleep(0) - async def handle_html_static(self, request): # pragma: no cover + async def handle_html_static(self, request: web.Request): # pragma: no cover """Handle static html.""" if not (page := request.path[1:]): page = "index.html" @@ -264,7 +268,7 @@ async def handle_html_static(self, request): # pragma: no cover except (FileNotFoundError, IsADirectoryError) as exc: raise web.HTTPNotFound(reason="File not found") from exc - async def handle_html(self, request): # pragma: no cover + async def handle_html(self, request: web.Request): # pragma: no cover """Handle html.""" page_type = request.path.split("/")[-1] params = dict(request.query) @@ -280,7 +284,7 @@ async def handle_html(self, request): # pragma: no cover new_page = self.generator_html[page_type][1](params, html) return web.Response(text=new_page, content_type="text/html") - async def handle_json(self, request): + async def handle_json(self, request: web.Request): """Handle api registers.""" command = request.path.split("/")[-1] params = await request.json() diff --git a/pymodbus/transport/transport.py b/pymodbus/transport/transport.py index e01b74fe3..6a42de26f 100644 --- a/pymodbus/transport/transport.py +++ b/pymodbus/transport/transport.py @@ -49,6 +49,7 @@ from __future__ import annotations import asyncio +import copy import dataclasses import ssl from abc import abstractmethod @@ -128,7 +129,7 @@ def generate_ssl( def copy(self) -> CommParams: """Create a copy.""" - return dataclasses.replace(self) + return copy.copy(self) class ModbusProtocol(asyncio.BaseProtocol): diff --git a/pyproject.toml b/pyproject.toml index 551cc98cf..9afc21fd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -250,7 +250,7 @@ skip = "./build,./doc/source/_static,venv,.venv,.git,htmlcov,CHANGELOG.rst,.*_ca ignore-words-list = "asend" [tool.ruff] -target-version="py39" +target-version="py310" extend-exclude = [ "build", "doc", @@ -312,3 +312,4 @@ line-ending = "auto" [tool.pyright] disableBytesTypePromotions = false +typeCheckingMode = "standard" diff --git a/test/pdu/test_mei_messages.py b/test/pdu/test_mei_messages.py index 2256a082d..635cbeb2e 100644 --- a/test/pdu/test_mei_messages.py +++ b/test/pdu/test_mei_messages.py @@ -150,7 +150,7 @@ def test_read_device_information_decode(self): message = b"\x0e\x01\x01\x00\x00\x05" message += TEST_MESSAGE message += b"\x81\x04Test\x81\x08Repeated\x81\x07Another" - handle = ReadDeviceInformationResponse(read_code=0x00, information=[]) + handle = ReadDeviceInformationResponse(read_code=0x00, information={}) handle.decode(message) assert handle.read_code == DeviceInformation.BASIC assert handle.conformity == 0x01