Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions tedge_modbus/reader/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

topics = {
"measurement": "te/device/CHILD_ID///m/",
"event": "te/device/CHILD_ID///e/",
"alarm": "te/device/CHILD_ID///a/",
"event": "te/device/CHILD_ID///e/TYPE",
"alarm": "te/device/CHILD_ID///a/TYPE",
}


Expand Down Expand Up @@ -215,13 +215,16 @@ def check_alarm(self, value, alarm_mapping, register_type, register_key):
# raise alarm if bit is 1
if (old_data is None or old_data == 0) and value > 0:
severity = alarm_mapping["severity"].lower()
alarm_type = alarm_mapping["type"]
alarm_type = alarm_mapping.get("type", "")
text = alarm_mapping["text"]
topic = topics["alarm"]
topic = topic.replace("CHILD_ID", self.device.get("name"))
topic = topic.replace("SEVERITY", severity)
topic = topic.replace("TYPE", alarm_type)
data = {"text": text, "time": datetime.now(timezone.utc).isoformat()}
data = {
"text": text,
"severity": severity,
"time": datetime.now(timezone.utc).isoformat(),
}
messages.append(MappedMessage(json.dumps(data), topic))
return messages

Expand All @@ -231,7 +234,7 @@ def check_event(self, value, event_mapping, register_type, register_key):
old_data = self.data.get(register_type).get(register_key)
# raise event if value changed
if old_data is None or old_data != value:
eventtype = event_mapping["type"]
eventtype = event_mapping.get("type", "")
text = event_mapping["text"]
topic = topics["event"]
topic = topic.replace("CHILD_ID", self.device.get("name"))
Expand Down
156 changes: 122 additions & 34 deletions tests/unit/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ def test_on_change_true_and_value_changes(self):
}

# First poll: Should always publish
messages1 = self.mapper.map_register(
messages1, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1, "Should publish on first poll")
data1 = json.loads(messages1[0].data)
self.assertAlmostEqual(data1["temp"], 123.0)

# Second poll with a different value: Should publish
messages2 = self.mapper.map_register(
messages, _ = self.mapper.map_register(
read_register=[456], register_def=register_def
)
self.assertEqual(len(messages2), 1, "Should publish when value changes")
data2 = json.loads(messages2[0].data)
self.assertEqual(len(messages), 1, "Should publish when value changes")
data2 = json.loads(messages[0].data)
self.assertAlmostEqual(data2["temp"], 456.0)

def test_on_change_true_and_value_is_same(self):
Expand All @@ -59,18 +59,16 @@ def test_on_change_true_and_value_is_same(self):
}

# First poll: should publish
messages1 = self.mapper.map_register(
messages1, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1, "Should publish on first poll")

