Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/modbus.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ databits=8

[thinedge]
mqtthost="127.0.0.1"
mqttport=1883
mqttport=1883
# Subscribe to MQTT topics for receiving messages
subscribe_topics = [
"te/device/+///cmd/modbus_SetRegister/+",
"te/device/+///cmd/modbus_SetCoil/+",
]
8 changes: 2 additions & 6 deletions images/simulator/modbus.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,9 @@
}
},
"invalid": [1],
"write": [3],
"write": [3, 48],
"bits": [
{
"addr": 2,
"value": -2,
"action": "increment"
}
48
],
"uint16": [
{
Expand Down
7 changes: 7 additions & 0 deletions operations/c8y_SetCoil.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[exec]
topic = "c8y/devicecontrol/notifications"
on_fragment = "c8y_SetCoil"

[exec.workflow]
operation = "modbus_SetCoil"
input = "${.payload.c8y_SetCoil}"
7 changes: 7 additions & 0 deletions operations/c8y_SetRegister.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[exec]
topic = "c8y/devicecontrol/notifications"
on_fragment = "c8y_SetRegister"

[exec.workflow]
operation = "modbus_SetRegister"
input = "${.payload.c8y_SetRegister}"
1 change: 1 addition & 0 deletions tedge_modbus/operations/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=R0801, duplicate-code
"""thin-edge.io Modbus operations handlers"""

import sys
Expand Down
146 changes: 146 additions & 0 deletions tedge_modbus/operations/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Common helpers for Modbus operation handlers."""

from __future__ import annotations

import json
import logging

import toml
from pymodbus.client import ModbusSerialClient, ModbusTcpClient


def parse_json_arguments(arguments: str | list[str]) -> dict:
"""Parse JSON arguments which may be a string or list of segments.

Raises ValueError on invalid JSON.
"""
if isinstance(arguments, str):
raw = arguments
else:
raw = arguments[0] if len(arguments) == 1 else ",".join(arguments)
try:
return json.loads(raw)
except json.JSONDecodeError as err:
raise ValueError(f"Invalid JSON payload: {err}") from err


def resolve_target_device(
ip_address: str, slave_id: int, devices_path
) -> tuple[dict, str]:
"""Resolve device connection parameters from ip or devices.toml.

Returns (target_device, protocol).
"""
if ip_address:
ip = ip_address or "127.0.0.1"
protocol = "TCP"
target_device = {
"protocol": "TCP",
"ip": ip,
"port": 502,
"address": slave_id,
}
else:
devices_cfg = toml.load(devices_path)
devices = devices_cfg.get("device", []) or []
target_device = next(
(d for d in devices if d.get("address") == slave_id), None
) or next((d for d in devices if d.get("protocol") == "TCP"), None)
if target_device is None:
raise ValueError(f"No suitable device found in {devices_path}")
protocol = target_device.get("protocol")
return target_device, protocol # type: ignore[return-value]


def backfill_serial_defaults(
target_device: dict, protocol: str, base_config: dict
) -> None:
"""For RTU devices, backfill serial settings from base config if missing."""
if protocol == "RTU":
serial_defaults = base_config.get("serial") or {}
for key in ["port", "baudrate", "stopbits", "parity", "databits"]:
if target_device.get(key) is None and key in serial_defaults:
target_device[key] = serial_defaults[key]


def build_modbus_client(target_device: dict, protocol: str):
"""Create a pymodbus client for given target and protocol."""
if protocol == "TCP":
return ModbusTcpClient(
host=target_device["ip"],
port=target_device["port"],
auto_open=True,
auto_close=True,
debug=True,
)
if protocol == "RTU":
return ModbusSerialClient(
port=target_device["port"],
baudrate=target_device["baudrate"],
stopbits=target_device["stopbits"],
parity=target_device["parity"],
bytesize=target_device["databits"],
)
raise ValueError("Expected protocol to be RTU or TCP. Got " + str(protocol) + ".")


def close_client_quietly(client) -> None:
"""Close a pymodbus client and ignore any exceptions."""
try:
client.close()
except Exception:
pass


def prepare_client(
ip_address: str,
slave_id: int,
devices_path,
base_config: dict,
):
"""Resolve target device, backfill defaults, and build a Modbus client."""
target_device, protocol = resolve_target_device(ip_address, slave_id, devices_path)
backfill_serial_defaults(target_device, protocol, base_config)
return build_modbus_client(target_device, protocol)


def apply_loglevel(logger, base_config: dict) -> None:
"""Apply log level from base configuration to given logger."""
loglevel = base_config["modbus"].get("loglevel") or "INFO"
logger.setLevel(getattr(logging, loglevel.upper(), logging.INFO))


def parse_register_params(payload: dict) -> dict:
"""Parse and validate register operation parameters into a single dict.

Returns a dict with keys: ip_address, slave_id, register, start_bit, num_bits, write_value.
"""
ip_address = (payload.get("ipAddress") or "").strip()
try:
return {
"ip_address": ip_address,
"slave_id": int(payload["address"]),
"register": int(payload["register"]),
"start_bit": int(payload.get("startBit", 0)),
"num_bits": int(payload.get("noBits", 16)),
"write_value": int(payload["value"]),
}
except KeyError as err:
raise ValueError(f"Missing required field: {err}") from err
except (TypeError, ValueError) as err:
raise ValueError(f"Invalid numeric field: {err}") from err


def compute_masked_value(
current_value: int, start_bit: int, num_bits: int, write_value: int
) -> int:
"""Validate bit-field and compute new register value with masked bits applied."""
if start_bit < 0 or num_bits <= 0 or start_bit + num_bits > 16:
raise ValueError(
"startBit and noBits must define a range within a 16-bit register"
)
max_value = (1 << num_bits) - 1
if write_value < 0 or write_value > max_value:
raise ValueError(f"value must be within 0..{max_value} for noBits={num_bits}")
mask = ((1 << num_bits) - 1) << start_bit
return (current_value & ~mask) | ((write_value << start_bit) & mask)
76 changes: 76 additions & 0 deletions tedge_modbus/operations/set_coil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""Modbus Write Coil Status operation handler"""
import logging

from pymodbus.exceptions import ConnectionException
from .context import Context
from .common import (
parse_json_arguments,
prepare_client,
apply_loglevel,
close_client_quietly,
)

logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)


def run(arguments: str | list[str]) -> None:
"""Run set coil operation handler
Expected arguments (JSON):
{
"input": false,
"address": < Fieldbusaddress >,
"coil": < coilnumber >,
"value": < 0 | 1 >
}
Parse JSON payload"""
payload = parse_json_arguments(arguments)

# Create context with default config directory
context = Context()

# Load configs and set log level
modbus_config = context.base_config
apply_loglevel(logger, modbus_config)
logger.info("New set coil operation. args=%s", arguments)

try:
slave_id = int(payload["address"]) # Fieldbus address
coil_number = int(payload["coil"]) # Coil address
value_int = int(payload["value"]) # 0 or 1
except KeyError as err:
raise ValueError(f"Missing required field: {err}") from err
except (TypeError, ValueError) as err:
raise ValueError(f"Invalid numeric field: {err}") from err

if value_int not in (0, 1):
raise ValueError("value must be 0 or 1 for a coil write")

# Prepare client (resolve target, backfill defaults, build client)
client = prepare_client(
payload["ipAddress"],
slave_id,
context.config_dir / "devices.toml",
modbus_config,
)

try:
coil_value = bool(value_int)
result = client.write_coil(
address=coil_number,
value=coil_value,
slave=slave_id,
)
if result.isError():
raise RuntimeError(f"Failed to write coil {coil_number}: {result}")
logger.info(
"Wrote %s to coil %d on slave %d", coil_value, coil_number, slave_id
)
except ConnectionException as err:
logger.error("Connection error while writing to slave %d: %s", slave_id, err)
raise
finally:
close_client_quietly(client)
101 changes: 101 additions & 0 deletions tedge_modbus/operations/set_register.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# pylint: disable=duplicate-code
"""Modbus Write register status operation handler"""
import logging

from pymodbus.exceptions import ConnectionException
from .context import Context
from .common import (
parse_json_arguments,
prepare_client,
apply_loglevel,
close_client_quietly,
parse_register_params,
compute_masked_value,
)

logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)


