Skip to content

Commit 94058ff

Browse files
committed
Use the te/+/+/+/+/m/+/meta topics to set measurement units
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 92af14d commit 94058ff

File tree

6 files changed

+84
-15
lines changed

6 files changed

+84
-15
lines changed

crates/common/tedge_config/src/tedge_toml/tedge_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,8 @@ define_tedge_config! {
303303
mqtt: HostPort<MQTT_TLS_PORT>,
304304

305305
/// Set of MQTT topics the Cumulocity mapper should subscribe to
306-
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")]
307-
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
306+
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/m/+/meta,te/+/+/+/+/e/+")]
307+
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/m/+/meta,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
308308
topics: TemplatesSet,
309309

310310
enable: {

crates/extensions/c8y_mapper_ext/src/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl C8yMapperActor {
177177
}
178178
}
179179
} else {
180-
self.process_message(message.clone()).await?;
180+
self.process_message(message).await?;
181181
}
182182
} else {
183183
self.convert_and_publish(&message).await?;

crates/extensions/c8y_mapper_ext/src/converter.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::entity_cache::UpdateOutcome;
1212
use crate::error::ConversionError;
1313
use crate::error::MessageConversionError;
1414
use crate::json;
15+
use crate::json::Units;
1516
use crate::operations;
1617
use crate::operations::OperationHandler;
1718
use crate::supported_operations::operation::get_child_ops;
@@ -190,6 +191,8 @@ pub struct CumulocityConverter {
190191

191192
supported_operations: SupportedOperations,
192193
pub operation_handler: OperationHandler,
194+
195+
units: Units,
193196
}
194197

195198
impl CumulocityConverter {
@@ -277,6 +280,7 @@ impl CumulocityConverter {
277280
recently_completed_commands: HashMap::new(),
278281
active_commands_last_cleared: Instant::now(),
279282
operation_handler,
283+
units: Units::default(),
280284
})
281285
}
282286

