Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ This includes the basic configuration for the plugin such as poll rate and the c
- poll rate
- connection to thin-edge.io (MQTT broker needs to match the one of tedge)
- log level (e.g. INFO, WARN, ERROR)
- measurement combination (opt-in feature to reduce the amount of created measurements in the cloud)

### devices.toml

This file includes the information for the connection(s) to the Modbus server(s) and how the Modbus Registers and Coils map to thin-edge’s Measurements, Events and Alarms.
This file includes the information for the connection(s) to the Modbus server(s) and how the Modbus Registers and Coils map to thin-edge’s Measurements, Events and Alarms. It's also possible to overwrite the measurement combination on a device level and on every single measurement mapping.

The device config can be managed via Cumulocity IoT or created with the Cloud Fieldbus operations.

Expand Down
2 changes: 2 additions & 0 deletions config/devices.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ port=502 # Modbus server Port
protocol="TCP" # only support for TCP now
littlewordendian=false
#pollinterval=1 # Overrides global setting; device publishes at this interval
#combinemeasurements=true # Overrides global setting; Combines all measurements of a device to reduce the number of created measurements in the cloud


[[device.registers]]
Expand All @@ -20,6 +21,7 @@ decimalshiftright=0
input=false # true = Input Register false = Holding Register
#see https://thin-edge.github.io/thin-edge.io/html/architecture/thin-edge-json.html
measurementmapping.templatestring="{\"Test\":{\"Int16\":%% }}" # tedge JSON format string, %% will be replaced with the calculated value
#measurementmapping.combinemeasurements=true # Overrides device setting; Combines all measurements of a device to reduce the number of created measurements in the cloud
#on_change=true # Send data only on value change

[[device.registers]]
Expand Down
1 change: 1 addition & 0 deletions config/modbus.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[modbus]
pollinterval=2
loglevel="INFO"
#combinemeasurements=true # if not set equals false; combines all measurements of a device to reduce the number of created measurements in the cloud

[thinedge]
mqtthost="127.0.0.1"
Expand Down
50 changes: 42 additions & 8 deletions tedge_modbus/reader/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ class MappedMessage:
data: str = ""
topic: str = ""

def extend_data(self, other_message):
"""Combine Json data of two messages with the same topic"""
if self.topic != other_message.topic:
raise ValueError("Messages need to have the same topic")

def merge(d1: dict, d2: dict) -> dict:
"""Recursively merge two dictionaries."""
for k, v in d2.items():
if k in d1 and isinstance(d1[k], dict) and isinstance(v, dict):
d1[k] = merge(d1[k], v)
else:
d1[k] = v
return d1

# Load both JSON strings into dictionaries
d1 = json.loads(self.data)
d2 = json.loads(other_message.data)
# Merge the dictionaries
merged = merge(d1, d2)
# Convert the merged dictionary back to a JSON string and update self.data
self.data = json.dumps(merged)


class ModbusMapper:
"""Modbus mapper"""
Expand Down Expand Up @@ -60,10 +82,13 @@ def parse_float(self, buffer, field_len):
formats[field_len], buffer.to_bytes(int(field_len / 8), sys.byteorder)
)[0]

def map_register(self, read_register, register_def):
def map_register(
self, read_register, register_def, device_combine_measurements=False
):
"""Map register"""
# pylint: disable=too-many-locals
messages = []
separate_measurement = None
start_bit = register_def["startbit"]
field_len = register_def["nobits"]
is_little_endian = register_def.get("littleendian") or False
Expand Down Expand Up @@ -119,15 +144,24 @@ def map_register(self, read_register, register_def):
data = register_def["measurementmapping"]["templatestring"].replace(
"%%", str(scaled_value)
)
messages.append(
MappedMessage(
if register_def["measurementmapping"].get(
"combinemeasurements", device_combine_measurements
):
separate_measurement = MappedMessage(
data,
topics["measurement"].replace(
"CHILD_ID", self.device.get("name")
),
)
)
self.data.setdefault(register_type, {})[register_key] = scaled_value
else:
messages.append(
MappedMessage(
data,
topics["measurement"].replace(
"CHILD_ID", self.device.get("name")
),
)
)

value = scaled_value
if register_def.get("alarmmapping") is not None:
Expand All @@ -143,9 +177,9 @@ def map_register(self, read_register, register_def):
)
)

if register_def.get("measurementmapping") is None:
self.data.setdefault(register_type, {})[register_key] = value
return messages
self.data.setdefault(register_type, {})[register_key] = value

return messages, separate_measurement

