diff --git a/README.md b/README.md index 02ad60e..fd7d2bc 100644 --- a/README.md +++ b/README.md @@ -59,20 +59,24 @@ The entire API is `get` and `set`, and takes a range of inputs: Currently, the following datatypes are supported: -| | | | -|---|---|---| -| x | bool | Input point | -| y | bool | Output point | -| c | bool | (C)ontrol relay | -| t | bool | (T)imer | -| ct | bool | (C)oun(t)er | -| ds | int16 | (D)ata register, (s)ingle signed int | -| dd | int32 | (D)ata register, (d)double signed int | -| dh | uint16| (D) register, (h)ex | -| df | float | (D)ata register, (f)loating point | -| td | int16 | (T)ime (d)elay register | -| ctd | int32 | (C)oun(t)er Current Values, (d)ouble int | -| sd | int16 | (S)ystem (D)ata register | +| | | | | +|---|---|---|---| +| x | bool | Input point | R | +| y | bool | Output point | RW | +| c | bool | (C)ontrol relay | RW | +| t | bool | (T)imer | R | +| ct | bool | (C)oun(t)er | RW | +| sc | bool | (S)ystem (C)ontrol Relay | mixed | +| ds | int16 | (D)ata register, (s)ingle signed int | RW | +| dd | int32 | (D)ata register, (d)double signed int | RW | +| dh | uint16| (D) register, (h)ex | RW | +| df | float | (D)ata register, (f)loating point | RW | +| xd | uint32| Input Register | R | +| yd | uint32| Output Register | RW | +| td | int16 | (T)ime (d)elay register | RW | +| ctd | int32 | (C)oun(t)er Current Values, (d)ouble int | RW | +| sd | int16 | (S)ystem (D)ata register | mixed | +| txt| char| (T)e(xt) | RW | ### Tags / Nicknames diff --git a/clickplc/driver.py b/clickplc/driver.py index 1c6d757..186f0c9 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -13,7 +13,9 @@ import struct from collections import defaultdict from string import digits -from typing import Any, ClassVar, overload +from typing import Any, ClassVar, Literal, overload + +from pymodbus.constants import Endian from clickplc.util import AsyncioModbusClient @@ -25,7 +27,7 @@ class ClickPLC(AsyncioModbusClient): abstracting corner cases and providing a simple asynchronous interface. """ - data_types: ClassVar[dict] = { + data_types: ClassVar[dict[str, str]] = { 'x': 'bool', # Input point 'y': 'bool', # Output point 'c': 'bool', # (C)ontrol relay @@ -35,14 +37,25 @@ class ClickPLC(AsyncioModbusClient): 'ds': 'int16', # (D)ata register (s)ingle 'dd': 'int32', # (D)ata register, (d)ouble 'dh': 'int16', # (D)ata register, (h)ex - 'df': 'float', # (D)ata register (f)loating point + 'df': 'float', # (D)ata register (f)loating + 'xd': 'int32', # Input Register + 'yd': 'int32', # Output Register 'td': 'int16', # (T)imer register 'ctd': 'int32', # (C)oun(t)er Current values, (d)ouble 'sd': 'int16', # (S)ystem (D)ata register, single 'txt': 'str', # ASCII Text } - def __init__(self, address, tag_filepath='', timeout=1): + def __init__(self, + address, + tag_filepath='', + timeout=1, + interfacetype: Literal["TCP", "Serial"] = 'TCP', + *, + baudrate=38400, + parity='O', + stopbits=1, + bytesize=8): """Initialize PLC connection and data structure. Args: @@ -51,7 +64,17 @@ def __init__(self, address, tag_filepath='', timeout=1): timeout (optional): Timeout when communicating with PLC. Default 1s. """ - super().__init__(address, timeout) + super().__init__( + address, + timeout, + interfacetype, + baudrate=baudrate, + parity=parity, + stopbits=stopbits, + bytesize=bytesize + ) + self.bigendian = Endian.BIG if self.pymodbus35plus else Endian.Big # type:ignore[attr-defined] + self.lilendian = Endian.LITTLE if self.pymodbus35plus else Endian.Little # type:ignore[attr-defined] self.tags = self._load_tags(tag_filepath) self.active_addresses = self._get_address_ranges(self.tags) @@ -106,8 +129,10 @@ async def get(self, address: str | None = None) -> dict: else: start, end = address, None i = next(i for i, s in enumerate(start) if s.isdigit()) - category, start_index = start[:i].lower(), int(start[i:]) - end_index = None if end is None else int(end[i:]) + + category = start[:i].lower() + start_index = 0.5 if start[i:].lower() == "0u" else int(start[i:]) + end_index = 0.5 if end is not None and end[i:].lower() == "0u" else None if end is None else int(end[i:]) if end_index is not None and end_index <= start_index: raise ValueError("End address must be greater than start address.") @@ -135,19 +160,44 @@ async def set(self, address: str, data): """ if address in self.tags: address = self.tags[address]['id'] + # if only one piece of data was sent in, just make it a list anyway. if not isinstance(data, list): data = [data] - i = next(i for i, s in enumerate(address) if s.isdigit()) - category, index = address[:i].lower(), int(address[i:]) + # this will find the first digit in the address + # to divide it into the "letter" part and the "number" part + # the problem is, we have to put some special stuff in here + # to check for xd0u or yd0u. i'm just going to hard-code + # these in and hope that it doesn't ever change ever! + if address.lower() in ('xd0u', 'yd0u'): + category = address[0:2] + index = 0.5 + else: + i = next(i for i, s in enumerate(address) if s.isdigit()) + category, index = address[:i].lower(), int(address[i:]) + + # check to make sure that the category ("X", "Y", "DS", etc) is valud if category not in self.data_types: - raise ValueError(f"{category} currently unsupported.") + raise ValueError(f"{category} is not a valid category. Did you spell it right?") + + # remove the "16"s and "32"s from int16s and int32s. data_type = self.data_types[category].rstrip(digits) + + # go through all the data for datum in data: + # if an int was passed in and we need a float, this is a pretty easy fix. if type(datum) == int and data_type == 'float': # noqa: E721 datum = float(datum) + + # however, we aren't going to handle any other type switching. + # using pydoc.locate, we can turn something like + # the string "float" into the actual type `float`. + # personally, i wonder how long that .locate() takes, we + # could just run it once. it's probably fine. if type(datum) != pydoc.locate(data_type): # noqa: E721 raise ValueError(f"Expected {address} as a {data_type}.") + + # now call the correct set function based on the category. return await getattr(self, '_set_' + category)(index, data) async def _get_x(self, start: int, end: int | None) -> dict: @@ -170,7 +220,7 @@ async def _get_x(self, start: int, end: int | None) -> dict: a number of addresses not divisible by 8, it will have extra data. The extra data here is discarded before returning. """ - if start % 100 == 0 or start % 100 > 16: + if (start % 100 == 0 or start % 100 > 16): raise ValueError('X start address must be *01-*16.') if start < 1 or start > 816: raise ValueError('X start address must be in [001, 816].') @@ -189,6 +239,7 @@ async def _get_x(self, start: int, end: int | None) -> dict: coils = await self.read_coils(start_coil, count) output = {} current = start + # NOTE: could use client.convert_to_registers for bit in coils.bits: if current > end: break @@ -238,6 +289,7 @@ async def _get_y(self, start: int, end: int | None) -> dict: coils = await self.read_coils(start_coil, count) output = {} current = start + # NOTE: could use client.convert_to_registers for bit in coils.bits: if current > end: break @@ -270,6 +322,7 @@ async def _get_c(self, start: int, end: int | None) -> dict | bool: end_coil = 16384 + end - 1 count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f'c{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} async def _get_t(self, start: int, end: int | None) -> dict | bool: @@ -295,6 +348,7 @@ async def _get_t(self, start: int, end: int | None) -> dict | bool: end_coil = 14555 + end - 1 count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f't{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} async def _get_ct(self, start: int, end: int | None) -> dict | bool: @@ -321,6 +375,7 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool: count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f'ct{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} async def _get_sc(self, start: int, end: int | None) -> dict | bool: @@ -353,9 +408,10 @@ async def _get_sc(self, start: int, end: int | None) -> dict | bool: end_coil = 61440 + (end - 1) count = end_coil - start_coil + 1 coils = await self.read_coils(start_coil, count) + # NOTE: could use client.convert_to_registers return {f'sc{start + i}': bit for i, bit in enumerate(coils.bits) if i < count} - async def _get_ds(self, start: int, end: int | None) -> dict | int: + async def _get_ds(self, start: int, end: int | None) -> dict[str, int] | int: """Read DS registers. Called by `get`. DS entries start at Modbus address 0 (1 in the Click software's @@ -369,12 +425,17 @@ async def _get_ds(self, start: int, end: int | None) -> dict | int: address = 0 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - # pack all as unsigned 16-bit little-endian and then unpack as signed 16-bit ints - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count}h', packed) - if count == 1: - return values[0] - return {f'ds{start + i}': v for i, v in enumerate(values)} + register_values: int | list[int] = self.client.convert_from_registers( + registers, data_type=self.client.DATATYPE.INT16 + ) # type: ignore + # one item was returned + if isinstance(register_values, int): + return register_values + + # more than one item is returned + if end is not None: + return {f'ds{n}': register_values[_index] for _index, n in enumerate(range(start, end + 1))} + raise ValueError("PyModbus has failed to return the correct type.") async def _get_dd(self, start: int, end: int | None) -> dict | int: if start < 1 or start > 1000: @@ -385,15 +446,16 @@ async def _get_dd(self, start: int, end: int | None) -> dict | int: address = 16384 + 2 * (start - 1) count = 2 if end is None else 2 * (end - start + 1) registers = await self.read_registers(address, count) - - # Pack registers as 16-bit unsigned shorts, little-endian ('<'), then unpack as signed 32-bit ints - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count // 2}i', packed) # 'i' = signed 32-bit int - - if count == 2: - return values[0] - return {f'dd{start + i}': v for i, v in enumerate(values)} - + registers_real = registers.copy() + for i, item in enumerate(registers): + if i % 2: + registers_real[i - 1] = item + else: + registers_real[i + 1] = item + register_values = self._convert_from_registers(registers_real, data_type=self.client.DATATYPE.INT32) + if end is None: + return register_values + return {f"dd{n}": register_values[_index] for _index, n in enumerate(range(start, end + 1))} async def _get_dh(self, start: int, end: int | None) -> dict | int: if start < 1 or start > 500: @@ -404,11 +466,10 @@ async def _get_dh(self, start: int, end: int | None) -> dict | int: address = 24576 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - - if count == 1: - return int(registers[0]) # unsigned 16-bit int can just cast with int() - return {f'dh{start + n}': int(v) for n, v in enumerate(registers)} - + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + if end is None: + return register_values + return {f"dh{n}": register_values[_index] for _index, n in enumerate(range(start, end + 1))} async def _get_df(self, start: int, end: int | None) -> dict | float: """Read DF registers. Called by `get`. @@ -425,13 +486,107 @@ async def _get_df(self, start: int, end: int | None) -> dict | float: address = 28672 + 2 * (start - 1) count = 2 * (1 if end is None else (end - start + 1)) registers = await self.read_registers(address, count) - # pack the pairs of 16-bit registers (little-endian) and then unpack as 32-byte floats - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count // 2}f', packed) - if count == 2: - return values[0] - return {f'df{start + n}': v - for n, v in enumerate(values)} + registers_real = registers.copy() + + for i, item in enumerate(registers): + if i % 2: + registers_real[i - 1] = item + else: + registers_real[i + 1] = item + print(registers_real) + register_values = self._convert_from_registers(registers_real, data_type=self.client.DATATYPE.FLOAT32) + print(register_values) + if end is None: + return register_values + return {f'df{n}': register_values[_index] for _index, n in enumerate(range(start, end + 1))} + + async def _get_xd(self, start: int, end: int | None) -> dict: + """Read XD registers. Called by `get`.""" + # check ranges + if start < 0 or start > 8: + raise ValueError('YD must be in [0, 8].') + if end is not None and (end < 0 or end > 8): + raise ValueError('YD end must be in [0, 8].') + # calculate address + address = int(57344 + 2 * (start)) + + # see documentation for `self.u_index()` + _adjusted_start = self.u_index(start) + count = 1 if end is None else (self.u_index(end) - _adjusted_start + 1) + + # ok so that count variable is getting used in two places. + # here, where we're determining how many registers to read, + # and at the bottom of the function, where we determine how many + # to spit back out at the user. the problem is, there's a blank address + # between each of these items - except between 0 and 1. that's 0u. + _addresses = ("0", "0u", "1", "2", "3", "4", "5", "6", "7", "8") + # at this point we have the adjusted_start, so we can just say "how + # many numbers past 1 are there?" + + _adjusted_count = int((end - start) * 2 + 1) if end is not None else 1 + registers = await self.read_registers(address, _adjusted_count) + if not registers or len(registers) < count : + raise ValueError("Failed to read correct number of registers.") + + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + # this still works - it's just one value + if end is None: + return register_values + + # honestly this is a complete mess and i should come back and make it not nasty + _values: dict[str, int] = {} + + # if the start is yd0 or yd0u, we need some kind of special case + # case 0: start was 0 + if _adjusted_start < 1: + _values['xd0'] = register_values.pop(0) + # case 1: start was 0 or 0u + if _adjusted_start < 2: + _values['xd0u'] = register_values.pop(0) + # normal case + for n in range(max(_adjusted_start, 2), _adjusted_start + count): + _values[f'xd{_addresses[n]}'] = register_values.pop(0) + if n != _adjusted_start + count - 1: + register_values.pop(0) + return _values + + async def _get_yd(self, start: int | float, end: int | float | None) -> dict: + """Read YD registers. Called by `get`.""" + # check ranges + if start < 0 or start > 8: + raise ValueError('YD must be in [0, 8].') + if end is not None and (end < 0 or end > 8): + raise ValueError('YD end must be in [0, 8].') + # calculate address + address = int(57856 + 2 * (start)) + + # see documentation for `self.u_index()` + _adjusted_start = self.u_index(start) + count = 1 if end is None else (self.u_index(end) - _adjusted_start + 1) + + _adjusted_count = int((end - start) * 2 + 1) if end is not None else 1 + registers = await self.read_registers(address, _adjusted_count) + if not registers or len(registers) < count : + raise ValueError("Failed to read correct number of registers.") + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + # this still works - it's just one value + if end is None: + return register_values + + _values: dict[str, int] = {} + # if the start is yd0 or yd0u, we need some kind of special case + # case 0: start was 0 + if _adjusted_start < 1: + _values['yd0'] = register_values.pop(0) + # case 1: start was 0 or 0u + if _adjusted_start < 2: + _values['yd0u'] = register_values.pop(0) + # normal case + for n in range(max(_adjusted_start, 2), _adjusted_start + count): + _values[f'yd{n - 1}'] = register_values.pop(0) + if n != _adjusted_start + count - 1: + register_values.pop(0) + return _values async def _get_td(self, start: int, end: int | None) -> dict | int: """Read TD registers. Called by `get`. @@ -447,14 +602,10 @@ async def _get_td(self, start: int, end: int | None) -> dict | int: address = 45056 + (start - 1) count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - - # pack all as unsigned 16-bit little-endian and then unpack as signed 16-bit ints - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count}h', packed) - if count == 1: - return values[0] - return {f'td{start + i}': v for i, v in enumerate(values)} - + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + if end is None: + return register_values + return {f"td{n}": register_values.pop(0) for n in range(start, end + 1)} async def _get_ctd(self, start: int, end: int | None) -> dict: """Read CTD registers. Called by `get`. @@ -468,14 +619,18 @@ async def _get_ctd(self, start: int, end: int | None) -> dict: raise ValueError('CTD end must be in [1, 250]') address = 49152 + 2 * (start - 1) # 32-bit - count = 2 if end is None else 2 * (end - start + 1) - registers = await self.read_registers(address, count) - - # pack the pairs of 16-bit registers (little-endian) and then unpack as 32-byte signed ints - print(registers, count) - packed = struct.pack(f'<{count}H', *registers) - values = struct.unpack(f'<{count // 2}i', packed) - return {f'ctd{start + n}': v for n, v in enumerate(values)} + count = 1 if end is None else (end - start + 1) + registers = await self.read_registers(address, count * 2) + registers_real = registers.copy() + for i, item in enumerate(registers): + if i % 2: + registers_real[i - 1] = item + else: + registers_real[i + 1] = item + register_values = self._convert_from_registers(registers_real, data_type=self.client.DATATYPE.INT32) + if end is None: + return register_values + return {f"ctd{n}": register_values.pop(0) for n in range(start, end + 1)} async def _get_sd(self, start: int, end: int | None) -> dict | int: """Read SD registers. Called by `get`. @@ -491,10 +646,10 @@ async def _get_sd(self, start: int, end: int | None) -> dict | int: address = 61440 + start - 1 count = 1 if end is None else (end - start + 1) registers = await self.read_registers(address, count) - - if count == 1 and end is None: - return int(registers[0]) # unsigned 16-bit int can just cast with int() - return {f'sd{start + n}': int(v) for n, v in enumerate(registers)} + register_values = self._convert_from_registers(registers, data_type=self.client.DATATYPE.INT16) + if end is None: + return register_values + return {f"sd{n}": register_values.pop(0) for n in range(start, end + 1)} @overload async def _get_txt(self, start: int, end: None) -> str: ... @@ -517,23 +672,26 @@ async def _get_txt(self, start, end): address = 36864 + (start - 1) // 2 if end is None: registers = await self.read_registers(address, 1) - r = registers[0] - assert isinstance(r, int) - if start % 2: - return chr(r & 0x00FF) # if starting on the second byte of a 16-bit register, discard the MSB - return chr((r >> 8) & 0x00FF) # otherwise discard LSB + return self._convert_from_registers(registers, data_type=self.client.DATATYPE.STRING) count = 1 + (end - start) // 2 + (start - 1) % 2 + if (start % 2) == (end % 2) == 0: + count -= 1 registers = await self.read_registers(address, count) - - # Swap the two bytes within each 16-bit register (i.e., 0x4231 -> 0x3142) - swapped = [((reg & 0xFF) << 8) | (reg >> 8) for reg in registers] - byte_data = b''.join(reg.to_bytes(2, 'big') for reg in swapped) - r = byte_data.decode('ascii') + register_values: str = self._convert_from_registers(registers, data_type=self.client.DATATYPE.STRING) + if len(register_values) < count * 2: + raise Exception("You are requesting more text than has been put into the Click PLC.") + + r = '' + for _ in range(count): + msb = register_values[2 * _] + lsb = register_values[2 * _ + 1] + r += lsb + msb if end % 2: # if ending on the first byte of a 16-bit register, discard the final LSB r = r[:-1] if not start % 2: r = r[1:] # if starting on the last byte of a 16-bit register, discard the first MSB + assert len(r) == (end - start + 1), f"{len(r)=}, {end-start+1=}, {r=}" return {f'txt{start}-txt{end}': r} async def _set_y(self, start: int, data: list[bool]): @@ -698,6 +856,74 @@ async def _set_dh(self, start: int, data: list[int]): raise ValueError('Data list longer than available addresses.') await self.write_registers(address, values=data) + async def _set_yd(self, start: int, data: list[int]): + """Set YD registers. Called by `set`.""" + # make sure the values are correct + # side note: yd0u will come in with start == 0.5 + if start < 0 or start > 8: + raise ValueError("YD must be in [0, 8]") + # make sure all the data is an int16 + for datum in data: + if datum.bit_length() > 16: + raise ValueError(f"Datum {datum} is longer than 16 bits. YD registers cannot hold more than 16 bits.") + # get the correct starting address + address = int(57856 + 2 * (start)) + + # i am going to do this horribly. anyone who comes after me and wants + # to fix it is absolutely welcome to. + horrible_index = self.u_index(start) + if len(data) > 10 - horrible_index: + raise ValueError( + "Data list is longer than available addresses. " + + "Make sure you're accounting for YD0u!" + ) + + # yd sucks. sorry i have to do it like this + values: list[int] = [] + extended_zero = False + for i, datum in enumerate(data): + if (start == 0 and i in (0, 1)) or (start == 0.5 and i == 0): + values.append(datum) + else : + extended_zero = True + values.extend((datum, 0x0000)) + # remove the last (0x0000) + if extended_zero: + values.pop() + await self.write_registers(address, values) + return + + @staticmethod + def u_index(x: int | float) -> int: + """Here's the deal with this method. + + I had to denote for XD and YD if somebody was trying to get/set + XD0u or YD0u. So if that happens, then I pass through 0.5 as the + start or end. The problem is, this messes with trying to figure out how many + values are to be returned / to be set. So this just orders them + in a normal `int` value. + + ``` + u_index(0) + >>> 0 + u_index(0.5) + >>> 1 + u_index(1) + >>> 2 + u_index(2) + >>> 3 + # etc... + ``` + + """ + if x == 0 : + return 0 + if x == 0.5: + return 1 + if isinstance(x, float) : + raise ValueError(f"You cannot send {x} into 'u_index'. It is a float that is not 0.5.") + return x + 1 + async def _set_td(self, start: int, data: list[int]): """Set TD registers. Called by `set`. @@ -715,6 +941,25 @@ async def _set_td(self, start: int, data: list[int]): await self.write_registers(address, values) + async def _set_ctd(self, start: int, data: list[int]): + """Set CTD registers. Called by `set`.""" + if start < 1 or start > 250 : + raise ValueError("CTD must be in [1, 250].") + address = 49152 + 2 * (start - 1) + if len(data) > 250 - start + 1: + raise ValueError('Data list longer than available addresses.') + + # pymodbus is expectin list[uint_16] + # convert each int_32 into a uint_16 pair (little-endian) with the same byte value + + values: list[bytes] = [] + for datum in data : + packed_4_bytes = struct.pack(' Any: #int | float | str | list[bool] | list[int] | list[float]: + return self.client.convert_from_registers( + registers, + data_type, + word_order=word_order, + string_encoding=string_encoding + ) + async def write_coils(self, address: int, values): """Write modbus coils.""" await self._request('write_coils', address=address, values=values)