Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,8 @@ dmypy.json
.idea/modules.xml
.idea/
docker/


# macOS system files
.DS_Store

4 changes: 3 additions & 1 deletion config/devices.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ ip="simulator" # Modbus server IP
port=502 # Modbus server Port
protocol="TCP" # only support for TCP now
littlewordendian=false
#pollinterval=1 # Overrides global setting; device publishes at this interval


[[device.registers]]
number=3 # register number
Expand All @@ -18,7 +20,7 @@ decimalshiftright=0
input=false # true = Input Register false = Holding Register
#see https://thin-edge.github.io/thin-edge.io/html/architecture/thin-edge-json.html
measurementmapping.templatestring="{\"Test\":{\"Int16\":%% }}" # tedge JSON format string, %% will be replaced with the calculated value

#on_change=true # Send data only on value change

[[device.registers]]
number=6
Expand Down
49 changes: 36 additions & 13 deletions tedge_modbus/reader/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import struct
import sys
import math
from datetime import datetime, timezone
from dataclasses import dataclass

Expand All @@ -26,11 +27,9 @@ class ModbusMapper:

device = None

# store data to be able to compare them later
data = {"hr": {}, "ir": {}, "co": {}, "di": {}}

def __init__(self, device):
self.device = device
self.data = {"hr": {}, "ir": {}, "co": {}, "di": {}}

def validate(self, register_def):
"""Validate definition"""
Expand Down Expand Up @@ -94,21 +93,43 @@ def map_register(self, read_register, register_def):
value = self.parse_int(buffer, register_def.get("signed"), mask)

if register_def.get("measurementmapping") is not None:
value = (
scaled_value = (
value
* (register_def.get("multiplier") or 1)
* (10 ** (register_def.get("decimalshiftright") or 0))
/ (register_def.get("divisor") or 1)
)
data = register_def["measurementmapping"]["templatestring"].replace(
"%%", str(value)
)
messages.append(
MappedMessage(
data,
topics["measurement"].replace("CHILD_ID", self.device.get("name")),

on_change = register_def.get("on_change", False)

last_value = self.data.get(register_type, {}).get(register_key)

has_changed = False
last_value = self.data.get(register_type, {}).get(register_key)

if last_value is not None:
if isinstance(scaled_value, float):
has_changed = not isinstance(last_value, float) or not math.isclose(
scaled_value, last_value
)
else:
has_changed = last_value != scaled_value

if not on_change or last_value is None or has_changed:
data = register_def["measurementmapping"]["templatestring"].replace(
"%%", str(scaled_value)
)
)
messages.append(
MappedMessage(
data,
topics["measurement"].replace(
"CHILD_ID", self.device.get("name")
),
)
)
self.data.setdefault(register_type, {})[register_key] = scaled_value

value = scaled_value
if register_def.get("alarmmapping") is not None:
messages.extend(
self.check_alarm(
Expand All @@ -121,7 +142,9 @@ def map_register(self, read_register, register_def):
value, register_def.get("eventmapping"), register_type, register_key
)
)
self.data[register_type][register_key] = value

if register_def.get("measurementmapping") is None:
self.data.setdefault(register_type, {})[register_key] = value
return messages

def map_coil(self, bits, coil_definition):
Expand Down
5 changes: 4 additions & 1 deletion tedge_modbus/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,11 @@ def poll_device(self, device, poll_model, mapper):
else:
self.logger.error("Failed to poll device %s: %s", device["name"], error)

interval = device.get(
"pollinterval", self.base_config["modbus"]["pollinterval"]
)
self.poll_scheduler.enter(
self.base_config["modbus"]["pollinterval"],
interval,
1,
self.poll_device,
(device, poll_model, mapper),
Expand Down
168 changes: 168 additions & 0 deletions tests/unit/test_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import unittest
import struct
import os
import sys

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, parent_dir)
from tedge_modbus.reader.mapper import ModbusMapper

import unittest
import struct
import json
from tedge_modbus.reader.mapper import ModbusMapper


class TestMapperOnChange(unittest.TestCase):
def setUp(self):
"""Set up a new ModbusMapper instance for each test."""
self.mock_device = {
"name": "test_device",
"littlewordendian": False,
}
self.mapper = ModbusMapper(self.mock_device)

def test_on_change_true_and_value_changes(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": True,
"measurementmapping": {"templatestring": '{"temp": %%}'},
}

# First poll: Should always publish
messages1 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1, "Should publish on first poll")
data1 = json.loads(messages1[0].data)
self.assertAlmostEqual(data1["temp"], 123.0)

# Second poll with a different value: Should publish
messages2 = self.mapper.map_register(
read_register=[456], register_def=register_def
)
self.assertEqual(len(messages2), 1, "Should publish when value changes")
data2 = json.loads(messages2[0].data)
self.assertAlmostEqual(data2["temp"], 456.0)

def test_on_change_true_and_value_is_same(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": True,
"measurementmapping": {"templatestring": '{"temp": %%}'},
}

# First poll: should publish
messages1 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1, "Should publish on first poll")