def run(arguments: str | list[str]) -> None:
"""Run set register operation handler
Expected arguments (JSON):
{
"input": false,
"ipAddress": <ip address or empty>,
"address": <Fieldbus address>,
"register": <register number>,
"startBit": <start bit>,
"noBits": <number of bits>,
"value": <register value>
}
Parse JSON arguments. Depending on the caller, we may receive the JSON as a single
string or a list of comma-split segments. Handle both cases robustly."""
payload = parse_json_arguments(arguments)

# Create context with default config directory
context = Context()

# Load configs and set log level
modbus_config = context.base_config
apply_loglevel(logger, modbus_config)
logger.info("New set register operation")

# Parse required fields from JSON
params = parse_register_params(payload)

# Prepare client (resolve target, backfill defaults, build client)
client = prepare_client(
params["ip_address"],
params["slave_id"],
context.config_dir / "devices.toml",
modbus_config,
)

# Validate and compute new value

try:
# Read current register value
read_resp = client.read_holding_registers(
address=params["register"], count=1, slave=params["slave_id"]
)
if read_resp.isError():
raise RuntimeError(
f"Failed to read register {params['register']}: {read_resp}"
)
current_value = read_resp.registers[0] & 0xFFFF
new_value = compute_masked_value(
current_value,
params["start_bit"],
params["num_bits"],
params["write_value"],
)

# Write back register
write_resp = client.write_register(
address=params["register"], value=new_value, slave=params["slave_id"]
)
if write_resp.isError():
raise RuntimeError(
f"Failed to write register {params['register']}: {write_resp}"
)
logger.info(
"Updated register %d (bits %d..%d) from 0x%04X to 0x%04X on slave %d",
params["register"],
params["start_bit"],
params["start_bit"] + params["num_bits"] - 1,
current_value,
new_value,
params["slave_id"],
)
except ConnectionException as err:
logger.error(
"Connection error while writing to slave %d: %s",
params["slave_id"],
err,
)
raise
finally:
close_client_quietly(client)
Loading