Skip to content

Commit c2fa11a

Browse files
Merge pull request #3776 from didier-wenzek/feat/measurement-units
feat: Define measurement units using topic metadata
2 parents a66e3e0 + c6cd05a commit c2fa11a

File tree

9 files changed

+451
-43
lines changed

9 files changed

+451
-43
lines changed

crates/common/tedge_config/src/tedge_toml/tedge_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,8 @@ define_tedge_config! {
303303
mqtt: HostPort<MQTT_TLS_PORT>,
304304

305305
/// Set of MQTT topics the Cumulocity mapper should subscribe to
306-
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")]
307-
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
306+
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/m/+/meta,te/+/+/+/+/e/+")]
307+
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/m/+/meta,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
308308
topics: TemplatesSet,
309309

310310
enable: {

crates/extensions/c8y_mapper_ext/src/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl C8yMapperActor {
177177
}
178178
}
179179
} else {
180-
self.process_message(message.clone()).await?;
180+
self.process_message(message).await?;
181181
}
182182
} else {
183183
self.convert_and_publish(&message).await?;

crates/extensions/c8y_mapper_ext/src/converter.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::entity_cache::UpdateOutcome;
1212
use crate::error::ConversionError;
1313
use crate::error::MessageConversionError;
1414
use crate::json;
15+
use crate::json::Units;
1516
use crate::operations;
1617
use crate::operations::OperationHandler;
1718
use crate::supported_operations::operation::get_child_ops;
@@ -190,6 +191,8 @@ pub struct CumulocityConverter {
190191

191192
supported_operations: SupportedOperations,
192193
pub operation_handler: OperationHandler,
194+
195+
units: Units,
193196
}
194197

195198
impl CumulocityConverter {
@@ -277,6 +280,7 @@ impl CumulocityConverter {
277280
recently_completed_commands: HashMap::new(),
278281
active_commands_last_cleared: Instant::now(),
279282
operation_handler,
283+
units: Units::default(),
280284
})
281285
}
282286