# Second poll with the same value: should NOT publish
messages2 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(
len(messages2), 0, "Should not publish when value is unchanged"
)

def test_on_change_false_and_value_is_same(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": False,
"measurementmapping": {"templatestring": '{"temp": %%}'},
}

messages1 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1)

messages2 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(
len(messages2), 1, "Should always publish when on_change is false"
)

def test_on_change_not_present_and_value_is_same(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"measurementmapping": {"templatestring": '{"temp": %%}'},
}

messages1 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1)

messages2 = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(
len(messages2), 1, "Should default to on_change=false and always publish"
)

def test_on_change_with_float_values(self):
def float_to_regs(f_val):
packed = struct.pack(">f", f_val)
return struct.unpack(">HH", packed)

register_def = {
"number": 102,
"startbit": 0,
"nobits": 32,
"signed": True,
"on_change": True,
"datatype": "float",
"measurementmapping": {"templatestring": '{"voltage": %%}'},
}

# First poll with 123.45
messages1 = self.mapper.map_register(
read_register=list(float_to_regs(123.45)), register_def=register_def
)
self.assertEqual(
len(messages1), 1, "Should publish on first poll for float value"
)
data1 = json.loads(messages1[0].data)
self.assertAlmostEqual(data1["voltage"], 123.45, places=5)

# Second poll, same value
messages2 = self.mapper.map_register(
read_register=list(float_to_regs(123.45)), register_def=register_def
)
self.assertEqual(
len(messages2), 0, "Should not publish when float value is the same"
)

# Third poll, very close value (should be considered the same by math.isclose)
close_value = 123.45 + 1e-8
messages3 = self.mapper.map_register(
read_register=list(float_to_regs(close_value)), register_def=register_def
)
self.assertEqual(
len(messages3), 0, "Should not publish for very close float values"
)

# Fourth poll, different value
messages4 = self.mapper.map_register(
read_register=list(float_to_regs(125.0)), register_def=register_def
)
self.assertEqual(
len(messages4), 1, "Should publish when float value changes significantly"
)
data4 = json.loads(messages4[0].data)
self.assertAlmostEqual(data4["voltage"], 125.0)
82 changes: 82 additions & 0 deletions tests/unit/test_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import os
import sys

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, parent_dir)
import unittest
from unittest.mock import patch, MagicMock
from tedge_modbus.reader.reader import ModbusPoll


class TestReaderPollingInterval(unittest.TestCase):

@patch("tedge_modbus.reader.reader.ModbusPoll.read_base_definition")
@patch("tedge_modbus.reader.reader.ModbusPoll.read_device_definition")
def setUp(self, mock_read_device, mock_read_base):
"""Set up a ModbusPoll instance with mocked file reading."""
# Mock config to prevent errors during initialization
mock_read_base.return_value = {"thinedge": {}, "modbus": {}}
mock_read_device.return_value = {}

self.poll = ModbusPoll(config_dir="/tmp/mock_config")
# Replace the real scheduler with a mock object for testing
self.poll.poll_scheduler = MagicMock()

def test_uses_device_specific_poll_interval(self):
"""
GIVEN a device has a specific poll_interval
WHEN the poller schedules the next poll for that device
THEN it should use the device's interval.
"""
# GIVEN a global poll interval
self.poll.base_config = {"modbus": {"pollinterval": 5}}
# AND a device with its own poll_interval
device_config = {
"name": "fast_poller",
"poll_interval": 1, # This should be used
}

mock_poll_model = MagicMock()
mock_mapper = MagicMock()

# WHEN poll_device is called
# We patch get_data_from_device to avoid real network calls
with patch.object(
self.poll,
"get_data_from_device",
return_value=(None, None, None, None, None),
):
self.poll.poll_device(device_config, mock_poll_model, mock_mapper)

# THEN the scheduler should be called with the device's interval
self.poll.poll_scheduler.enter.assert_called_once()
call_args, _ = self.poll.poll_scheduler.enter.call_args
# The first argument to enter() is the delay
self.assertEqual(call_args[0], 1)

def test_uses_global_poll_interval_as_fallback(self):
"""
GIVEN a device does NOT have a specific poll_interval
WHEN the poller schedules the next poll for that device
THEN it should use the global pollinterval.
"""
# GIVEN a global poll interval
self.poll.base_config = {"modbus": {"pollinterval": 5}} # This should be used
# AND a device without its own poll_interval
device_config = {"name": "normal_poller"}

mock_poll_model = MagicMock()
mock_mapper = MagicMock()

# WHEN poll_device is called
with patch.object(
self.poll,
"get_data_from_device",
return_value=(None, None, None, None, None),
):
self.poll.poll_device(device_config, mock_poll_model, mock_mapper)

# THEN the scheduler should be called with the global interval
self.poll.poll_scheduler.enter.assert_called_once()
call_args, _ = self.poll.poll_scheduler.enter.call_args
self.assertEqual(call_args[0], 5)