Skip to content

Commit 92af14d

Browse files
committed
Define measurement units using topic metadata
Signed-off-by: Didier Wenzek <[email protected]>
1 parent a66e3e0 commit 92af14d

File tree

3 files changed

+190
-33
lines changed

3 files changed

+190
-33
lines changed

crates/extensions/c8y_mapper_ext/src/converter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ impl CumulocityConverter {
471471
if let Some(entity) = self.entity_cache.get(source) {
472472
// Need to check if the input Thin Edge JSON is valid before adding a child ID to list
473473
let c8y_json_payload =
474-
json::from_thin_edge_json(input.payload_str()?, entity, measurement_type)?;
474+
json::from_thin_edge_json(input.payload_str()?, entity, measurement_type, None)?;
475475

476476
if c8y_json_payload.len() < self.size_threshold.0 {
477477
mqtt_messages.push(MqttMessage::new(

crates/extensions/c8y_mapper_ext/src/json.rs

Lines changed: 152 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
//! "pressure": 220
1414
//! }"#;
1515
//! let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
16-
//! let output = from_thin_edge_json(single_value_thin_edge_json, &entity,"");
16+
//! let output = from_thin_edge_json(single_value_thin_edge_json, &entity,"",None);
1717
//! ```
1818
1919
use crate::entity_cache::CloudEntityMetadata;
2020
use crate::serializer;
2121
use clock::Clock;
2222
use clock::WallClock;
23+
use std::collections::HashMap;
2324
use tedge_api::measurement::*;
2425
use time::OffsetDateTime;
2526
use time::{self};
@@ -38,9 +39,10 @@ pub fn from_thin_edge_json(
3839
input: &str,
3940
entity: &CloudEntityMetadata,
4041
m_type: &str,
42+
units: Option<&Units>,
4143
) -> Result<String, CumulocityJsonError> {
4244
let timestamp = WallClock.now();
43-
let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, entity, m_type)?;
45+
let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, entity, m_type, units)?;
4446
Ok(c8y_vec)
4547
}
4648

@@ -49,12 +51,79 @@ fn from_thin_edge_json_with_timestamp(
4951
timestamp: OffsetDateTime,
5052
entity: &CloudEntityMetadata,
5153
m_type: &str,
54+
units: Option<&Units>,
5255
) -> Result<String, CumulocityJsonError> {
53-
let mut serializer = serializer::C8yJsonSerializer::new(timestamp, entity, m_type);
56+
let mut serializer = serializer::C8yJsonSerializer::new(timestamp, entity, m_type, units);
5457
parse_str(input, &mut serializer)?;
5558
Ok(serializer.into_string()?)
5659
}
5760

61+
/// Units used for measurements of a given type
62+
#[derive(Default)]
63+
pub struct Units {
64+
units: HashMap<String, String>,
65+
groups: HashMap<String, Units>,
66+
}
67+
68+
impl Units {
69+
/// An empty set of measurement units
70+
///
71+
/// This is the default when no measurement metadata is published for a measurement topic
72+
pub fn new() -> Units {
73+
Units {
74+
units: HashMap::new(),
75+
groups: HashMap::new(),
76+
}
77+
}
78+
79+
/// True if no units are actually defined
80+
pub fn is_empty(&self) -> bool {
81+
self.units.is_empty() && self.groups.is_empty()
82+
}
83+
84+
/// Measurement units as defined by metadata published on a measurement topic
85+
pub fn from_metadata(meta: serde_json::Value) -> Self {
86+
let mut units = Units::new();
87+
if let serde_json::Value::Object(map) = meta {
88+
for (k, v) in map {
89+
units.set_unit(k, v);
90+
}
91+
}
92+
units
93+
}
94+
95+
pub fn set_unit(&mut self, measurement: String, meta: serde_json::Value) {
96+
if let serde_json::Value::String(unit) = meta {
97+
// "Temperature": "°C"
98+
self.units.insert(measurement, unit);
99+
} else if let Some(unit) = meta.get("unit") {
100+
// "Temperature": {"unit": "°C"},
101+
self.set_unit(measurement, unit.clone());
102+
} else {
103+
// "Climate": { "Temperature": {"unit": "°C"}, "Humidity": {"unit": "%RH"} }
104+
let group = measurement;
105+
self.set_group_units(group, meta);
106+
}
107+
}
108+
109+
pub fn set_group_units(&mut self, group: String, meta: serde_json::Value) {
110+
let units = Units::from_metadata(meta);
111+
if !units.is_empty() {
112+
self.groups.insert(group, units);
113+
}
114+
}
115+
116+
/// Retrieve the unit to be used for a measurement, if any
117+
pub fn get_unit(&self, measurement: &str) -> Option<&str> {
118+
self.units.get(measurement).map(|x| x.as_str())
119+
}
120+
121+
/// Retrieve the units to be used for a measurement group, if any
122+
pub fn get_group_units(&self, group: &str) -> Option<&Units> {
123+
self.groups.get(group)
124+
}
125+
}
126+
58127
#[cfg(test)]
59128
mod tests {
60129
use super::*;
@@ -77,8 +146,13 @@ mod tests {
77146
let timestamp = datetime!(2021-04-08 0:00:0 +05:00);
78147

79148
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
80-
let output =
81-
from_thin_edge_json_with_timestamp(single_value_thin_edge_json, timestamp, &entity, "");
149+
let output = from_thin_edge_json_with_timestamp(
150+
single_value_thin_edge_json,
151+
timestamp,
152+
&entity,
153+
"",
154+
None,
155+
);
82156

83157
let expected_output = json!({
84158
"time": timestamp
@@ -114,8 +188,13 @@ mod tests {
114188
let timestamp = datetime!(2021-04-08 0:00:0 +05:00);
115189

116190
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
117-
let output =
118-
from_thin_edge_json_with_timestamp(single_value_thin_edge_json, timestamp, &entity, "");
191+
let output = from_thin_edge_json_with_timestamp(
192+
single_value_thin_edge_json,
193+
timestamp,
194+
&entity,
195+
"",
196+
None,
197+
);
119198

120199
let expected_output = json!({
121200
"time": timestamp
@@ -160,7 +239,7 @@ mod tests {
160239
}"#;
161240

162241
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
163-
let output = from_thin_edge_json(single_value_thin_edge_json, &entity, "");
242+
let output = from_thin_edge_json(single_value_thin_edge_json, &entity, "", None);
164243

165244
assert_eq!(
166245
expected_output.split_whitespace().collect::<String>(),
@@ -183,8 +262,13 @@ mod tests {
183262
let timestamp = datetime!(2021-04-08 0:00:0 +05:00);
184263

185264
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
186-
let output =
187-
from_thin_edge_json_with_timestamp(multi_value_thin_edge_json, timestamp, &entity, "");
265+
let output = from_thin_edge_json_with_timestamp(
266+
multi_value_thin_edge_json,
267+
timestamp,
268+
&entity,
269+
"",
270+
None,
271+
);
188272

189273
let expected_output = json!({
190274
"time": timestamp
@@ -221,6 +305,60 @@ mod tests {
221305
);
222306
}
223307

308+
#[test]
309+
fn using_metadata_to_define_measurement_units() {
310+
let input = r#"
311+
{
312+
"time": "2013-06-22T17:03:14.123+02:00",
313+
"Climate":{
314+
"Temperature":23.4,
315+
"Humidity":95.0
316+
},
317+
"Acceleration":{
318+
"X-Axis":0.002,
319+
"Y-Axis":0.015,
320+
"Z-Axis":5.0
321+
}
322+
}"#;
323+
324+
let units = r#"
325+
{
326+
"Climate":{
327+
"Temperature": {"unit": "°C"},
328+
"Humidity": {"unit": "%RH"}
329+
},
330+
"Acceleration":{
331+
"X-Axis": {"unit": "m/s²"},
332+
"Y-Axis": {"unit": "m/s²"},
333+
"Z-Axis": {"unit": "m/s²"}
334+
}
335+
}"#;
336+
337+
let expected_output = r#"
338+
{
339+
"time": "2013-06-22T17:03:14.123+02:00",
340+
"Climate": {
341+
"Temperature": {"value":23.4,"unit":"°C"},
342+
"Humidity":{"value":95.0,"unit":"%RH"}
343+
},
344+
"Acceleration": {
345+
"X-Axis": {"value":0.002,"unit":"m/s²"},
346+
"Y-Axis": {"value":0.015,"unit":"m/s²"},
347+
"Z-Axis": {"value":5.0,"unit":"m/s²"}
348+
},
349+
"type": "ThinEdgeMeasurement"
350+
}"#;
351+
352+
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
353+
let units = Units::from_metadata(serde_json::from_str(units).unwrap());
354+
let output = from_thin_edge_json(input, &entity, "", Some(&units));
355+
356+
assert_eq!(
357+
expected_output.split_whitespace().collect::<String>(),
358+
output.unwrap().split_whitespace().collect::<String>()
359+
);
360+
}
361+
224362
#[test]
225363
fn thin_edge_json_round_tiny_number() {
226364
let input = r#"{
@@ -239,7 +377,7 @@ mod tests {
239377
}"#;
240378

241379
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
242-
let output = from_thin_edge_json(input, &entity, "");
380+
let output = from_thin_edge_json(input, &entity, "", None);
243381

244382
let actual_output = output.unwrap().split_whitespace().collect::<String>();
245383

@@ -272,7 +410,7 @@ mod tests {
272410
}}"#, time, measurement, measurement);
273411

274412
let entity = CloudEntityMetadata::new("foo".into(), EntityMetadata::main_device(None));
275-
let output = from_thin_edge_json(input.as_str(), &entity, "").unwrap();
413+
let output = from_thin_edge_json(input.as_str(), &entity, "", None).unwrap();
276414
assert_eq!(
277415
expected_output.split_whitespace().collect::<String>(),
278416
output
@@ -323,7 +461,8 @@ mod tests {
323461
child_id.into(),
324462
EntityMetadata::child_device(child_id.to_string()).unwrap(),
325463
);
326-
let output = from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, &entity, "");
464+
let output =
465+
from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, &entity, "", None);
327466
assert_json_eq!(
328467
serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(),
329468
expected_output

0 commit comments

Comments
 (0)