From e6a2ea1d40176392a85b44102e45297cb8126a73 Mon Sep 17 00:00:00 2001 From: andygarcha-adc Date: Tue, 12 Aug 2025 15:30:40 -0400 Subject: [PATCH 1/6] Add XD/YD/CTD support and serial interface Expanded ClickPLC driver to support XD, YD, and CTD register read/write operations. Updated register type mapping and improved float decoding using BinaryPayloadDecoder. Added serial interface support to AsyncioModbusClient for both pymodbus 2.x and 3.x compatibility. Updated README to document new register types and access modes. --- README.md | 32 +++++++------- clickplc/driver.py | 101 ++++++++++++++++++++++++++++++++++++++++----- clickplc/util.py | 18 +++++--- 3 files changed, 120 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 02ad60e..fd7d2bc 100644 --- a/README.md +++ b/README.md @@ -59,20 +59,24 @@ The entire API is `get` and `set`, and takes a range of inputs: Currently, the following datatypes are supported: -| | | | -|---|---|---| -| x | bool | Input point | -| y | bool | Output point | -| c | bool | (C)ontrol relay | -| t | bool | (T)imer | -| ct | bool | (C)oun(t)er | -| ds | int16 | (D)ata register, (s)ingle signed int | -| dd | int32 | (D)ata register, (d)double signed int | -| dh | uint16| (D) register, (h)ex | -| df | float | (D)ata register, (f)loating point | -| td | int16 | (T)ime (d)elay register | -| ctd | int32 | (C)oun(t)er Current Values, (d)ouble int | -| sd | int16 | (S)ystem (D)ata register | +| | | | | +|---|---|---|---| +| x | bool | Input point | R | +| y | bool | Output point | RW | +| c | bool | (C)ontrol relay | RW | +| t | bool | (T)imer | R | +| ct | bool | (C)oun(t)er | RW | +| sc | bool | (S)ystem (C)ontrol Relay | mixed | +| ds | int16 | (D)ata register, (s)ingle signed int | RW | +| dd | int32 | (D)ata register, (d)double signed int | RW | +| dh | uint16| (D) register, (h)ex | RW | +| df | float | (D)ata register, (f)loating point | RW | +| xd | uint32| Input Register | R | +| yd | uint32| Output Register | RW | +| td | int16 | (T)ime (d)elay register | RW | +| ctd | int32 | (C)oun(t)er Current Values, (d)ouble int | RW | +| sd | int16 | (S)ystem (D)ata register | mixed | +| txt| char| (T)e(xt) | RW | ### Tags / Nicknames diff --git a/clickplc/driver.py b/clickplc/driver.py index 1c6d757..a1b7226 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -13,7 +13,9 @@ import struct from collections import defaultdict from string import digits -from typing import Any, ClassVar, overload +from typing import Any, ClassVar, Literal, overload + +from pymodbus.payload import BinaryPayloadDecoder from clickplc.util import AsyncioModbusClient @@ -35,14 +37,16 @@ class ClickPLC(AsyncioModbusClient): 'ds': 'int16', # (D)ata register (s)ingle 'dd': 'int32', # (D)ata register, (d)ouble 'dh': 'int16', # (D)ata register, (h)ex - 'df': 'float', # (D)ata register (f)loating point + 'df': 'float', # (D)ata register (f)loating + 'xd': 'int32', # Input Register + 'yd': 'int32', # Output Register 'td': 'int16', # (T)imer register 'ctd': 'int32', # (C)oun(t)er Current values, (d)ouble 'sd': 'int16', # (S)ystem (D)ata register, single 'txt': 'str', # ASCII Text } - def __init__(self, address, tag_filepath='', timeout=1): + def __init__(self, address, tag_filepath='', timeout=1, interfacetype: Literal["TCP", "Serial"] = 'TCP'): """Initialize PLC connection and data structure. Args: @@ -51,7 +55,9 @@ def __init__(self, address, tag_filepath='', timeout=1): timeout (optional): Timeout when communicating with PLC. Default 1s. """ - super().__init__(address, timeout) + super().__init__(address, timeout, interfacetype) + self.bigendian = Endian.BIG if self.pymodbus35plus else Endian.Big # type:ignore[attr-defined] + self.lilendian = Endian.LITTLE if self.pymodbus35plus else Endian.Little # type:ignore[attr-defined] self.tags = self._load_tags(tag_filepath) self.active_addresses = self._get_address_ranges(self.tags) @@ -425,13 +431,48 @@ async def _get_df(self, start: int, end: int | None) -> dict | float: address = 28672 + 2 * (start - 1) count = 2 * (1 if end is None else (end - start + 1)) registers = await self.read_registers(address, count) - # pack the pairs of 16-bit registers (little-endian) and then unpack as 32-byte floats - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count // 2}f', packed) - if count == 2: - return values[0] - return {f'df{start + n}': v - for n, v in enumerate(values)} + decoder = BinaryPayloadDecoder.fromRegisters(registers, + byteorder=self.bigendian, + wordorder=self.lilendian) + if end is None: + return decoder.decode_32bit_float() + return {f'df{n}': decoder.decode_32bit_float() for n in range(start, end + 1)} + + async def _get_xd(self, start: int, end: int | None) -> dict: + """Read XD registers. Called by `get`.""" + if start < 1 or start > 8: + raise ValueError('YD must be in [1, 8].') + if end is not None and (end < 1 or end > 500): + raise ValueError('YD end must be in [1, 8].') + address = 57344 + 2 * (start - 1) + count = 1 if end is None else (end - start + 1) + registers = await self.read_registers(address, count) + if not registers or len(registers) < count : + raise ValueError("Failed to read correct number of registers.") + + decoder = BinaryPayloadDecoder.fromRegisters(registers, + byteorder=self.bigendian, + wordorder=self.lilendian) + if end is None : return decoder.decode_16bit_uint() + return {f'td{n}' : decoder.decode_16bit_uint() for n in range(start, end + 1)} + + async def _get_yd(self, start: int, end: int | None) -> dict: + """Read YD registers. Called by `get`.""" + if start < 1 or start > 8: + raise ValueError('YD must be in [1, 8].') + if end is not None and (end < 1 or end > 500): + raise ValueError('YD end must be in [1, 8].') + address = 57856 + 2 * (start - 1) + count = 1 if end is None else (end - start + 1) + registers = await self.read_registers(address, count) + if not registers or len(registers) < count : + raise ValueError("Failed to read correct number of registers.") + + decoder = BinaryPayloadDecoder.fromRegisters(registers, + byteorder=self.bigendian, + wordorder=self.lilendian) + if end is None : return decoder.decode_16bit_uint() + return {f'td{n}' : decoder.decode_16bit_uint() for n in range(start, end + 1)} async def _get_td(self, start: int, end: int | None) -> dict | int: """Read TD registers. Called by `get`. @@ -698,6 +739,25 @@ async def _set_dh(self, start: int, data: list[int]): raise ValueError('Data list longer than available addresses.') await self.write_registers(address, values=data) + async def _set_yd(self, start: int, data: list[int]): + """Set YD registers. Called by `set`.""" + if start < 1 or start > 8: + raise ValueError("YD must bein [1, 8]") + address = 57856 + 2 * (start - 1) + + if len(data) > 8 - start + 1 : + raise ValueError("Data list is longer than available addresses.") + + # pymodbus is expecting list[uint_16] + # convert each hex?? + values: list[bytes] = [] + for datum in data: + packed_4_bytes = struct.pack(' 250 : + raise ValueError("CTD must be in [1, 250].") + address = 49152 + 2 * (start - 1) + if len(data) > 250 - start + 1: + raise ValueError('Data list longer than available addresses.') + + # pymodbus is expectin list[uint_16] + # convert each int_32 into a uint_16 pair (little-endian) with the same byte value + + values: list[bytes] = [] + for datum in data : + packed_4_bytes = struct.pack(' Date: Tue, 12 Aug 2025 15:33:06 -0400 Subject: [PATCH 2/6] Improve XD/YD register handling Enhanced parsing logic for XD and YD register addresses to support special cases like '0u', updated range checks, and improved data packing/unpacking for Modbus operations. Added the u_index() helper to normalize address indices and refactored related methods for clarity and correctness when reading and writing XD/YD registers. --- clickplc/driver.py | 230 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 191 insertions(+), 39 deletions(-) diff --git a/clickplc/driver.py b/clickplc/driver.py index a1b7226..1d7f058 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -27,7 +27,7 @@ class ClickPLC(AsyncioModbusClient): abstracting corner cases and providing a simple asynchronous interface. """ - data_types: ClassVar[dict] = { + data_types: ClassVar[dict[str, str]] = { 'x': 'bool', # Input point 'y': 'bool', # Output point 'c': 'bool', # (C)ontrol relay @@ -112,8 +112,10 @@ async def get(self, address: str | None = None) -> dict: else: start, end = address, None i = next(i for i, s in enumerate(start) if s.isdigit()) - category, start_index = start[:i].lower(), int(start[i:]) - end_index = None if end is None else int(end[i:]) + + category = start[:i].lower() + start_index = 0.5 if start[i:].lower() == "0u" else int(start[i:]) + end_index = 0.5 if end is not None and end[i:].lower() == "0u" else None if end is None else int(end[i:]) if end_index is not None and end_index <= start_index: raise ValueError("End address must be greater than start address.") @@ -141,19 +143,44 @@ async def set(self, address: str, data): """ if address in self.tags: address = self.tags[address]['id'] + # if only one piece of data was sent in, just make it a list anyway. if not isinstance(data, list): data = [data] - i = next(i for i, s in enumerate(address) if s.isdigit()) - category, index = address[:i].lower(), int(address[i:]) + # this will find the first digit in the address + # to divide it into the "letter" part and the "number" part + # the problem is, we have to put some special stuff in here + # to check for xd0u or yd0u. i'm just going to hard-code + # these in and hope that it doesn't ever change ever! + if address.lower() in ('xd0u', 'yd0u'): + category = address[0:2] + index = 0.5 + else: + i = next(i for i, s in enumerate(address) if s.isdigit()) + category, index = address[:i].lower(), int(address[i:]) + + # check to make sure that the category ("X", "Y", "DS", etc) is valud if category not in self.data_types: - raise ValueError(f"{category} currently unsupported.") + raise ValueError(f"{category} is not a valid category. Did you spell it right?") + + # remove the "16"s and "32"s from int16s and int32s. data_type = self.data_types[category].rstrip(digits) + + # go through all the data for datum in data: + # if an int was passed in and we need a float, this is a pretty easy fix. if type(datum) == int and data_type == 'float': # noqa: E721 datum = float(datum) + + # however, we aren't going to handle any other type switching. + # using pydoc.locate, we can turn something like + # the string "float" into the actual type `float`. + # personally, i wonder how long that .locate() takes, we + # could just run it once. it's probably fine. if type(datum) != pydoc.locate(data_type): # noqa: E721 raise ValueError(f"Expected {address} as a {data_type}.") + + # now call the correct set function based on the category. return await getattr(self, '_set_' + category)(index, data) async def _get_x(self, start: int, end: int | None) -> dict: @@ -440,39 +467,113 @@ async def _get_df(self, start: int, end: int | None) -> dict | float: async def _get_xd(self, start: int, end: int | None) -> dict: """Read XD registers. Called by `get`.""" - if start < 1 or start > 8: - raise ValueError('YD must be in [1, 8].') - if end is not None and (end < 1 or end > 500): - raise ValueError('YD end must be in [1, 8].') - address = 57344 + 2 * (start - 1) - count = 1 if end is None else (end - start + 1) - registers = await self.read_registers(address, count) + # check ranges + if start < 0 or start > 8: + raise ValueError('YD must be in [0, 8].') + if end is not None and (end < 0 or end > 8): + raise ValueError('YD end must be in [0, 8].') + # calculate address + address = int(57344 + 2 * (start)) + + # see documentation for `self.u_index()` + _adjusted_start = self.u_index(start) + count = 1 if end is None else (self.u_index(end) - _adjusted_start + 1) + + # ok so that count variable is getting used in two places. + # here, where we're determining how many registers to read, + # and at the bottom of the function, where we determine how many + # to spit back out at the user. the problem is, there's a blank address + # between each of these items - except between 0 and 1. that's 0u. + _addresses = ("0", "0u", "1", "2", "3", "4", "5", "6", "7", "8") + # at this point we have the adjusted_start, so we can just say "how + # many numbers past 1 are there?" + + _adjusted_count = int((end - start) * 2 + 1) if end is not None else 1 + registers = await self.read_registers(address, _adjusted_count) if not registers or len(registers) < count : raise ValueError("Failed to read correct number of registers.") decoder = BinaryPayloadDecoder.fromRegisters(registers, byteorder=self.bigendian, wordorder=self.lilendian) - if end is None : return decoder.decode_16bit_uint() - return {f'td{n}' : decoder.decode_16bit_uint() for n in range(start, end + 1)} - - async def _get_yd(self, start: int, end: int | None) -> dict: + # this still works - it's just one value + if end is None: + return decoder.decode_16bit_uint() + + # honestly this is a complete mess and i should come back and make it not nasty + _values: dict[str, int] = {} + + # if the start is yd0 or yd0u, we need some kind of special case + _start_false = start + while _start_false < 1 : + n = int(_start_false * 2) + _values[f'xd{_addresses[n]}'] = decoder.decode_16bit_uint() + _start_false += 0.5 + + # otherwise go through all the other ones, store the good uint_16 + # and decode the bad one. + if end >= 1: + for n in range(max(_adjusted_start, 2), _adjusted_start + count): + _values[f'xd{_addresses[n]}'] = decoder.decode_16bit_uint() + if n != _adjusted_start + count - 1 : + decoder.decode_16bit_uint() + + return _values + + async def _get_yd(self, start: int | float, end: int | float | None) -> dict: """Read YD registers. Called by `get`.""" - if start < 1 or start > 8: - raise ValueError('YD must be in [1, 8].') - if end is not None and (end < 1 or end > 500): - raise ValueError('YD end must be in [1, 8].') - address = 57856 + 2 * (start - 1) - count = 1 if end is None else (end - start + 1) - registers = await self.read_registers(address, count) + # check ranges + if start < 0 or start > 8: + raise ValueError('YD must be in [0, 8].') + if end is not None and (end < 0 or end > 8): + raise ValueError('YD end must be in [0, 8].') + # calculate address + address = int(57856 + 2 * (start)) + + # see documentation for `self.u_index()` + _adjusted_start = self.u_index(start) + count = 1 if end is None else (self.u_index(end) - _adjusted_start + 1) + + # ok so that count variable is getting used in two places. + # here, where we're determining how many registers to read, + # and at the bottom of the function, where we determine how many + # to spit back out at the user. the problem is, there's a blank address + # between each of these items - except between 0 and 1. that's 0u. + _addresses = ("0", "0u", "1", "2", "3", "4", "5", "6", "7", "8") + # at this point we have the adjusted_start, so we can just say "how + # many numbers past 1 are there?" + + _adjusted_count = int((end - start) * 2 + 1) if end is not None else 1 + registers = await self.read_registers(address, _adjusted_count) if not registers or len(registers) < count : raise ValueError("Failed to read correct number of registers.") decoder = BinaryPayloadDecoder.fromRegisters(registers, byteorder=self.bigendian, wordorder=self.lilendian) - if end is None : return decoder.decode_16bit_uint() - return {f'td{n}' : decoder.decode_16bit_uint() for n in range(start, end + 1)} + # this still works - it's just one value + if end is None: + return decoder.decode_16bit_uint() + + # honestly this is a complete mess and i should come back and make it not nasty + _values: dict[str, int] = {} + + # if the start is yd0 or yd0u, we need some kind of special case + _start_false = start + while _start_false < 1 : + n = int(_start_false * 2) + _values[f'yd{_addresses[n]}'] = decoder.decode_16bit_uint() + _start_false += 0.5 + + # otherwise go through all the other ones, store the good uint_16 + # and decode the bad one. + if end >= 1: + for n in range(max(_adjusted_start, 2), _adjusted_start + count): + _values[f'yd{_addresses[n]}'] = decoder.decode_16bit_uint() + if n != _adjusted_start + count - 1 : + decoder.decode_16bit_uint() + + return _values async def _get_td(self, start: int, end: int | None) -> dict | int: """Read TD registers. Called by `get`. @@ -741,23 +842,74 @@ async def _set_dh(self, start: int, data: list[int]): async def _set_yd(self, start: int, data: list[int]): """Set YD registers. Called by `set`.""" - if start < 1 or start > 8: - raise ValueError("YD must bein [1, 8]") - address = 57856 + 2 * (start - 1) - - if len(data) > 8 - start + 1 : - raise ValueError("Data list is longer than available addresses.") - - # pymodbus is expecting list[uint_16] - # convert each hex?? - values: list[bytes] = [] + # make sure the values are correct + # side note: yd0u will come in with start == 0.5 + if start < 0 or start > 8: + raise ValueError("YD must be in [0, 8]") + # make sure all the data is an int16 for datum in data: - packed_4_bytes = struct.pack(' 16: + raise ValueError(f"Datum {datum} is longer than 16 bits. YD registers cannot hold more than 16 bits.") + # get the correct starting address + address = int(57856 + 2 * (start)) + + # i am going to do this horribly. anyone who comes after me and wants + # to fix it is absolutely welcome to. + horrible_index = self.u_index(start) + if len(data) > 10 - horrible_index: + raise ValueError( + "Data list is longer than available addresses. " + + "Make sure you're accounting for YD0u!" + ) + + # yd sucks. sorry i have to do it like this + values: list[int] = [] + + + for i, datum in enumerate(data): + if (start == 0 and i in (0, 1)) or (start == 0.5 and i == 0): + values.append(datum) + else : + values.extend((datum, 0x0000)) + # remove the last (0x0000). is this necessary? maybe not. i just don't + # want to run into a modbus issue where i'm trying to write something + # past YD8 (so technically YD8u), and it decides to go bananas. + values.pop() await self.write_registers(address, values) return - + + @staticmethod + def u_index(x: int | float) -> int: + """Here's the deal with this method. + + I had to denote for XD and YD if somebody was trying to get/set + XD0u or YD0u. So if that happens, then I pass through 0.5 as the + start or end. The problem is, this messes with trying to figure out how many + values are to be returned / to be set. So this just orders them + in a normal `int` value. + + ``` + u_index(0) + >>> 0 + u_index(0.5) + >>> 1 + u_index(1) + >>> 2 + u_index(2) + >>> 3 + # etc... + ``` + + """ + if x == 0 : + return 0 + if x == 0.5: + return 1 + if isinstance(x, float) : + raise ValueError(f"You cannot send {x} into 'u_index'. It is a float that is not 0.5.") + return x + 1 + async def _set_td(self, start: int, data: list[int]): """Set TD registers. Called by `set`. From e3d94eee4add3d688775f632233a95857853beaf Mon Sep 17 00:00:00 2001 From: andygarcha-adc Date: Tue, 12 Aug 2025 15:35:39 -0400 Subject: [PATCH 3/6] Add _convert_from_registers method Introduces a _convert_from_registers helper to AsyncioModbusClient for converting register data using the underlying client. Minor formatting change in driver.py for address validation. --- clickplc/driver.py | 2 +- clickplc/util.py | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/clickplc/driver.py b/clickplc/driver.py index 1d7f058..23d0e3e 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -203,7 +203,7 @@ async def _get_x(self, start: int, end: int | None) -> dict: a number of addresses not divisible by 8, it will have extra data. The extra data here is discarded before returning. """ - if start % 100 == 0 or start % 100 > 16: + if (start % 100 == 0 or start % 100 > 16): raise ValueError('X start address must be *01-*16.') if start < 1 or start > 816: raise ValueError('X start address must be in [001, 816].') diff --git a/clickplc/util.py b/clickplc/util.py index 15eb25a..fa137df 100644 --- a/clickplc/util.py +++ b/clickplc/util.py @@ -7,7 +7,8 @@ from __future__ import annotations import asyncio -from typing import Literal +from enum import Enum +from typing import Any, Literal try: from pymodbus.client import AsyncModbusTcpClient, AsyncModbusSerialClient # 3.x @@ -88,6 +89,21 @@ async def read_registers(self, address: int, count): r = await self._request('read_holding_registers', address=address, count=count) registers += r.registers return registers + + + def _convert_from_registers( + self, + registers: list[int], + data_type: Any, + word_order: Literal['big', 'little'] = 'big', + string_encoding: str = 'utf-8' + ) -> int | float | str | list[bool] | list[int] | list[float]: + return self.client.convert_from_registers( + registers, + data_type, + word_order=word_order, + string_encoding=string_encoding + ) async def write_coils(self, address: int, values): """Write modbus coils.""" From 5c5d45d5ccc9ef278a576632052c206dcc18a6a0 Mon Sep 17 00:00:00 2001 From: andygarcha-adc Date: Tue, 12 Aug 2025 15:38:51 -0400 Subject: [PATCH 4/6] Refactor register decoding in driver Replaced manual struct packing/unpacking and BinaryPayloadDecoder usage with unified _convert_from_registers calls for register value decoding. Updated methods to handle single and multiple register reads more consistently, improved handling of special cases for XD and YD registers, and adjusted string decoding logic for TXT registers. Updated type hints and imports to reflect changes. --- clickplc/driver.py | 226 ++++++++++++++++++++++----------------------- clickplc/util.py | 2 +- 2 files changed, 112 insertions(+), 116 deletions(-) diff --git a/clickplc/driver.py b/clickplc/driver.py index 23d0e3e..3a5862e 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -13,9 +13,9 @@ import struct from collections import defaultdict from string import digits -from typing import Any, ClassVar, Literal, overload +from typing import Any, ClassVar, Literal, Union, overload -from pymodbus.payload import BinaryPayloadDecoder +from pymodbus.constants import Endian from clickplc.util import AsyncioModbusClient @@ -222,6 +222,7 @@ async def _get_x(self, start: int, end: int | None) -> dict: coils = await self.read_coils(start_coil, count) output = {} current = start + # NOTE: could use client.convert_to_registers for bit in coils.bits: if current > end: break @@ -271,6 +272,7 @@ async def _get_y(self, start: int, end: int | None) -> dict: coils = await self.read_coils(start_coil, count) output = {} current = start + # NOTE: could use client.convert_to_registers for bit in coils.bits: if current > end: break @@ -303,6 +305,7 @@ async def _get_c(self, start: int, end: int | None) -> dict | bool: end_coil = 16384 + end - 1 count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f'c{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} async def _get_t(self, start: int, end: int | None) -> dict | bool: @@ -328,6 +331,7 @@ async def _get_t(self, start: int, end: int | None) -> dict | bool: end_coil = 14555 + end - 1 count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f't{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} async def _get_ct(self, start: int, end: int | None) -> dict | bool: @@ -354,6 +358,7 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool: count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f'ct{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} async def _get_sc(self, start: int, end: int | None) -> dict | bool: @@ -386,9 +391,10 @@ async def _get_sc(self, start: int, end: int | None) -> dict | bool: end_coil = 61440 + (end - 1) count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f'sc{start + i}': bit for i, bit in enumerate(coils.bits) if i < count} - async def _get_ds(self, start: int, end: int | None) -> dict | int: + async def _get_ds(self, start: int, end: int | None) -> Union[dict[str, int], int]: """Read DS registers. Called by `get`. DS entries start at Modbus address 0 (1 in the Click software's @@ -402,12 +408,15 @@ async def _get_ds(self, start: int, end: int | None) -> dict | int: address = 0 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - # pack all as unsigned 16-bit little-endian and then unpack as signed 16-bit ints - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count}h', packed) - if count == 1: - return values[0] - return {f'ds{start + i}': v for i, v in enumerate(values)} + register_values: int | list[int] = self.client.convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) # type: ignore + # one item was returned + if isinstance(register_values, int): + return register_values + + # more than one item is returned + if end is not None: + return {f'ds{n}': register_values[_index] for _index, n in enumerate(range(start, end + 1))} + raise ValueError(f"PyModbus has failed to return the correct type.") async def _get_dd(self, start: int, end: int | None) -> dict | int: if start < 1 or start > 1000: @@ -418,15 +427,16 @@ async def _get_dd(self, start: int, end: int | None) -> dict | int: address = 16384 + 2 * (start - 1) count = 2 if end is None else 2 * (end - start + 1) registers = await self.read_registers(address, count) - - # Pack registers as 16-bit unsigned shorts, little-endian ('<'), then unpack as signed 32-bit ints - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count // 2}i', packed) # 'i' = signed 32-bit int - - if count == 2: - return values[0] - return {f'dd{start + i}': v for i, v in enumerate(values)} - + registers_real = registers.copy() + for i, item in enumerate(registers): + if i % 2: + registers_real[i - 1] = item + else: + registers_real[i + 1] = item + register_values = self._convert_from_registers(registers_real, data_type=self.client.DATATYPE.INT32) + if end is None: + return register_values + return {f"dd{n}": register_values[_index] for _index, n in enumerate(range(start, end + 1))} async def _get_dh(self, start: int, end: int | None) -> dict | int: if start < 1 or start > 500: @@ -437,11 +447,10 @@ async def _get_dh(self, start: int, end: int | None) -> dict | int: address = 24576 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - - if count == 1: - return int(registers[0]) # unsigned 16-bit int can just cast with int() - return {f'dh{start + n}': int(v) for n, v in enumerate(registers)} - + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + if end is None: + return register_values + return {f"dh{n}": register_values[_index] for _index, n in enumerate(range(start, end + 1))} async def _get_df(self, start: int, end: int | None) -> dict | float: """Read DF registers. Called by `get`. @@ -458,14 +467,21 @@ async def _get_df(self, start: int, end: int | None) -> dict | float: address = 28672 + 2 * (start - 1) count = 2 * (1 if end is None else (end - start + 1)) registers = await self.read_registers(address, count) - decoder = BinaryPayloadDecoder.fromRegisters(registers, - byteorder=self.bigendian, - wordorder=self.lilendian) + registers_real = registers.copy() + + for i, item in enumerate(registers): + if i % 2: + registers_real[i - 1] = item + else: + registers_real[i + 1] = item + print(registers_real) + register_values = self._convert_from_registers(registers_real, data_type=self.client.DATATYPE.FLOAT32) + print(register_values) if end is None: - return decoder.decode_32bit_float() - return {f'df{n}': decoder.decode_32bit_float() for n in range(start, end + 1)} - - async def _get_xd(self, start: int, end: int | None) -> dict: + return register_values + return {f'df{n}': register_values[_index] for _index, n in enumerate(range(start, end + 1))} + + async def _get_xd(self, start: int, end: int | None) -> dict: """Read XD registers. Called by `get`.""" # check ranges if start < 0 or start > 8: @@ -493,31 +509,26 @@ async def _get_xd(self, start: int, end: int | None) -> dict: if not registers or len(registers) < count : raise ValueError("Failed to read correct number of registers.") - decoder = BinaryPayloadDecoder.fromRegisters(registers, - byteorder=self.bigendian, - wordorder=self.lilendian) + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) # this still works - it's just one value if end is None: - return decoder.decode_16bit_uint() + return register_values # honestly this is a complete mess and i should come back and make it not nasty _values: dict[str, int] = {} # if the start is yd0 or yd0u, we need some kind of special case - _start_false = start - while _start_false < 1 : - n = int(_start_false * 2) - _values[f'xd{_addresses[n]}'] = decoder.decode_16bit_uint() - _start_false += 0.5 - - # otherwise go through all the other ones, store the good uint_16 - # and decode the bad one. - if end >= 1: - for n in range(max(_adjusted_start, 2), _adjusted_start + count): - _values[f'xd{_addresses[n]}'] = decoder.decode_16bit_uint() - if n != _adjusted_start + count - 1 : - decoder.decode_16bit_uint() - + # case 0: start was 0 + if _adjusted_start < 1: + _values['xd0'] = register_values.pop(0) + # case 1: start was 0 or 0u + if _adjusted_start < 2: + _values['xd0u'] = register_values.pop(0) + # normal case + for n in range(max(_adjusted_start, 2), _adjusted_start + count): + _values[f'xd{_addresses[n]}'] = register_values.pop(0) + if n != _adjusted_start + count - 1: + register_values.pop(0) return _values async def _get_yd(self, start: int | float, end: int | float | None) -> dict: @@ -534,45 +545,28 @@ async def _get_yd(self, start: int | float, end: int | float | None) -> dict: _adjusted_start = self.u_index(start) count = 1 if end is None else (self.u_index(end) - _adjusted_start + 1) - # ok so that count variable is getting used in two places. - # here, where we're determining how many registers to read, - # and at the bottom of the function, where we determine how many - # to spit back out at the user. the problem is, there's a blank address - # between each of these items - except between 0 and 1. that's 0u. - _addresses = ("0", "0u", "1", "2", "3", "4", "5", "6", "7", "8") - # at this point we have the adjusted_start, so we can just say "how - # many numbers past 1 are there?" - _adjusted_count = int((end - start) * 2 + 1) if end is not None else 1 registers = await self.read_registers(address, _adjusted_count) if not registers or len(registers) < count : raise ValueError("Failed to read correct number of registers.") - - decoder = BinaryPayloadDecoder.fromRegisters(registers, - byteorder=self.bigendian, - wordorder=self.lilendian) + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) # this still works - it's just one value if end is None: - return decoder.decode_16bit_uint() + return register_values - # honestly this is a complete mess and i should come back and make it not nasty _values: dict[str, int] = {} - # if the start is yd0 or yd0u, we need some kind of special case - _start_false = start - while _start_false < 1 : - n = int(_start_false * 2) - _values[f'yd{_addresses[n]}'] = decoder.decode_16bit_uint() - _start_false += 0.5 - - # otherwise go through all the other ones, store the good uint_16 - # and decode the bad one. - if end >= 1: - for n in range(max(_adjusted_start, 2), _adjusted_start + count): - _values[f'yd{_addresses[n]}'] = decoder.decode_16bit_uint() - if n != _adjusted_start + count - 1 : - decoder.decode_16bit_uint() - + # case 0: start was 0 + if _adjusted_start < 1: + _values['yd0'] = register_values.pop(0) + # case 1: start was 0 or 0u + if _adjusted_start < 2: + _values['yd0u'] = register_values.pop(0) + # normal case + for n in range(max(_adjusted_start, 2), _adjusted_start + count): + _values[f'yd{n - 1}'] = register_values.pop(0) + if n != _adjusted_start + count - 1: + register_values.pop(0) return _values async def _get_td(self, start: int, end: int | None) -> dict | int: @@ -589,14 +583,10 @@ async def _get_td(self, start: int, end: int | None) -> dict | int: address = 45056 + (start - 1) count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - - # pack all as unsigned 16-bit little-endian and then unpack as signed 16-bit ints - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count}h', packed) - if count == 1: - return values[0] - return {f'td{start + i}': v for i, v in enumerate(values)} - + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + if end is None: + return register_values + return {f"td{n}": register_values.pop(0) for n in range(start, end + 1)} async def _get_ctd(self, start: int, end: int | None) -> dict: """Read CTD registers. Called by `get`. @@ -610,14 +600,18 @@ async def _get_ctd(self, start: int, end: int | None) -> dict: raise ValueError('CTD end must be in [1, 250]') address = 49152 + 2 * (start - 1) # 32-bit - count = 2 if end is None else 2 * (end - start + 1) - registers = await self.read_registers(address, count) - - # pack the pairs of 16-bit registers (little-endian) and then unpack as 32-byte signed ints - print(registers, count) - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count // 2}i', packed) - return {f'ctd{start + n}': v for n, v in enumerate(values)} + count = 1 if end is None else (end - start + 1) + registers = await self.read_registers(address, count * 2) + registers_real = registers.copy() + for i, item in enumerate(registers): + if i % 2: + registers_real[i - 1] = item + else: + registers_real[i + 1] = item + register_values = self._convert_from_registers(registers_real, data_type=self.client.DATATYPE.INT32) + if end is None: + return register_values + return {f"ctd{n}": register_values.pop(0) for n in range(start, end + 1)} async def _get_sd(self, start: int, end: int | None) -> dict | int: """Read SD registers. Called by `get`. @@ -633,10 +627,10 @@ async def _get_sd(self, start: int, end: int | None) -> dict | int: address = 61440 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - - if count == 1 and end is None: - return int(registers[0]) # unsigned 16-bit int can just cast with int() - return {f'sd{start + n}': int(v) for n, v in enumerate(registers)} + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + if end is None: + return register_values + return {f"sd{n}": register_values.pop(0) for n in range(start, end + 1)} @overload async def _get_txt(self, start: int, end: None) -> str: ... @@ -659,23 +653,27 @@ async def _get_txt(self, start, end): address = 36864 + (start - 1) // 2 if end is None: registers = await self.read_registers(address, 1) - r = registers[0] - assert isinstance(r, int) - if start % 2: - return chr(r & 0x00FF) # if starting on the second byte of a 16-bit register, discard the MSB - return chr((r >> 8) & 0x00FF) # otherwise discard LSB + return self._convert_from_registers(registers, data_type=self.client.DATATYPE.STRING) count = 1 + (end - start) // 2 + (start - 1) % 2 + if (start % 2) == (end % 2) == 0: + count -= 1 registers = await self.read_registers(address, count) - - # Swap the two bytes within each 16-bit register (i.e., 0x4231 -> 0x3142) - swapped = [((reg & 0xFF) << 8) | (reg >> 8) for reg in registers] - byte_data = b''.join(reg.to_bytes(2, 'big') for reg in swapped) - r = byte_data.decode('ascii') + register_values: str = self._convert_from_registers(registers, data_type=self.client.DATATYPE.STRING) + if len(register_values) < count * 2: + raise Exception("You are requesting more text than has been put into the Click PLC.") + + r = '' + i = 0 + for _ in range(count): + msb = register_values[2 * _] + lsb = register_values[2 * _ + 1] + r += lsb + msb if end % 2: # if ending on the first byte of a 16-bit register, discard the final LSB r = r[:-1] if not start % 2: r = r[1:] # if starting on the last byte of a 16-bit register, discard the first MSB + assert len(r) == (end - start + 1), f"{len(r)=}, {end-start+1=}, {r=}" return {f'txt{start}-txt{end}': r} async def _set_y(self, start: int, data: list[bool]): @@ -864,18 +862,16 @@ async def _set_yd(self, start: int, data: list[int]): # yd sucks. sorry i have to do it like this values: list[int] = [] - - - + extended_zero = False for i, datum in enumerate(data): if (start == 0 and i in (0, 1)) or (start == 0.5 and i == 0): values.append(datum) else : + extended_zero = True values.extend((datum, 0x0000)) - # remove the last (0x0000). is this necessary? maybe not. i just don't - # want to run into a modbus issue where i'm trying to write something - # past YD8 (so technically YD8u), and it decides to go bananas. - values.pop() + # remove the last (0x0000) + if extended_zero: + values.pop() await self.write_registers(address, values) return diff --git a/clickplc/util.py b/clickplc/util.py index fa137df..f231e56 100644 --- a/clickplc/util.py +++ b/clickplc/util.py @@ -97,7 +97,7 @@ def _convert_from_registers( data_type: Any, word_order: Literal['big', 'little'] = 'big', string_encoding: str = 'utf-8' - ) -> int | float | str | list[bool] | list[int] | list[float]: + ) -> Any: #int | float | str | list[bool] | list[int] | list[float]: return self.client.convert_from_registers( registers, data_type, From 9de5f6f78efb45e29fb0b97b0785acfb342299d3 Mon Sep 17 00:00:00 2001 From: andygarcha-adc Date: Tue, 12 Aug 2025 15:42:23 -0400 Subject: [PATCH 5/6] Linting --- clickplc/driver.py | 17 +++++++++-------- clickplc/util.py | 10 +++++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/clickplc/driver.py b/clickplc/driver.py index 3a5862e..69605da 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -13,7 +13,7 @@ import struct from collections import defaultdict from string import digits -from typing import Any, ClassVar, Literal, Union, overload +from typing import Any, ClassVar, Literal, overload from pymodbus.constants import Endian @@ -37,7 +37,7 @@ class ClickPLC(AsyncioModbusClient): 'ds': 'int16', # (D)ata register (s)ingle 'dd': 'int32', # (D)ata register, (d)ouble 'dh': 'int16', # (D)ata register, (h)ex - 'df': 'float', # (D)ata register (f)loating + 'df': 'float', # (D)ata register (f)loating 'xd': 'int32', # Input Register 'yd': 'int32', # Output Register 'td': 'int16', # (T)imer register @@ -394,7 +394,7 @@ async def _get_sc(self, start: int, end: int | None) -> dict | bool: # NOTE: could use client.convert_to_registers return {f'sc{start + i}': bit for i, bit in enumerate(coils.bits) if i < count} - async def _get_ds(self, start: int, end: int | None) -> Union[dict[str, int], int]: + async def _get_ds(self, start: int, end: int | None) -> dict[str, int] | int: """Read DS registers. Called by `get`. DS entries start at Modbus address 0 (1 in the Click software's @@ -408,15 +408,17 @@ async def _get_ds(self, start: int, end: int | None) -> Union[dict[str, int], in address = 0 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - register_values: int | list[int] = self.client.convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) # type: ignore + register_values: int | list[int] = self.client.convert_from_registers( + registers, data_type=self.client.DATATYPE.INT16 + ) # type: ignore # one item was returned if isinstance(register_values, int): return register_values - + # more than one item is returned if end is not None: return {f'ds{n}': register_values[_index] for _index, n in enumerate(range(start, end + 1))} - raise ValueError(f"PyModbus has failed to return the correct type.") + raise ValueError("PyModbus has failed to return the correct type.") async def _get_dd(self, start: int, end: int | None) -> dict | int: if start < 1 or start > 1000: @@ -664,7 +666,6 @@ async def _get_txt(self, start, end): raise Exception("You are requesting more text than has been put into the Click PLC.") r = '' - i = 0 for _ in range(count): msb = register_values[2 * _] lsb = register_values[2 * _ + 1] @@ -930,7 +931,7 @@ async def _set_ctd(self, start: int, data: list[int]): address = 49152 + 2 * (start - 1) if len(data) > 250 - start + 1: raise ValueError('Data list longer than available addresses.') - + # pymodbus is expectin list[uint_16] # convert each int_32 into a uint_16 pair (little-endian) with the same byte value diff --git a/clickplc/util.py b/clickplc/util.py index f231e56..5fc08ea 100644 --- a/clickplc/util.py +++ b/clickplc/util.py @@ -7,14 +7,14 @@ from __future__ import annotations import asyncio -from enum import Enum from typing import Any, Literal try: - from pymodbus.client import AsyncModbusTcpClient, AsyncModbusSerialClient # 3.x + from pymodbus.client import AsyncModbusSerialClient, AsyncModbusTcpClient # 3.x except ImportError: # 2.4.x - 2.5.x from pymodbus.client.asynchronous.async_io import ( # type: ignore - ReconnectingAsyncioModbusTcpClient, ReconnectingAsyncioModbusSerialClient + ReconnectingAsyncioModbusSerialClient, + ReconnectingAsyncioModbusTcpClient, ) import pymodbus.exceptions @@ -39,7 +39,7 @@ def __init__(self, address, timeout=1, interfacetype: Literal["TCP", "Serial"] = elif interfacetype == "TCP": # 2.x self.client = ReconnectingAsyncioModbusTcpClient() # pyright: ignore [reportPossiblyUnboundVariable] elif interfacetype == "Serial": - + self.client = ReconnectingAsyncioModbusSerialClient() # pyright: ignore [reportPossiblyUnboundVariable] self.lock = asyncio.Lock() self.connectTask = asyncio.create_task(self._connect()) @@ -89,7 +89,7 @@ async def read_registers(self, address: int, count): r = await self._request('read_holding_registers', address=address, count=count) registers += r.registers return registers - + def _convert_from_registers( self, From 697fe7ae8e1bd9bf7b38b0a0711a41776014458e Mon Sep 17 00:00:00 2001 From: andygarcha-adc Date: Tue, 12 Aug 2025 15:45:56 -0400 Subject: [PATCH 6/6] Add serial connection parameters to PLC driver Added support for configuring baudrate, parity, stopbits, and bytesize in both ClickPLC and AsyncioModbusClient constructors. These parameters are now passed to the underlying serial client for improved serial connection flexibility. --- clickplc/driver.py | 21 +++++++++++++++++++-- clickplc/util.py | 19 +++++++++++++++++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/clickplc/driver.py b/clickplc/driver.py index 69605da..186f0c9 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -46,7 +46,16 @@ class ClickPLC(AsyncioModbusClient): 'txt': 'str', # ASCII Text } - def __init__(self, address, tag_filepath='', timeout=1, interfacetype: Literal["TCP", "Serial"] = 'TCP'): + def __init__(self, + address, + tag_filepath='', + timeout=1, + interfacetype: Literal["TCP", "Serial"] = 'TCP', + *, + baudrate=38400, + parity='O', + stopbits=1, + bytesize=8): """Initialize PLC connection and data structure. Args: @@ -55,7 +64,15 @@ def __init__(self, address, tag_filepath='', timeout=1, interfacetype: Literal[" timeout (optional): Timeout when communicating with PLC. Default 1s. """ - super().__init__(address, timeout, interfacetype) + super().__init__( + address, + timeout, + interfacetype, + baudrate=baudrate, + parity=parity, + stopbits=stopbits, + bytesize=bytesize + ) self.bigendian = Endian.BIG if self.pymodbus35plus else Endian.Big # type:ignore[attr-defined] self.lilendian = Endian.LITTLE if self.pymodbus35plus else Endian.Little # type:ignore[attr-defined] self.tags = self._load_tags(tag_filepath) diff --git a/clickplc/util.py b/clickplc/util.py index 5fc08ea..ea7e830 100644 --- a/clickplc/util.py +++ b/clickplc/util.py @@ -26,7 +26,15 @@ class AsyncioModbusClient: including standard timeouts, async context manager, and queued requests. """ - def __init__(self, address, timeout=1, interfacetype: Literal["TCP", "Serial"] = "TCP"): + def __init__(self, + address, + timeout=1, + interfacetype: Literal["TCP", "Serial"] = "TCP", + *, + baudrate=38400, + parity='O', + stopbits=1, + bytesize=8): """Set up communication parameters.""" self.ip = address self.port = 5020 if address == '127.0.0.1' else 502 # pymodbus simulator is 127.0.0.1:5020 @@ -35,7 +43,14 @@ def __init__(self, address, timeout=1, interfacetype: Literal["TCP", "Serial"] = if self.pymodbus30plus and interfacetype == "TCP": self.client = AsyncModbusTcpClient(address, timeout=timeout) # pyright: ignore [reportPossiblyUnboundVariable] elif self.pymodbus30plus and interfacetype == "Serial": - self.client = AsyncModbusSerialClient(address, timeout=timeout) + self.client = AsyncModbusSerialClient( + port=address, + timeout=timeout, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits + ) elif interfacetype == "TCP": # 2.x self.client = ReconnectingAsyncioModbusTcpClient() # pyright: ignore [reportPossiblyUnboundVariable] elif interfacetype == "Serial":