Skip to content

Commit 1b6c3fd

Browse files
committed
refactor(c8y_ModbusDevice): use dataclass to represent modbus target device
1 parent fd1ecfa commit 1b6c3fd

File tree

1 file changed

+51
-36
lines changed

1 file changed

+51
-36
lines changed

tedge_modbus/operations/c8y_modbus_device.py

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"""Cumulocity IoT Modbus device operation handler
33
"""
44
import logging
5+
from dataclasses import dataclass
56
import requests
67
import toml
78

@@ -15,34 +16,36 @@
1516
)
1617

1718

18-
def update_or_create_device_mapping(
19-
mapping, child_name, modbus_address, modbus_server, modbus_type, new_mapping
20-
):
19+
@dataclass
20+
class ModebusDevice:
21+
"""Modbus device details"""
22+
23+
modbus_type: str
24+
modbus_address: str
25+
child_name: str
26+
modbus_server: str
27+
device_id: str
28+
mapping_path: str
29+
30+
31+
def update_or_create_device_mapping(target: ModebusDevice, mapping, new_mapping):
2132
"""Update or create device mapping"""
2233
devices = mapping.setdefault("device", [])
2334
for i, device in enumerate(devices):
24-
if device.get("name") == child_name:
25-
devices[i] = get_device_from_mapping(
26-
child_name, modbus_address, modbus_server, modbus_type, new_mapping
27-
)
35+
if device.get("name") == target.child_name:
36+
devices[i] = get_device_from_mapping(target, new_mapping)
2837
return
29-
devices.append(
30-
get_device_from_mapping(
31-
child_name, modbus_address, modbus_server, modbus_type, new_mapping
32-
)
33-
)
38+
devices.append(get_device_from_mapping(target, new_mapping))
3439

3540

36-
def get_device_from_mapping(
37-
child_name, modbus_address, modbus_server, modbus_type, mapping
38-
):
41+
def get_device_from_mapping(target: ModebusDevice, mapping):
3942
"""Get a device from a given mapping definition"""
4043
device = {}
41-
device["name"] = child_name
42-
device["address"] = int(modbus_address)
43-
device["ip"] = modbus_server
44+
device["name"] = target.child_name
45+
device["address"] = int(target.modbus_address)
46+
device["ip"] = target.modbus_server
4447
device["port"] = 502
45-
device["protocol"] = modbus_type
48+
device["protocol"] = target.modbus_type
4649
device["littlewordendian"] = True
4750

4851
# Registers
@@ -71,6 +74,18 @@ def get_device_from_mapping(
7174
return device
7275

7376

77+
def parse_arguments(arguments) -> ModebusDevice:
78+
"""Parse operation arguments"""
79+
return ModebusDevice(
80+
modbus_type=arguments[2], # Only works for TCP.
81+
modbus_address=arguments[3],
82+
child_name=arguments[4],
83+
modbus_server=arguments[5],
84+
device_id=arguments[6],
85+
mapping_path=arguments[7],
86+
)
87+
88+
7489
def run(arguments, context: Context):
7590
"""main"""
7691
logger.info("New c8y_ModbusDevice operation")
@@ -82,44 +97,41 @@ def run(arguments, context: Context):
8297
+ "."
8398
)
8499
config_path = context.config_dir / "devices.toml"
85-
modbus_type = arguments[2] # Only works for TCP.
86-
modbus_address = arguments[3]
87-
child_name = arguments[4]
88-
modbus_server = arguments[5]
89-
device_id = arguments[6]
90-
mapping_path = arguments[7]
100+
target = parse_arguments(arguments)
91101

92102
# Fail if modbus_type is not TCP
93-
if modbus_type != "TCP":
94-
raise ValueError("Expected modbus_type to be TCP. Got " + modbus_type + ".")
103+
if target.modbus_type != "TCP":
104+
raise ValueError(
105+
"Expected modbus_type to be TCP. Got " + target.modbus_type + "."
106+
)
95107

96108
# Update external id of child device
97-
logger.debug("Create external id for child device %s", device_id)
98-
url = f"{context.c8y_proxy}/identity/globalIds/{device_id}/externalIds"
109+
logger.debug("Create external id for child device %s", target.device_id)
110+
url = f"{context.c8y_proxy}/identity/globalIds/{target.device_id}/externalIds"
99111
data = {
100-
"externalId": f"{context.device_id}:device:{child_name}",
112+
"externalId": f"{context.device_id}:device:{target.child_name}",
101113
"type": "c8y_Serial",
102114
}
103115
response = requests.post(url, json=data, timeout=60)
104116
if response.status_code != 201:
105117
raise ValueError(
106-
f"Error creating external id for child device with id {device_id}. "
118+
f"Error creating external id for child device with id {target.device_id}. "
107119
f"Got response {response.status_code} from {url}. Expected 201."
108120
)
109121
logger.info(
110122
"Created external id for child device with id %s to %s",
111-
device_id,
123+
target.device_id,
112124
data["externalId"],
113125
)
114126

115127
# Get the mapping json via rest
116-
url = f"{context.c8y_proxy}{mapping_path}"
128+
url = f"{context.c8y_proxy}{target.mapping_path}"
117129
logger.debug("Getting mapping json from %s", url)
118130
response = requests.get(url, timeout=60)
119131
logger.info("Got mapping json from %s with response %d", url, response.status_code)
120132
if response.status_code != 200:
121133
raise ValueError(
122-
f"Error getting mapping at {mapping_path}. "
134+
f"Error getting mapping at {target.mapping_path}. "
123135
f"Got response {response.status_code} from {url}. Expected 200."
124136
)
125137
new_mapping = response.json()
@@ -131,10 +143,13 @@ def run(arguments, context: Context):
131143

132144
# Update or create device data for the device with the same childName
133145
logger.debug(
134-
"Updating or creating device data for device with childName %s", child_name
146+
"Updating or creating device data for device with childName %s",
147+
target.child_name,
135148
)
136149
update_or_create_device_mapping(
137-
mapping, child_name, modbus_address, modbus_server, modbus_type, new_mapping
150+
target,
151+
mapping,
152+
new_mapping,
138153
)
139154

140155
logger.debug("Created mapping toml: %s", mapping)

0 commit comments

Comments
 (0)