@@ -470,8 +474,9 @@ impl CumulocityConverter {
470474

471475
if let Some(entity) = self.entity_cache.get(source) {
472476
// Need to check if the input Thin Edge JSON is valid before adding a child ID to list
477+
let units = self.get_measurement_type_units(source, measurement_type);
473478
let c8y_json_payload =
474-
json::from_thin_edge_json(input.payload_str()?, entity, measurement_type)?;
479+
json::from_thin_edge_json(input.payload_str()?, entity, measurement_type, units)?;
475480

476481
if c8y_json_payload.len() < self.size_threshold.0 {
477482
mqtt_messages.push(MqttMessage::new(
@@ -1163,6 +1168,31 @@ impl CumulocityConverter {
11631168
self.active_commands.contains_key(cmd_id)
11641169
|| self.recently_completed_commands.contains_key(cmd_id)
11651170
}
1171+
1172+
fn get_measurement_type_units(
1173+
&self,
1174+
source: &EntityTopicId,
1175+
measurement_type: &str,
1176+
) -> Option<&Units> {
1177+
let group = format!("{source}/{measurement_type}");
1178+
self.units.get_group_units(&group)
1179+
}
1180+
1181+
fn set_measurement_type_units(
1182+
&mut self,
1183+
source: &EntityTopicId,
1184+
measurement_type: &str,
1185+
message: &MqttMessage,
1186+
) {
1187+
if let Ok(payload) = message.payload_str() {
1188+
let group = format!("{source}/{measurement_type}");
1189+
if let Ok(meta) = serde_json::from_str(payload) {
1190+
self.units.set_group_units(group, meta)
1191+
} else if payload.is_empty() {
1192+
self.units.unset_group_units(group)
1193+
}
1194+
}
1195+
}
11661196
}
11671197

11681198
#[derive(Error, Debug)]
@@ -1205,6 +1235,10 @@ impl CumulocityConverter {
12051235
) -> Result<Vec<MqttMessage>, ConversionError> {
12061236
match &channel {
12071237
Channel::EntityMetadata => self.try_convert_entity_registration(source, message),
1238+
Channel::MeasurementMetadata { measurement_type } => {
1239+
self.set_measurement_type_units(&source, measurement_type, message);
1240+
Ok(vec![])
1241+
}
12081242
_ => {
12091243
self.try_convert_data_message(source, channel, message)
12101244
.await

crates/extensions/c8y_mapper_ext/src/json.rs

Lines changed: 158 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
//! "pressure": 220
1414
//! }"#;
1515
//! let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
16-
//! let output = from_thin_edge_json(single_value_thin_edge_json, &entity,"");
16+
//! let output = from_thin_edge_json(single_value_thin_edge_json, &entity,"",None);
1717
//! ```
1818
1919
use crate::entity_cache::CloudEntityMetadata;
2020
use crate::serializer;
2121
use clock::Clock;
2222
use clock::WallClock;
23+
use std::collections::HashMap;
2324
use tedge_api::measurement::*;
2425
use time::OffsetDateTime;
2526
use time::{self};
@@ -38,9 +39,10 @@ pub fn from_thin_edge_json(
3839
input: &str,
3940
entity: &CloudEntityMetadata,
4041
m_type: &str,
42+
units: Option<&Units>,
4143
) -> Result<String, CumulocityJsonError> {
4244
let timestamp = WallClock.now();
43-
let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, entity, m_type)?;
45+
let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, entity, m_type, units)?;
4446
Ok(c8y_vec)
4547
}
4648

@@ -49,12 +51,89 @@ fn from_thin_edge_json_with_timestamp(
4951
timestamp: OffsetDateTime,
5052
entity: &CloudEntityMetadata,
5153
m_type: &str,
54+
units: Option<&Units>,
5255
) -> Result<String, CumulocityJsonError> {
53-
let mut serializer = serializer::C8yJsonSerializer::new(timestamp, entity, m_type);
56+
let mut serializer = serializer::C8yJsonSerializer::new(timestamp, entity, m_type, units);
5457
parse_str(input, &mut serializer)?;
5558
Ok(serializer.into_string()?)
5659
}
5760

61+
/// Units used for measurements of a given type
62+
#[derive(Default)]
63+
pub struct Units {
64+
units: HashMap<String, String>,
65+
group_units: HashMap<String, Units>,
66+
}
67+
68+
impl Units {
69+
/// An empty set of measurement units
70+
///
71+
/// This is the default when no measurement metadata is published for a measurement topic
72+
pub fn new() -> Units {
73+
Units {
74+
units: HashMap::new(),
75+
group_units: HashMap::new(),
76+
}
77+
}
78+
79+
/// True if no units are actually defined
80+
pub fn is_empty(&self) -> bool {
81+
self.units.is_empty() && self.group_units.is_empty()
82+
}
83+
84+
/// Measurement units as defined by metadata published on a measurement topic
85+
pub fn from_metadata(meta: serde_json::Value) -> Self {
86+
let mut units = Units::new();
87+
if let serde_json::Value::Object(map) = meta {
88+
for (k, v) in map {
89+
units.set_unit(k, v);
90+
}
91+
}
92+
units
93+
}
94+
95+
pub fn set_unit(&mut self, measurement: String, meta: serde_json::Value) {
96+
if let Some(serde_json::Value::String(unit)) = meta.get("unit") {
97+
match measurement.split_once('.') {
98+
None => {
99+
// "Temperature": {"unit": "°C"},
100+
self.units.insert(measurement, unit.to_owned());
101+
}
102+
Some((group, measurement)) => {
103+
// "Climate.Temperature": {"unit": "°C"},
104+
self.group_units
105+
.entry(group.to_owned())
106+
.or_default()
107+
.set_unit(measurement.to_owned(), meta);
108+
}
109+
}
110+
}
111+
}
112+
113+
pub fn set_group_units(&mut self, group: String, meta: serde_json::Value) {
114+
let units = Units::from_metadata(meta);
115+
if units.is_empty() {
116+
self.group_units.remove(&group);
117+
} else {
118+
self.group_units.insert(group, units);
119+
}
120+
}
121+
122+
pub fn unset_group_units(&mut self, group: String) {
123+
self.group_units.remove(&group);
124+
}
125+
126+
/// Retrieve the unit to be used for a measurement, if any
127+
pub fn get_unit(&self, measurement: &str) -> Option<&str> {
128+
self.units.get(measurement).map(|x| x.as_str())
129+
}
130+
131+
/// Retrieve the units to be used for a measurement group, if any
132+
pub fn get_group_units(&self, group: &str) -> Option<&Units> {
133+
self.group_units.get(group)
134+
}
135+
}
136+
58137
#[cfg(test)]
59138
mod tests {
60139
use super::*;
@@ -77,8 +156,13 @@ mod tests {
77156
let timestamp = datetime!(2021-04-08 0:00:0 +05:00);
78157

79158
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
80-
let output =
81-
from_thin_edge_json_with_timestamp(single_value_thin_edge_json, timestamp, &entity, "");
159+
let output = from_thin_edge_json_with_timestamp(
160+
single_value_thin_edge_json,
161+
timestamp,
162+
&entity,
163+
"",
164+
None,
165+
);
82166

83167
let expected_output = json!({
84168
"time": timestamp
@@ -114,8 +198,13 @@ mod tests {
114198
let timestamp = datetime!(2021-04-08 0:00:0 +05:00);
115199

116200
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
117-
let output =
118-
from_thin_edge_json_with_timestamp(single_value_thin_edge_json, timestamp, &entity, "");
201+
let output = from_thin_edge_json_with_timestamp(
202+
single_value_thin_edge_json,
203+
timestamp,
204+
&entity,
205+
"",
206+
None,
207+
);
119208

120209
let expected_output = json!({
121210
"time": timestamp
@@ -160,7 +249,7 @@ mod tests {
160249
}"#;
161250

162251
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
163-
let output = from_thin_edge_json(single_value_thin_edge_json, &entity, "");
252+
let output = from_thin_edge_json(single_value_thin_edge_json, &entity, "", None);
164253

165254
assert_eq!(
166255
expected_output.split_whitespace().collect::<String>(),
@@ -183,8 +272,13 @@ mod tests {
183272
let timestamp = datetime!(2021-04-08 0:00:0 +05:00);
184273

185274
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
186-
let output =
187-
from_thin_edge_json_with_timestamp(multi_value_thin_edge_json, timestamp, &entity, "");
275+
let output = from_thin_edge_json_with_timestamp(
276+
multi_value_thin_edge_json,
277+
timestamp,
278+
&entity,
279+
"",
280+
None,
281+
);
188282

189283
let expected_output = json!({
190284
"time": timestamp
@@ -221,6 +315,56 @@ mod tests {
221315
);
222316
}
223317

318+
#[test]
319+
fn using_metadata_to_define_measurement_units() {
320+
let input = r#"
321+
{
322+
"time": "2013-06-22T17:03:14.123+02:00",
323+
"Climate":{
324+
"Temperature":23.4,
325+
"Humidity":95.0
326+
},
327+
"Acceleration":{
328+
"X-Axis":0.002,
329+
"Y-Axis":0.015,
330+
"Z-Axis":5.0
331+
}
332+
}"#;
333+
334+
let units = r#"
335+
{
336+
"Climate.Temperature":{"unit": "°C"},
337+
"Climate.Humidity": {"unit": "%RH"},
338+
"Acceleration.X-Axis": {"unit": "m/s²"},
339+
"Acceleration.Y-Axis": {"unit": "m/s²"},
340+
"Acceleration.Z-Axis": {"unit": "m/s²"}
341+
}"#;
342+
343+
let expected_output = r#"
344+
{
345+
"time": "2013-06-22T17:03:14.123+02:00",
346+
"Climate": {
347+
"Temperature": {"value":23.4,"unit":"°C"},
348+
"Humidity":{"value":95.0,"unit":"%RH"}
349+
},
350+
"Acceleration": {
351+
"X-Axis": {"value":0.002,"unit":"m/s²"},
352+
"Y-Axis": {"value":0.015,"unit":"m/s²"},
353+
"Z-Axis": {"value":5.0,"unit":"m/s²"}
354+
},
355+
"type": "ThinEdgeMeasurement"
356+
}"#;
357+
358+
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
359+
let units = Units::from_metadata(serde_json::from_str(units).unwrap());
360+
let output = from_thin_edge_json(input, &entity, "", Some(&units));
361+
362+
assert_eq!(
363+
expected_output.split_whitespace().collect::<String>(),
364+
output.unwrap().split_whitespace().collect::<String>()
365+
);
366+
}
367+
224368
#[test]
225369
fn thin_edge_json_round_tiny_number() {
226370
let input = r#"{
@@ -239,7 +383,7 @@ mod tests {
239383
}"#;
240384

241385
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
242-
let output = from_thin_edge_json(input, &entity, "");
386+
let output = from_thin_edge_json(input, &entity, "", None);
243387

244388
let actual_output = output.unwrap().split_whitespace().collect::<String>();
245389

@@ -272,7 +416,7 @@ mod tests {
272416
}}"#, time, measurement, measurement);
273417

274418
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
275-
let output = from_thin_edge_json(input.as_str(), &entity, "").unwrap();
419+
let output = from_thin_edge_json(input.as_str(), &entity, "", None).unwrap();
276420
assert_eq!(
277421
expected_output.split_whitespace().collect::<String>(),
278422
output
@@ -323,7 +467,8 @@ mod tests {
323467
child_id.into(),
324468
EntityMetadata::child_device(child_id.to_string()).unwrap(),
325469
);
326-
let output = from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, &entity, "");
470+
let output =
471+
from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, &entity, "", None);
327472
assert_json_eq!(
328473
serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(),
329474
expected_output

0 commit comments

Comments
 (0)