|
| 1 | +# Source: https://github.com/pycom/pycom-modbus/tree/master/uModbus (2018-07-16) |
| 2 | +# This file has been modified and differ from its source version. |
| 3 | +from . import uFunctions as functions |
| 4 | +from . import uConst as Const |
| 5 | +from machine import UART |
| 6 | +from machine import Pin |
| 7 | +import struct |
| 8 | +import time |
| 9 | +import machine |
| 10 | + |
| 11 | + |
| 12 | +class uSerial: |
| 13 | + |
| 14 | + def __init__(self, uart_id, tx, rx, baudrate=9600, data_bits=8, stop_bits=1, parity=None, ctrl_pin=None, debug=False): |
| 15 | + self._mdbus_uart = UART(uart_id, tx=tx, rx=rx, baudrate=baudrate, bits=data_bits, parity=parity, |
| 16 | + stop=stop_bits, timeout_char=10) |
| 17 | + if ctrl_pin is not None: |
| 18 | + self._ctrlPin = Pin(ctrl_pin, mode=Pin.OUT) |
| 19 | + else: |
| 20 | + self._ctrlPin = None |
| 21 | + self.char_time_ms = (1000 * (data_bits + stop_bits + 2)) // baudrate |
| 22 | + self._modbus_debug = debug |
| 23 | + |
| 24 | + def _calculate_crc16(self, data): |
| 25 | + crc = 0xFFFF |
| 26 | + |
| 27 | + for char in data: |
| 28 | + crc = (crc >> 8) ^ Const.CRC16_TABLE[((crc) ^ char) & 0xFF] |
| 29 | + return struct.pack('<H', crc) |
| 30 | + |
| 31 | + def _bytes_to_bool(self, byte_list): |
| 32 | + bool_list = [] |
| 33 | + for index, byte in enumerate(byte_list): |
| 34 | + bool_list.extend([bool(byte & (1 << n)) for n in range(8)]) |
| 35 | + return bool_list |
| 36 | + |
| 37 | + def _to_short(self, byte_array, signed=True): |
| 38 | + response_quantity = int(len(byte_array) / 2) |
| 39 | + fmt = '>' + (('h' if signed else 'H') * response_quantity) |
| 40 | + |
| 41 | + return struct.unpack(fmt, byte_array) |
| 42 | + |
| 43 | + def _exit_read(self, response): |
| 44 | + if response[1] >= Const.ERROR_BIAS: |
| 45 | + if len(response) < Const.ERROR_RESP_LEN: |
| 46 | + return False |
| 47 | + elif (Const.READ_COILS <= response[1] <= Const.READ_INPUT_REGISTER): |
| 48 | + expected_len = Const.RESPONSE_HDR_LENGTH + \ |
| 49 | + 1 + response[2] + Const.CRC_LENGTH |
| 50 | + if len(response) < expected_len: |
| 51 | + return False |
| 52 | + elif len(response) < Const.FIXED_RESP_LEN: |
| 53 | + return False |
| 54 | + return True |
| 55 | + |
| 56 | + def _mdbus_uart_read(self, timeout=2000): |
| 57 | + response = bytearray() |
| 58 | + loop_max = int(timeout / 50) |
| 59 | + for x in range(1, loop_max): |
| 60 | + if self._mdbus_uart.any(): |
| 61 | + response.extend(self._mdbus_uart.read()) |
| 62 | + # variable length function codes may require multiple reads |
| 63 | + if self._exit_read(response): |
| 64 | + break |
| 65 | + time.sleep(0.05) |
| 66 | + return response |
| 67 | + |
| 68 | + def _send_receive(self, modbus_pdu, slave_addr, count, timeout=2000): |
| 69 | + serial_pdu = bytearray() |
| 70 | + serial_pdu.append(slave_addr) |
| 71 | + serial_pdu.extend(modbus_pdu) |
| 72 | + |
| 73 | + crc = self._calculate_crc16(serial_pdu) |
| 74 | + serial_pdu.extend(crc) |
| 75 | + |
| 76 | + # flush the Rx FIFO |
| 77 | + read_data = self._mdbus_uart.read() |
| 78 | + if read_data is not None: |
| 79 | + self.print_debug("ModBus", "R <= {}".format(self.BytesToHexStr(read_data))) |
| 80 | + if self._ctrlPin: |
| 81 | + self._ctrlPin(1) |
| 82 | + |
| 83 | + self.print_debug("ModBus", "T => {}".format(self.BytesToHexStr(serial_pdu))) |
| 84 | + self._mdbus_uart.write(serial_pdu) |
| 85 | + if self._ctrlPin: |
| 86 | + while not self._mdbus_uart.wait_tx_done(2): |
| 87 | + machine.idle() |
| 88 | + time.sleep_ms(1 + self.char_time_ms) |
| 89 | + self._ctrlPin(0) |
| 90 | + return self._validate_resp_hdr(self._mdbus_uart_read(timeout), slave_addr, modbus_pdu[0], count) |
| 91 | + |
| 92 | + def _validate_resp_hdr(self, response, slave_addr, function_code, count): |
| 93 | + |
| 94 | + if len(response): |
| 95 | + self.print_debug("ModBus", "R <= {}".format(self.BytesToHexStr(response))) |
| 96 | + |
| 97 | + resp_crc = response[-Const.CRC_LENGTH:] |
| 98 | + expected_crc = self._calculate_crc16( |
| 99 | + response[0:len(response) - Const.CRC_LENGTH]) |
| 100 | + if (resp_crc[0] != expected_crc[0]) or (resp_crc[1] != expected_crc[1]): |
| 101 | + raise OSError('invalid response CRC') |
| 102 | + |
| 103 | + if (response[0] != slave_addr): |
| 104 | + raise ValueError('wrong slave address') |
| 105 | + |
| 106 | + if (response[1] == (function_code + Const.ERROR_BIAS)): |
| 107 | + raise ValueError( |
| 108 | + 'slave returned exception code: {:d}'.format(response[2])) |
| 109 | + |
| 110 | + hdr_length = (Const.RESPONSE_HDR_LENGTH + 1) if count else Const.RESPONSE_HDR_LENGTH |
| 111 | + return response[hdr_length: len(response) - Const.CRC_LENGTH] |
| 112 | + |
| 113 | + def read_coils(self, slave_addr, starting_addr, coil_qty, timeout=2000): |
| 114 | + modbus_pdu = functions.read_coils(starting_addr, coil_qty) |
| 115 | + |
| 116 | + resp_data = self._send_receive(modbus_pdu, slave_addr, True, timeout) |
| 117 | + if resp_data != None: |
| 118 | + status_pdu = self._bytes_to_bool(resp_data) |
| 119 | + return status_pdu |
| 120 | + |
| 121 | + def read_discrete_inputs(self, slave_addr, starting_addr, input_qty, timeout=2000): |
| 122 | + modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty) |
| 123 | + |
| 124 | + resp_data = self._send_receive(modbus_pdu, slave_addr, True, timeout) |
| 125 | + if resp_data != None: |
| 126 | + status_pdu = self._bytes_to_bool(resp_data) |
| 127 | + return status_pdu |
| 128 | + |
| 129 | + def read_holding_registers(self, slave_addr, starting_addr, register_qty, signed=True, timeout=2000): |
| 130 | + modbus_pdu = functions.read_holding_registers( |
| 131 | + starting_addr, register_qty) |
| 132 | + |
| 133 | + resp_data = self._send_receive(modbus_pdu, slave_addr, True, timeout) |
| 134 | + if resp_data != None: |
| 135 | + register_value = self._to_short(resp_data, signed) |
| 136 | + return list(register_value) |
| 137 | + |
| 138 | + def read_input_registers(self, slave_addr, starting_address, register_quantity, signed=True, timeout=2000): |
| 139 | + modbus_pdu = functions.read_input_registers( |
| 140 | + starting_address, register_quantity) |
| 141 | + |
| 142 | + resp_data = self._send_receive(modbus_pdu, slave_addr, True, timeout) |
| 143 | + if resp_data != None: |
| 144 | + register_value = self._to_short(resp_data, signed) |
| 145 | + return list(register_value) |
| 146 | + |
| 147 | + def write_single_coil(self, slave_addr, output_address, output_value, timeout=2000): |
| 148 | + modbus_pdu = functions.write_single_coil(output_address, output_value) |
| 149 | + |
| 150 | + resp_data = self._send_receive(modbus_pdu, slave_addr, False, timeout) |
| 151 | + if resp_data != None: |
| 152 | + operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_COIL, |
| 153 | + output_address, value=output_value, signed=False) |
| 154 | + return operation_status |
| 155 | + |
| 156 | + def write_single_register(self, slave_addr, register_address, register_value, signed=True, timeout=2000): |
| 157 | + modbus_pdu = functions.write_single_register( |
| 158 | + register_address, register_value, signed) |
| 159 | + |
| 160 | + resp_data = self._send_receive(modbus_pdu, slave_addr, False, timeout) |
| 161 | + if resp_data != None: |
| 162 | + operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_REGISTER, |
| 163 | + register_address, value=register_value, signed=signed) |
| 164 | + return operation_status |
| 165 | + |
| 166 | + def write_multiple_coils(self, slave_addr, starting_address, output_values, timeout=2000): |
| 167 | + modbus_pdu = functions.write_multiple_coils( |
| 168 | + starting_address, output_values) |
| 169 | + |
| 170 | + resp_data = self._send_receive(modbus_pdu, slave_addr, False, timeout) |
| 171 | + if resp_data != None: |
| 172 | + operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_COILS, |
| 173 | + starting_address, quantity=len(output_values)) |
| 174 | + return operation_status |
| 175 | + |
| 176 | + def write_multiple_registers(self, slave_addr, starting_address, register_values, signed=True, timeout=2000): |
| 177 | + modbus_pdu = functions.write_multiple_registers( |
| 178 | + starting_address, register_values, signed) |
| 179 | + |
| 180 | + resp_data = self._send_receive(modbus_pdu, slave_addr, False, timeout) |
| 181 | + if resp_data != None: |
| 182 | + operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_REGISTERS, |
| 183 | + starting_address, quantity=len(register_values)) |
| 184 | + return operation_status |
| 185 | + |
| 186 | + def BytesToHexStr(self, bins): |
| 187 | + return ''.join(["%02X" % x for x in bins]).strip() |
| 188 | + |
| 189 | + def print_debug(self, tag, msg): |
| 190 | + if self._modbus_debug: |
| 191 | + print("[ \033[32m{}\033[0m ] {}".format(tag, msg)) |
| 192 | + |
| 193 | + def close(self): |
| 194 | + if self._mdbus_uart is None: |
| 195 | + return |
| 196 | + try: |
| 197 | + self._mdbus_uart.deinit() |
| 198 | + except Exception: |
| 199 | + pass |
0 commit comments