# Second poll with the same value: should NOT publish
messages2 = self.mapper.map_register(
messages, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(
len(messages2), 0, "Should not publish when value is unchanged"
)
self.assertEqual(len(messages), 0, "Should not publish when value is unchanged")

def test_on_change_false_and_value_is_same(self):
register_def = {
Expand All @@ -82,16 +80,16 @@ def test_on_change_false_and_value_is_same(self):
"measurementmapping": {"templatestring": '{"temp": %%}'},
}

messages1 = self.mapper.map_register(
messages1, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1)

messages2 = self.mapper.map_register(
messages, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(
len(messages2), 1, "Should always publish when on_change is false"
len(messages), 1, "Should always publish when on_change is false"
)

def test_on_change_not_present_and_value_is_same(self):
Expand All @@ -103,16 +101,16 @@ def test_on_change_not_present_and_value_is_same(self):
"measurementmapping": {"templatestring": '{"temp": %%}'},
}

messages1 = self.mapper.map_register(
messages1, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(len(messages1), 1)

messages2 = self.mapper.map_register(
messages, _ = self.mapper.map_register(
read_register=[123], register_def=register_def
)
self.assertEqual(
len(messages2), 1, "Should default to on_change=false and always publish"
len(messages), 1, "Should default to on_change=false and always publish"
)

def test_on_change_with_float_values(self):
Expand All @@ -131,7 +129,7 @@ def float_to_regs(f_val):
}

# First poll with 123.45
messages1 = self.mapper.map_register(
messages1, _ = self.mapper.map_register(
read_register=list(float_to_regs(123.45)), register_def=register_def
)
self.assertEqual(
Expand All @@ -141,24 +139,24 @@ def float_to_regs(f_val):
self.assertAlmostEqual(data1["voltage"], 123.45, places=5)

# Second poll, same value
messages2 = self.mapper.map_register(
messages, _ = self.mapper.map_register(
read_register=list(float_to_regs(123.45)), register_def=register_def
)
self.assertEqual(
len(messages2), 0, "Should not publish when float value is the same"
len(messages), 0, "Should not publish when float value is the same"
)

# Third poll, very close value (should be considered the same by math.isclose)
close_value = 123.45 + 1e-8
messages3 = self.mapper.map_register(
messages3, _ = self.mapper.map_register(
read_register=list(float_to_regs(close_value)), register_def=register_def
)
self.assertEqual(
len(messages3), 0, "Should not publish for very close float values"
)

# Fourth poll, different value
messages4 = self.mapper.map_register(
messages4, _ = self.mapper.map_register(
read_register=list(float_to_regs(125.0)), register_def=register_def
)
self.assertEqual(
Expand Down Expand Up @@ -187,27 +185,29 @@ def test_separate_measurements(self):
}

# First poll: Check if all Messagetypes get send out
messages1 = self.mapper.map_register(
messages1, _ = self.mapper.map_register(
read_register=[1], register_def=register_def
)
topics1 = [message.topic for message in messages1]
self.assertTrue("te/device/test_device///m/" in topics1)
self.assertTrue("te/device/test_device///e/" in topics1)
self.assertTrue("te/device/test_device///a/" in topics1)
self.assertTrue("te/device/test_device///e/TestEvent" in topics1)
self.assertTrue("te/device/test_device///a/TestAlarm" in topics1)

self.mapper.map_register(read_register=[0], register_def=register_def)
# Second poll: Check if Measurements get separated
messages2, measurement3 = self.mapper.map_register(
read_register=[1], register_def=register_def, separate_measurement=True
messages, measurement3 = self.mapper.map_register(
read_register=[1],
register_def=register_def,
device_combine_measurements=True,
)
topics2 = [message.topic for message in messages2]
self.assertTrue("te/device/test_device///m/" not in topics2)
self.assertTrue("te/device/test_device///e/" in topics2)
self.assertTrue("te/device/test_device///a/" in topics2)
topics = [message.topic for message in messages]
self.assertTrue("te/device/test_device///m/" not in topics)
self.assertTrue("te/device/test_device///e/TestEvent" in topics)
self.assertTrue("te/device/test_device///a/TestAlarm" in topics)

self.assertTrue("te/device/test_device///m/" in measurement3.topic)
self.assertTrue("te/device/test_device///e/" not in measurement3.topic)
self.assertTrue("te/device/test_device///a/" not in measurement3.topic)
self.assertTrue("te/device/test_device///e/TestEvent" not in measurement3.topic)
self.assertTrue("te/device/test_device///a/TestAlarm" not in measurement3.topic)

def test_combine_messages(self):
register_def1 = {
Expand Down Expand Up @@ -236,13 +236,19 @@ def test_combine_messages(self):
}

_, measurement1 = self.mapper.map_register(
read_register=[25], register_def=register_def1, separate_measurement=True
read_register=[25],
register_def=register_def1,
device_combine_measurements=True,
)
_, measurement2 = self.mapper.map_register(
read_register=[43], register_def=register_def2, separate_measurement=True
read_register=[43],
register_def=register_def2,
device_combine_measurements=True,
)
_, measurement3 = self.mapper.map_register(
read_register=[21], register_def=register_def3, separate_measurement=True
read_register=[21],
register_def=register_def3,
device_combine_measurements=True,
)

measurement1.extend_data(measurement2)
Expand All @@ -252,3 +258,85 @@ def test_combine_messages(self):
self.assertAlmostEqual(data["sensor1"]["temp"], 25.0)
self.assertAlmostEqual(data["sensor1"]["RH"], 43.0)
self.assertAlmostEqual(data["sensor2"]["temp"], 21.0)

def test_alarm_mapping(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"alarmmapping": {
"severity": "MAJOR",
"text": "This alarm tests the alarm mapping",
"type": "TestAlarm",
},
}
register_def2 = {
"number": 101,
"startbit": 0,
"nobits": 16,
"signed": False,
"alarmmapping": {
"severity": "MAJOR",
"text": "This alarm tests the alarm mapping",
},
}

messages, _ = self.mapper.map_register(
read_register=[1], register_def=register_def
)
topics = [message.topic for message in messages]
self.assertTrue("te/device/test_device///a/TestAlarm" in topics)
self.assertEqual(len(messages), 1)
alarm_data = json.loads(messages[0].data)
self.assertEqual(alarm_data["severity"], "major")
self.assertEqual(alarm_data["text"], "This alarm tests the alarm mapping")

messages2, _ = self.mapper.map_register(
read_register=[1], register_def=register_def2
)
topics2 = [message.topic for message in messages2]
self.assertTrue("te/device/test_device///a/" in topics2)
self.assertEqual(len(messages2), 1)
alarm_data2 = json.loads(messages2[0].data)
self.assertEqual(alarm_data2["severity"], "major")
self.assertEqual(alarm_data2["text"], "This alarm tests the alarm mapping")

def test_event_mapping(self):
register_def = {
"number": 100,
"startbit": 0,
"nobits": 16,
"signed": False,
"eventmapping": {
"text": "This event tests the event mapping",
"type": "TestEvent",
},
}
register_def2 = {
"number": 101,
"startbit": 0,
"nobits": 16,
"signed": False,
"eventmapping": {
"text": "This event tests the event mapping",
},
}

messages, _ = self.mapper.map_register(
read_register=[1], register_def=register_def
)
topics = [message.topic for message in messages]
self.assertTrue("te/device/test_device///e/TestEvent" in topics)
self.assertEqual(len(messages), 1)
event_data = json.loads(messages[0].data)
self.assertEqual(event_data["text"], "This event tests the event mapping")

messages2, _ = self.mapper.map_register(
read_register=[1], register_def=register_def2
)
topics2 = [message.topic for message in messages2]
self.assertTrue("te/device/test_device///e/" in topics2)
self.assertEqual(len(messages2), 1)
event_data2 = json.loads(messages2[0].data)
self.assertEqual(event_data2["text"], "This event tests the event mapping")