def map_coil(self, bits, coil_definition):
"""Map coil"""
Expand Down
20 changes: 19 additions & 1 deletion tedge_modbus/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ def poll_device(self, device, poll_model, mapper):
ir_result,
error,
) = self.get_data_from_device(device, poll_model)
device_combine_measurements = device.get(
"combinemeasurements",
self.base_config["modbus"].get("combinemeasurements", False),
)
combined_measuerement = None
if error is None:
# handle all Registers
if device.get("registers") is not None:
Expand All @@ -231,12 +236,25 @@ def poll_device(self, device, poll_model, mapper):
result = self.read_register(
hr_results, address=register_number, count=num_registers
)
msgs = mapper.map_register(result, register_definition)
msgs, temp = mapper.map_register(
result, register_definition, device_combine_measurements
)
if combined_measuerement is not None and temp is not None:
combined_measuerement.extend_data(temp)
elif temp is not None:
combined_measuerement = temp
for msg in msgs:
self.send_tedge_message(msg)
except Exception as e:
self.logger.error("Failed to map register: %s", e)

# send combined measurement if any
try:
if combined_measuerement is not None:
self.send_tedge_message(combined_measuerement)
except Exception as e:
self.logger.error("Failed to send combined measurement: %s", e)

# all Coils
if device.get("coils") is not None:
for coil_def in device["coils"]:
Expand Down
86 changes: 86 additions & 0 deletions tests/unit/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,89 @@ def float_to_regs(f_val):
)
data4 = json.loads(messages4[0].data)
self.assertAlmostEqual(data4["voltage"], 125.0)

def test_separate_measurements(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": False,
"measurementmapping": {"templatestring": '{"temp": %%}'},
"alarmmapping": {
"severity": "MAJOR",
"text": "This alarm should be created once",
"type": "TestAlarm",
},
"eventmapping": {
"text": "This event should be created once",
"type": "TestEvent",
},
}

# First poll: Check if all Messagetypes get send out
messages1 = self.mapper.map_register(
read_register=[1], register_def=register_def
)
topics1 = [message.topic for message in messages1]
self.assertTrue("te/device/test_device///m/" in topics1)
self.assertTrue("te/device/test_device///e/" in topics1)
self.assertTrue("te/device/test_device///a/" in topics1)

self.mapper.map_register(read_register=[0], register_def=register_def)
# Second poll: Check if Measurements get separated
messages2, measurement3 = self.mapper.map_register(
read_register=[1], register_def=register_def, separate_measurement=True
)
topics2 = [message.topic for message in messages2]
self.assertTrue("te/device/test_device///m/" not in topics2)
self.assertTrue("te/device/test_device///e/" in topics2)
self.assertTrue("te/device/test_device///a/" in topics2)

self.assertTrue("te/device/test_device///m/" in measurement3.topic)
self.assertTrue("te/device/test_device///e/" not in measurement3.topic)
self.assertTrue("te/device/test_device///a/" not in measurement3.topic)

def test_combine_messages(self):
register_def1 = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": False,
"measurementmapping": {"templatestring": '{"sensor1":{"temp":%% }}'},
}
register_def2 = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": False,
"measurementmapping": {"templatestring": '{"sensor1":{"RH":%% }}'},
}
register_def3 = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"on_change": False,
"measurementmapping": {"templatestring": '{"sensor2":{"temp":%% }}'},
}

_, measurement1 = self.mapper.map_register(
read_register=[25], register_def=register_def1, separate_measurement=True
)
_, measurement2 = self.mapper.map_register(
read_register=[43], register_def=register_def2, separate_measurement=True
)
_, measurement3 = self.mapper.map_register(
read_register=[21], register_def=register_def3, separate_measurement=True
)

measurement1.extend_data(measurement2)
measurement1.extend_data(measurement3)

data = json.loads(measurement1.data)
self.assertAlmostEqual(data["sensor1"]["temp"], 25.0)
self.assertAlmostEqual(data["sensor1"]["RH"], 43.0)
self.assertAlmostEqual(data["sensor2"]["temp"], 21.0)
16 changes: 13 additions & 3 deletions tests/unit/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ def setUp(self, mock_read_device, mock_read_base):

def test_uses_device_specific_poll_interval(self):
"""
GIVEN a device has a specific poll_interval
GIVEN a device has a specific pollinterval
WHEN the poller schedules the next poll for that device
THEN it should use the device's interval.
"""
# GIVEN a global poll interval
self.poll.base_config = {"modbus": {"pollinterval": 5}}
# AND a device with its own poll_interval
# AND a device with its own pollinterval
device_config = {
"name": "fast_poller",
"poll_interval": 1, # This should be used
"pollinterval": 1, # This should be used
}

mock_poll_model = MagicMock()
Expand Down Expand Up @@ -80,3 +80,13 @@ def test_uses_global_poll_interval_as_fallback(self):
self.poll.poll_scheduler.enter.assert_called_once()
call_args, _ = self.poll.poll_scheduler.enter.call_args
self.assertEqual(call_args[0], 5)

# Todo: Implement the following tests
def test_defaults_to_no_measurement_combination(self):
pass

def test_global_measurement_combination(self):
pass

def test_device_specific_measurement_combination(self):
pass