diff --git a/config/modbus.toml b/config/modbus.toml index 7a098a7..98e64b0 100644 --- a/config/modbus.toml +++ b/config/modbus.toml @@ -12,4 +12,9 @@ databits=8 [thinedge] mqtthost="127.0.0.1" -mqttport=1883 \ No newline at end of file +mqttport=1883 +# Subscribe to MQTT topics for receiving messages + subscribe_topics = [ + "te/device/+///cmd/modbus_SetRegister/+", + "te/device/+///cmd/modbus_SetCoil/+", + ] \ No newline at end of file diff --git a/images/simulator/modbus.json b/images/simulator/modbus.json index e70b235..9f81658 100644 --- a/images/simulator/modbus.json +++ b/images/simulator/modbus.json @@ -34,13 +34,9 @@ } }, "invalid": [1], - "write": [3], + "write": [3, 48], "bits": [ - { - "addr": 2, - "value": -2, - "action": "increment" - } + 48 ], "uint16": [ { diff --git a/operations/c8y_SetCoil.template b/operations/c8y_SetCoil.template new file mode 100644 index 0000000..51736a9 --- /dev/null +++ b/operations/c8y_SetCoil.template @@ -0,0 +1,7 @@ +[exec] + topic = "c8y/devicecontrol/notifications" + on_fragment = "c8y_SetCoil" + +[exec.workflow] +operation = "modbus_SetCoil" +input = "${.payload.c8y_SetCoil}" \ No newline at end of file diff --git a/operations/c8y_SetRegister.template b/operations/c8y_SetRegister.template new file mode 100644 index 0000000..37dafa8 --- /dev/null +++ b/operations/c8y_SetRegister.template @@ -0,0 +1,7 @@ +[exec] + topic = "c8y/devicecontrol/notifications" + on_fragment = "c8y_SetRegister" + +[exec.workflow] +operation = "modbus_SetRegister" +input = "${.payload.c8y_SetRegister}" \ No newline at end of file diff --git a/tedge_modbus/operations/__main__.py b/tedge_modbus/operations/__main__.py index aaf40f9..9f3a34f 100644 --- a/tedge_modbus/operations/__main__.py +++ b/tedge_modbus/operations/__main__.py @@ -1,3 +1,4 @@ +# pylint: disable=R0801, duplicate-code """thin-edge.io Modbus operations handlers""" import sys diff --git a/tedge_modbus/operations/common.py b/tedge_modbus/operations/common.py new file mode 100644 index 0000000..f5e82f4 --- /dev/null +++ b/tedge_modbus/operations/common.py @@ -0,0 +1,146 @@ +"""Common helpers for Modbus operation handlers.""" + +from __future__ import annotations + +import json +import logging + +import toml +from pymodbus.client import ModbusSerialClient, ModbusTcpClient + + +def parse_json_arguments(arguments: str | list[str]) -> dict: + """Parse JSON arguments which may be a string or list of segments. + + Raises ValueError on invalid JSON. + """ + if isinstance(arguments, str): + raw = arguments + else: + raw = arguments[0] if len(arguments) == 1 else ",".join(arguments) + try: + return json.loads(raw) + except json.JSONDecodeError as err: + raise ValueError(f"Invalid JSON payload: {err}") from err + + +def resolve_target_device( + ip_address: str, slave_id: int, devices_path +) -> tuple[dict, str]: + """Resolve device connection parameters from ip or devices.toml. + + Returns (target_device, protocol). + """ + if ip_address: + ip = ip_address or "127.0.0.1" + protocol = "TCP" + target_device = { + "protocol": "TCP", + "ip": ip, + "port": 502, + "address": slave_id, + } + else: + devices_cfg = toml.load(devices_path) + devices = devices_cfg.get("device", []) or [] + target_device = next( + (d for d in devices if d.get("address") == slave_id), None + ) or next((d for d in devices if d.get("protocol") == "TCP"), None) + if target_device is None: + raise ValueError(f"No suitable device found in {devices_path}") + protocol = target_device.get("protocol") + return target_device, protocol # type: ignore[return-value] + + +def backfill_serial_defaults( + target_device: dict, protocol: str, base_config: dict +) -> None: + """For RTU devices, backfill serial settings from base config if missing.""" + if protocol == "RTU": + serial_defaults = base_config.get("serial") or {} + for key in ["port", "baudrate", "stopbits", "parity", "databits"]: + if target_device.get(key) is None and key in serial_defaults: + target_device[key] = serial_defaults[key] + + +def build_modbus_client(target_device: dict, protocol: str): + """Create a pymodbus client for given target and protocol.""" + if protocol == "TCP": + return ModbusTcpClient( + host=target_device["ip"], + port=target_device["port"], + auto_open=True, + auto_close=True, + debug=True, + ) + if protocol == "RTU": + return ModbusSerialClient( + port=target_device["port"], + baudrate=target_device["baudrate"], + stopbits=target_device["stopbits"], + parity=target_device["parity"], + bytesize=target_device["databits"], + ) + raise ValueError("Expected protocol to be RTU or TCP. Got " + str(protocol) + ".") + + +def close_client_quietly(client) -> None: + """Close a pymodbus client and ignore any exceptions.""" + try: + client.close() + except Exception: + pass + + +def prepare_client( + ip_address: str, + slave_id: int, + devices_path, + base_config: dict, +): + """Resolve target device, backfill defaults, and build a Modbus client.""" + target_device, protocol = resolve_target_device(ip_address, slave_id, devices_path) + backfill_serial_defaults(target_device, protocol, base_config) + return build_modbus_client(target_device, protocol) + + +def apply_loglevel(logger, base_config: dict) -> None: + """Apply log level from base configuration to given logger.""" + loglevel = base_config["modbus"].get("loglevel") or "INFO" + logger.setLevel(getattr(logging, loglevel.upper(), logging.INFO)) + + +def parse_register_params(payload: dict) -> dict: + """Parse and validate register operation parameters into a single dict. + + Returns a dict with keys: ip_address, slave_id, register, start_bit, num_bits, write_value. + """ + ip_address = (payload.get("ipAddress") or "").strip() + try: + return { + "ip_address": ip_address, + "slave_id": int(payload["address"]), + "register": int(payload["register"]), + "start_bit": int(payload.get("startBit", 0)), + "num_bits": int(payload.get("noBits", 16)), + "write_value": int(payload["value"]), + } + except KeyError as err: + raise ValueError(f"Missing required field: {err}") from err + except (TypeError, ValueError) as err: + raise ValueError(f"Invalid numeric field: {err}") from err + + +def compute_masked_value( + current_value: int, start_bit: int, num_bits: int, write_value: int +) -> int: + """Validate bit-field and compute new register value with masked bits applied.""" + if start_bit < 0 or num_bits <= 0 or start_bit + num_bits > 16: + raise ValueError( + "startBit and noBits must define a range within a 16-bit register" + ) + max_value = (1 << num_bits) - 1 + if write_value < 0 or write_value > max_value: + raise ValueError(f"value must be within 0..{max_value} for noBits={num_bits}") + mask = ((1 << num_bits) - 1) << start_bit + return (current_value & ~mask) | ((write_value << start_bit) & mask) diff --git a/tedge_modbus/operations/set_coil.py b/tedge_modbus/operations/set_coil.py new file mode 100644 index 0000000..26f90c0 --- /dev/null +++ b/tedge_modbus/operations/set_coil.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +"""Modbus Write Coil Status operation handler""" +import logging + +from pymodbus.exceptions import ConnectionException +from .context import Context +from .common import ( + parse_json_arguments, + prepare_client, + apply_loglevel, + close_client_quietly, +) + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def run(arguments: str | list[str]) -> None: + """Run set coil operation handler + Expected arguments (JSON): + { + "input": false, + "address": < Fieldbusaddress >, + "coil": < coilnumber >, + "value": < 0 | 1 > + } + Parse JSON payload""" + payload = parse_json_arguments(arguments) + + # Create context with default config directory + context = Context() + + # Load configs and set log level + modbus_config = context.base_config + apply_loglevel(logger, modbus_config) + logger.info("New set coil operation. args=%s", arguments) + + try: + slave_id = int(payload["address"]) # Fieldbus address + coil_number = int(payload["coil"]) # Coil address + value_int = int(payload["value"]) # 0 or 1 + except KeyError as err: + raise ValueError(f"Missing required field: {err}") from err + except (TypeError, ValueError) as err: + raise ValueError(f"Invalid numeric field: {err}") from err + + if value_int not in (0, 1): + raise ValueError("value must be 0 or 1 for a coil write") + + # Prepare client (resolve target, backfill defaults, build client) + client = prepare_client( + payload["ipAddress"], + slave_id, + context.config_dir / "devices.toml", + modbus_config, + ) + + try: + coil_value = bool(value_int) + result = client.write_coil( + address=coil_number, + value=coil_value, + slave=slave_id, + ) + if result.isError(): + raise RuntimeError(f"Failed to write coil {coil_number}: {result}") + logger.info( + "Wrote %s to coil %d on slave %d", coil_value, coil_number, slave_id + ) + except ConnectionException as err: + logger.error("Connection error while writing to slave %d: %s", slave_id, err) + raise + finally: + close_client_quietly(client) diff --git a/tedge_modbus/operations/set_register.py b/tedge_modbus/operations/set_register.py new file mode 100644 index 0000000..998845c --- /dev/null +++ b/tedge_modbus/operations/set_register.py @@ -0,0 +1,101 @@ +# pylint: disable=duplicate-code +"""Modbus Write register status operation handler""" +import logging + +from pymodbus.exceptions import ConnectionException +from .context import Context +from .common import ( + parse_json_arguments, + prepare_client, + apply_loglevel, + close_client_quietly, + parse_register_params, + compute_masked_value, +) + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def run(arguments: str | list[str]) -> None: + """Run set register operation handler + Expected arguments (JSON): + { + "input": false, + "ipAddress": , + "address": , + "register": , + "startBit": , + "noBits": , + "value": + } + Parse JSON arguments. Depending on the caller, we may receive the JSON as a single + string or a list of comma-split segments. Handle both cases robustly.""" + payload = parse_json_arguments(arguments) + + # Create context with default config directory + context = Context() + + # Load configs and set log level + modbus_config = context.base_config + apply_loglevel(logger, modbus_config) + logger.info("New set register operation") + + # Parse required fields from JSON + params = parse_register_params(payload) + + # Prepare client (resolve target, backfill defaults, build client) + client = prepare_client( + params["ip_address"], + params["slave_id"], + context.config_dir / "devices.toml", + modbus_config, + ) + + # Validate and compute new value + + try: + # Read current register value + read_resp = client.read_holding_registers( + address=params["register"], count=1, slave=params["slave_id"] + ) + if read_resp.isError(): + raise RuntimeError( + f"Failed to read register {params['register']}: {read_resp}" + ) + current_value = read_resp.registers[0] & 0xFFFF + new_value = compute_masked_value( + current_value, + params["start_bit"], + params["num_bits"], + params["write_value"], + ) + + # Write back register + write_resp = client.write_register( + address=params["register"], value=new_value, slave=params["slave_id"] + ) + if write_resp.isError(): + raise RuntimeError( + f"Failed to write register {params['register']}: {write_resp}" + ) + logger.info( + "Updated register %d (bits %d..%d) from 0x%04X to 0x%04X on slave %d", + params["register"], + params["start_bit"], + params["start_bit"] + params["num_bits"] - 1, + current_value, + new_value, + params["slave_id"], + ) + except ConnectionException as err: + logger.error( + "Connection error while writing to slave %d: %s", + params["slave_id"], + err, + ) + raise + finally: + close_client_quietly(client) diff --git a/tedge_modbus/reader/reader.py b/tedge_modbus/reader/reader.py index 177fde3..52ff55d 100644 --- a/tedge_modbus/reader/reader.py +++ b/tedge_modbus/reader/reader.py @@ -18,6 +18,8 @@ from .banner import BANNER from .mapper import MappedMessage, ModbusMapper +from ..operations import set_coil, set_register + DEFAULT_FILE_DIR = "/etc/tedge/plugins/modbus" BASE_CONFIG_NAME = "modbus.toml" @@ -410,6 +412,133 @@ def send_tedge_message( topic=msg.topic, payload=msg.data, retain=retain, qos=qos ) + def on_connect( + self, client, userdata, flags, rc + ): # pylint: disable=unused-argument + """Callback for when the client receives a CONNACK response from the server""" + if rc == 0: + self.logger.debug("Connected to MQTT broker successfully") + # Subscribe to topics if configured + self._subscribe_to_topics(client) + else: + self.logger.error("Failed to connect to MQTT broker, return code %d", rc) + + def on_message(self, client, userdata, msg): # pylint: disable=unused-argument + """Callback for when a PUBLISH message is received from the server""" + try: + topic = msg.topic + payload = msg.payload.decode("utf-8") + self.logger.debug("Received message on topic %s: %s", topic, payload) + self._handle_subscribed_message(topic, payload) + except Exception as e: + self.logger.error("Error processing subscribed message: %s", e) + + def on_disconnect(self, client, userdata, rc): # pylint: disable=unused-argument + """Callback for when the client disconnects from the broker""" + if rc != 0: + self.logger.warning( + "Unexpected disconnection from MQTT broker, return code %d", rc + ) + else: + self.logger.debug("Disconnected from MQTT broker") + + def _subscribe_to_topics(self, client): + """Subscribe to configured MQTT topics""" + subscribe_topics = self.base_config.get("thinedge", {}).get( + "subscribe_topics", [] + ) + if subscribe_topics: + for topic in subscribe_topics: + try: + result = client.subscribe(topic) + if result[0] == mqtt_client.MQTT_ERR_SUCCESS: + self.logger.debug("Successfully subscribed to topic: %s", topic) + else: + self.logger.error( + "Failed to subscribe to topic %s, error code: %d", + topic, + result[0], + ) + except Exception as e: + self.logger.error("Error subscribing to topic %s: %s", topic, e) + + def _handle_subscribed_message(self, topic, payload): + """Handle messages received from subscribed topics""" + self.logger.debug("Handling message from topic %s: %s", topic, payload) + + try: + payload_data = json.loads(payload) + except json.JSONDecodeError as e: + if payload != "": + self.logger.error("Failed to parse JSON payload: %s", e) + return + + if payload_data["status"] == "init": + # Publish executing status + payload_data["status"] = "executing" + self.send_tedge_message( + MappedMessage(json.dumps(payload_data), topic), retain=True, qos=1 + ) + return + + # Handle modbus_SetRegister commands + if ( + "///cmd/modbus_SetRegister/" in topic + and payload_data["status"] == "executing" + ): + self.logger.info("Processing modbus_SetRegister command") + try: + self.logger.debug("Register data: %s", payload_data) + + register_json = json.dumps(payload_data) + set_register.run(register_json) + self.logger.debug("Successfully processed modbus_SetRegister command") + payload_data["status"] = "successful" + self.send_tedge_message( + MappedMessage(json.dumps(payload_data), topic), + retain=True, + qos=1, + ) + except Exception as e: + self.logger.error("Error processing modbus_SetRegister command: %s", e) + payload_data["status"] = "failed" + payload_data["reason"] = ( + f"Error processing modbus_SetRegister command: {e}" + ) + + self.send_tedge_message( + MappedMessage(json.dumps(payload_data), topic), retain=True, qos=1 + ) + + # Handle modbus_SetCoil commands + elif ( + "///cmd/modbus_SetCoil/" in topic and payload_data["status"] == "executing" + ): + self.logger.info("Processing modbus_SetCoil command") + try: + self.logger.debug("Coil data: %s", payload_data) + + coil_json = json.dumps(payload_data) + set_coil.run(coil_json) + self.logger.debug("Successfully processed modbus_SetCoil command") + payload_data["status"] = "successful" + self.send_tedge_message( + MappedMessage(json.dumps(payload_data), topic), + retain=True, + qos=1, + ) + except Exception as e: + self.logger.error("Error processing modbus_SetCoil command: %s", e) + payload_data["status"] = "failed" + payload_data["reason"] = f"Error processing modbus_SetCoil command: {e}" + self.send_tedge_message( + MappedMessage(json.dumps(payload_data), topic), retain=True, qos=1 + ) + + # Add more topic-specific handlers as needed + else: + self.logger.debug("No specific handler for topic: %s", topic) + def connect_to_tedge(self): """Connect to the thin-edge.io MQTT broker and return a connected MQTT client""" while True: @@ -418,8 +547,18 @@ def connect_to_tedge(self): port = self.base_config["thinedge"]["mqttport"] client_id = "modbus-client" client = mqtt_client.Client(client_id) + + # Set up callbacks + client.on_connect = self.on_connect + client.on_message = self.on_message + client.on_disconnect = self.on_disconnect + client.connect(broker, port) self.logger.debug("Connected to MQTT broker at %s:%d", broker, port) + + # Start the network loop to handle callbacks + client.loop_start() + return client except Exception as e: self.logger.error("Failed to connect to thin-edge.io: %s", e) @@ -493,6 +632,13 @@ def register_child_devices(self, devices): self.send_tedge_message( MappedMessage(json.dumps(payload), topic), retain=True, qos=1 ) + cmd_payload = "{}" + for cmd in ["modbus_SetRegister", "modbus_SetCoil"]: + cmd_topic = topic + f"/cmd/{cmd}" + + self.send_tedge_message( + MappedMessage(cmd_payload, cmd_topic), retain=True, qos=1 + ) def main(): diff --git a/tests/modbus_reader/operations.robot b/tests/modbus_reader/operations.robot new file mode 100644 index 0000000..ba21b2d --- /dev/null +++ b/tests/modbus_reader/operations.robot @@ -0,0 +1,19 @@ +*** Settings *** +Resource ../resources/common.robot +Library Cumulocity +Library DateTime + +Suite Setup Set Child Device1 + + +*** Test Cases *** +Device should set supported operations for writing to registers and coils + Cumulocity.Should Contain Supported Operations c8y_SetRegister c8y_SetCoil + +Device should write to a registers + ${operation}= Cumulocity.Create Operation {"c8y_SetRegister": { "input": false, "address": 1, "startBit": 0, "noBits": 16, "ipAddress": "simulator", "value": 23, "register": 3 }} description=Write to register + Operation Should Be SUCCESSFUL ${operation} + +Device should write to a coil + ${operation}= Cumulocity.Create Operation {"c8y_SetCoil": { "input": false, "coil": 48, "address": 1, "value": 1, "ipAddress": "simulator" } } description=Write to coil + Operation Should Be SUCCESSFUL ${operation}