@@ -470,8 +474,9 @@ impl CumulocityConverter {
470474

471475
if let Some(entity) = self.entity_cache.get(source) {
472476
// Need to check if the input Thin Edge JSON is valid before adding a child ID to list
477+
let units = self.get_measurement_type_units(source, measurement_type);
473478
let c8y_json_payload =
474-
json::from_thin_edge_json(input.payload_str()?, entity, measurement_type, None)?;
479+
json::from_thin_edge_json(input.payload_str()?, entity, measurement_type, units)?;
475480

476481
if c8y_json_payload.len() < self.size_threshold.0 {
477482
mqtt_messages.push(MqttMessage::new(
@@ -1163,6 +1168,29 @@ impl CumulocityConverter {
11631168
self.active_commands.contains_key(cmd_id)
11641169
|| self.recently_completed_commands.contains_key(cmd_id)
11651170
}
1171+
1172+
fn get_measurement_type_units(
1173+
&self,
1174+
source: &EntityTopicId,
1175+
measurement_type: &str,
1176+
) -> Option<&Units> {
1177+
let group = format!("{source}/{measurement_type}");
1178+
self.units.get_group_units(&group)
1179+
}
1180+
1181+
fn set_measurement_type_units(
1182+
&mut self,
1183+
source: &EntityTopicId,
1184+
measurement_type: &str,
1185+
message: &MqttMessage,
1186+
) {
1187+
if let Ok(payload) = message.payload_str() {
1188+
if let Ok(meta) = serde_json::from_str(payload) {
1189+
let group = format!("{source}/{measurement_type}");
1190+
self.units.set_group_units(group, meta)
1191+
}
1192+
}
1193+
}
11661194
}
11671195

11681196
#[derive(Error, Debug)]
@@ -1205,6 +1233,10 @@ impl CumulocityConverter {
12051233
) -> Result<Vec<MqttMessage>, ConversionError> {
12061234
match &channel {
12071235
Channel::EntityMetadata => self.try_convert_entity_registration(source, message),
1236+
Channel::MeasurementMetadata { measurement_type } => {
1237+
self.set_measurement_type_units(&source, measurement_type, message);
1238+
Ok(vec![])
1239+
}
12081240
_ => {
12091241
self.try_convert_data_message(source, channel, message)
12101242
.await

crates/extensions/c8y_mapper_ext/src/json.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ fn from_thin_edge_json_with_timestamp(
6262
#[derive(Default)]
6363
pub struct Units {
6464
units: HashMap<String, String>,
65-
groups: HashMap<String, Units>,
65+
group_units: HashMap<String, Units>,
6666
}
6767

6868
impl Units {
@@ -72,13 +72,13 @@ impl Units {
7272
pub fn new() -> Units {
7373
Units {
7474
units: HashMap::new(),
75-
groups: HashMap::new(),
75+
group_units: HashMap::new(),
7676
}
7777
}
7878

7979
/// True if no units are actually defined
8080
pub fn is_empty(&self) -> bool {
81-
self.units.is_empty() && self.groups.is_empty()
81+
self.units.is_empty() && self.group_units.is_empty()
8282
}
8383

8484
/// Measurement units as defined by metadata published on a measurement topic
@@ -93,12 +93,11 @@ impl Units {
9393
}
9494

9595
pub fn set_unit(&mut self, measurement: String, meta: serde_json::Value) {
96-
if let serde_json::Value::String(unit) = meta {
97-
// "Temperature": "°C"
98-
self.units.insert(measurement, unit);
99-
} else if let Some(unit) = meta.get("unit") {
96+
if let Some(unit) = meta.get("unit") {
10097
// "Temperature": {"unit": "°C"},
101-
self.set_unit(measurement, unit.clone());
98+
if let serde_json::Value::String(unit_name) = unit {
99+
self.units.insert(measurement, unit_name.to_owned());
100+
}
102101
} else {
103102
// "Climate": { "Temperature": {"unit": "°C"}, "Humidity": {"unit": "%RH"} }
104103
let group = measurement;
@@ -109,7 +108,7 @@ impl Units {
109108
pub fn set_group_units(&mut self, group: String, meta: serde_json::Value) {
110109
let units = Units::from_metadata(meta);
111110
if !units.is_empty() {
112-
self.groups.insert(group, units);
111+
self.group_units.insert(group, units);
113112
}
114113
}
115114

@@ -120,7 +119,7 @@ impl Units {
120119

121120
/// Retrieve the units to be used for a measurement group, if any
122121
pub fn get_group_units(&self, group: &str) -> Option<&Units> {
123-
self.groups.get(group)
122+
self.group_units.get(group)
124123
}
125124
}
126125

tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,44 @@ Thin-edge devices support sending simple measurements
2020
... series=temperature
2121
Log ${measurements}
2222

23+
Thin-edge devices support sending simple measurements with units
24+
# Using a test-specific measurement is required to not interfer with other tests
25+
Execute Command tedge mqtt pub -r te/device/main///m/t1/meta '{ "temperature": { "unit": "°C" } }'
26+
Execute Command tedge mqtt pub te/device/main///m/t1 '{ "temperature": 25.111 }'
27+
Execute Command tedge mqtt pub te/device/main///m/t1 '{ "temperature": 25.222 }'
28+
${measurements}= Device Should Have Measurements
29+
... minimum=2
30+
... maximum=2
31+
... type=t1
32+
... value=temperature
33+
... series=temperature
34+
Log ${measurements}
35+
Should Be Equal As Numbers ${measurements[0]["temperature"]["temperature"]["value"]} 25.111
36+
Should Be Equal ${measurements[0]["temperature"]["temperature"]["unit"]} °C
37+
Should Be Equal As Numbers ${measurements[1]["temperature"]["temperature"]["value"]} 25.222
38+
Should Be Equal ${measurements[1]["temperature"]["temperature"]["unit"]} °C
39+
40+
Thin-edge devices support sending complex measurements with units
41+
# Using a test-specific measurement is required to not interfer with other tests
42+
Execute Command tedge mqtt pub -r te/device/main///m/t2/meta '{ "Climate": { "Temperature": { "unit": "°C" }, "Pressure": { "unit": "bar" }}}'
43+
Execute Command tedge mqtt pub te/device/main///m/t2 '{ "Climate": { "Temperature": 19.42, "Pressure": 1.013 }, "Engine": { "Temperature": 350.42, "Pressure": 321.0 }}'
44+
${measurements}= Device Should Have Measurements
45+
... minimum=1
46+
... maximum=1
47+
... type=t2
48+
... value=Climate
49+
... series=Temperature
50+
Log ${measurements}
51+
Should Be Equal As Numbers ${measurements[0]["Climate"]["Temperature"]["value"]} 19.42
52+
Should Contain ${measurements[0]["Climate"]["Temperature"]} unit
53+
Should Be Equal ${measurements[0]["Climate"]["Temperature"]["unit"]} °C
54+
Should Be Equal As Numbers ${measurements[0]["Climate"]["Pressure"]["value"]} 1.013
55+
Should Be Equal ${measurements[0]["Climate"]["Pressure"]["unit"]} bar
56+
Should Be Equal As Numbers ${measurements[0]["Engine"]["Temperature"]["value"]} 350.42
57+
Should Not Contain ${measurements[0]["Engine"]["Temperature"]} unit
58+
Should Be Equal As Numbers ${measurements[0]["Engine"]["Pressure"]["value"]} 321.0
59+
Should Not Contain ${measurements[0]["Engine"]["Pressure"]} unit
60+
2361
Thin-edge devices support sending simple measurements with custom type
2462
Execute Command tedge mqtt pub te/device/main///m/ '{ "type":"CustomType", "temperature": 25 }'
2563
${measurements}= Device Should Have Measurements

tests/RobotFramework/tests/tedge/call_tedge_config_list.robot

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ set/unset c8y.topics
9797
${unset} Execute Command tedge config list
9898
Should Contain
9999
... ${unset}
100-
... c8y.topics=["te/+/+/+/+", "te/+/+/+/+/twin/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", "te/+/+/+/+/status/health"]
100+
... c8y.topics=["te/+/+/+/+", "te/+/+/+/+/twin/+", "te/+/+/+/+/m/+", "te/+/+/+/+/m/+/meta", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", "te/+/+/+/+/status/health"]
101101

102102
set/unset c8y.proxy.bind.address
103103
Execute Command sudo tedge config set c8y.proxy.bind.address 127.1.1.1 # Changing c8y.proxy.bind.address

0 commit comments

Comments
 (0)