Skip to content

Commit fe80cd2

Browse files
committed
feat: c8y-mapper sends all supported operations on-demand via signal channel
Signed-off-by: Rina Fujino <[email protected]>
1 parent 909fbe4 commit fe80cd2

File tree

9 files changed

+232
-9
lines changed

9 files changed

+232
-9
lines changed

crates/core/tedge_api/src/entity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use thiserror::Error;
1313
/// Although this struct doesn't enforce any restrictions for the values,
1414
/// the consumers may impose restrictions on the accepted values.
1515
16-
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
16+
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1717
#[serde(transparent)]
1818
pub struct EntityExternalId(String);
1919

crates/core/tedge_api/src/mqtt_topics.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ impl MqttSchema {
179179
ChannelFilter::Command(operation) => format!("/cmd/{operation}/+"),
180180
ChannelFilter::AnyCommandMetadata => "/cmd/+".to_string(),
181181
ChannelFilter::CommandMetadata(operation) => format!("/cmd/{operation}"),
182+
ChannelFilter::AnySignal => "/signal/+".to_string(),
183+
ChannelFilter::Signal(signal_type) => format!("/signal/{signal_type}"),
182184
ChannelFilter::Health => "/status/health".to_string(),
183185
};
184186

@@ -608,6 +610,9 @@ pub enum Channel {
608610
operation: OperationType,
609611
cmd_id: String,
610612
},
613+
Signal {
614+
signal_type: SignalType,
615+
},
611616
MeasurementMetadata {
612617
measurement_type: String,
613618
},
@@ -660,6 +665,9 @@ impl FromStr for Channel {
660665
operation: operation.parse().unwrap(), // Infallible
661666
cmd_id: cmd_id.to_string(),
662667
}),
668+
["signal", signal_type] => Ok(Channel::Signal {
669+
signal_type: signal_type.into(),
670+
}),
663671
["status", "health"] => Ok(Channel::Health),
664672

665673
_ => Err(ChannelError::InvalidCategory(channel.to_string())),
@@ -687,6 +695,7 @@ impl Display for Channel {
687695
Channel::Command { operation, cmd_id } => write!(f, "cmd/{operation}/{cmd_id}"),
688696
Channel::CommandMetadata { operation } => write!(f, "cmd/{operation}"),
689697
Channel::Health => write!(f, "status/health"),
698+
Channel::Signal { signal_type } => write!(f, "signal/{signal_type}"),
690699
}
691700
}
692701
}
@@ -786,6 +795,64 @@ impl OperationType {
786795
}
787796
}
788797

798+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
799+
pub enum SignalType {
800+
SyncOperations,
801+
Custom(String),
802+
}
803+
804+
// Using a custom Serialize/Deserialize implementations to read "foo" as Custom("foo")
805+
impl<'de> Deserialize<'de> for SignalType {
806+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
807+
where
808+
D: Deserializer<'de>,
809+
{
810+
let str = String::deserialize(deserializer)?;
811+
Ok(str.as_str().into())
812+
}
813+
}
814+
815+
impl Serialize for SignalType {
816+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
817+
where
818+
S: Serializer,
819+
{
820+
self.to_string().serialize(serializer)
821+
}
822+
}
823+
824+
impl FromStr for SignalType {
825+
type Err = Infallible;
826+
827+
fn from_str(s: &str) -> Result<Self, Self::Err> {
828+
Ok(s.into())
829+
}
830+
}
831+
832+
impl<'a> From<&'a str> for SignalType {
833+
fn from(s: &'a str) -> SignalType {
834+
match s {
835+
"sync_operations" => SignalType::SyncOperations,
836+
custom => SignalType::Custom(custom.to_string()),
837+
}
838+
}
839+
}
840+
841+
impl Display for SignalType {
842+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
843+
match self {
844+
SignalType::SyncOperations => write!(f, "sync_operations"),
845+
SignalType::Custom(custom) => write!(f, "{custom}"),
846+
}
847+
}
848+
}
849+
850+
impl SignalType {
851+
pub fn name(&self) -> String {
852+
format!("{self}")
853+
}
854+
}
855+
789856
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
790857
pub enum ChannelError {
791858
#[error("Channel needs to have at least 2 segments")]
@@ -809,6 +876,8 @@ pub enum ChannelFilter {
809876
Alarm,
810877
AnyCommand,
811878
Command(OperationType),
879+
AnySignal,
880+
Signal(SignalType),
812881
MeasurementMetadata,
813882
EventMetadata,
814883
AlarmMetadata,
@@ -831,6 +900,7 @@ impl From<&Channel> for ChannelFilter {
831900
operation,
832901
cmd_id: _,
833902
} => ChannelFilter::Command(operation.clone()),
903+
Channel::Signal { signal_type } => ChannelFilter::Signal(signal_type.clone()),
834904
Channel::MeasurementMetadata {
835905
measurement_type: _,
836906
} => ChannelFilter::MeasurementMetadata,
@@ -1090,6 +1160,15 @@ mod tests {
10901160
),
10911161
mqtt_channel::Topic::new_unchecked("te/device/main///cmd/log_upload")
10921162
);
1163+
assert_eq!(
1164+
mqtt_schema.topic_for(
1165+
&device,
1166+
&Channel::Signal {
1167+
signal_type: SignalType::SyncOperations,
1168+
}
1169+
),
1170+
mqtt_channel::Topic::new_unchecked("te/device/main///signal/sync_operations")
1171+
);
10931172
assert_eq!(
10941173
mqtt_schema.topic_for(&device, &Channel::Health),
10951174
mqtt_channel::Topic::new_unchecked("te/device/main///status/health")

crates/extensions/c8y_mapper_ext/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use std::str::FromStr;
1414
use std::sync::Arc;
1515
use tedge_api::mqtt_topics::ChannelFilter::AnyCommand;
1616
use tedge_api::mqtt_topics::ChannelFilter::AnyCommandMetadata;
17+
use tedge_api::mqtt_topics::ChannelFilter::AnySignal;
18+
use tedge_api::mqtt_topics::EntityFilter;
1719
use tedge_api::mqtt_topics::EntityFilter::AnyEntity;
1820
use tedge_api::mqtt_topics::EntityTopicId;
1921
use tedge_api::mqtt_topics::IdGenerator;
@@ -213,6 +215,10 @@ impl C8yMapperConfig {
213215
topics.add_all(mqtt_schema.topics(AnyEntity, AnyCommand));
214216
topics.add_all(mqtt_schema.topics(AnyEntity, AnyCommandMetadata));
215217

218+
// Add signal topic
219+
let mapper_service_tid = EntityTopicId::default_main_service("tedge-mapper-c8y").unwrap();
220+
topics.add_all(mqtt_schema.topics(EntityFilter::Entity(&mapper_service_tid), AnySignal));
221+
216222
// Add user configurable external topic filters
217223
for topic in c8y_config.topics.0.clone() {
218224
if topics.add(&topic).is_err() {

crates/extensions/c8y_mapper_ext/src/converter.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ pub struct CumulocityConverter {
189189
pub recently_completed_commands: HashMap<CmdId, Instant>,
190190
active_commands_last_cleared: Instant,
191191

192-
supported_operations: SupportedOperations,
192+
pub supported_operations: SupportedOperations,
193193
pub operation_handler: OperationHandler,
194194

195195
units: Units,
@@ -1353,6 +1353,8 @@ impl CumulocityConverter {
13531353
Ok(vec![])
13541354
}
13551355

1356+
Channel::Signal { signal_type } => self.process_signal_message(&source, signal_type),
1357+
13561358
Channel::Health => self.process_health_status_message(&source, message).await,
13571359

13581360
_ => Ok(vec![]),
@@ -1427,13 +1429,9 @@ impl CumulocityConverter {
14271429
fn try_init_messages(&mut self) -> Result<Vec<MqttMessage>, ConversionError> {
14281430
let mut messages = vec![self.c8y_agent_inventory_fragment()?];
14291431

1430-
self.supported_operations
1431-
.load_all(&self.config.device_id, &self.config.bridge_config)?;
1432-
let supported_operations_message = self.supported_operations.create_supported_operations(
1433-
&self.config.device_id,
1434-
&self.config.bridge_config.c8y_prefix,
1435-
)?;
1436-
1432+
// supported operations for the main device
1433+
let supported_operations_message =
1434+
self.load_and_create_supported_operations_messages(&self.config.device_id.clone())?;
14371435
let pending_operations_message =
14381436
create_get_pending_operations_message(&self.config.bridge_config.c8y_prefix);
14391437

@@ -1444,6 +1442,19 @@ impl CumulocityConverter {
14441442
Ok(messages)
14451443
}
14461444

1445+
pub fn load_and_create_supported_operations_messages(
1446+
&mut self,
1447+
external_id: &str,
1448+
) -> Result<MqttMessage, ConversionError> {
1449+
self.supported_operations
1450+
.load_all(external_id, &self.config.bridge_config)?;
1451+
let supported_operations_message = self
1452+
.supported_operations
1453+
.create_supported_operations(external_id, &self.config.bridge_config.c8y_prefix)?;
1454+
1455+
Ok(supported_operations_message)
1456+
}
1457+
14471458
pub fn sync_messages(&mut self) -> Vec<MqttMessage> {
14481459
let sync_messages: Vec<MqttMessage> = self.alarm_converter.sync();
14491460
self.alarm_converter = AlarmConverter::Synced;

crates/extensions/c8y_mapper_ext/src/entity_cache.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,13 @@ impl EntityCache {
386386
pub fn cache_early_data_message(&mut self, message: MqttMessage) {
387387
self.pending_entities.cache_early_data_message(message)
388388
}
389+
390+
pub fn get_all_external_ids(&self) -> Vec<EntityExternalId> {
391+
let mut ids: Vec<EntityExternalId> =
392+
self.external_id_map.keys().map(Clone::clone).collect();
393+
ids.sort();
394+
ids
395+
}
389396
}
390397

391398
#[cfg(test)]

crates/extensions/c8y_mapper_ext/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod json;
1313
mod operations;
1414
mod serializer;
1515
pub mod service_monitor;
16+
mod signals;
1617
mod supported_operations;
1718

1819
#[cfg(test)]
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use crate::converter::CumulocityConverter;
2+
use crate::error::ConversionError;
3+
use tedge_api::mqtt_topics::EntityTopicId;
4+
use tedge_api::mqtt_topics::SignalType;
5+
use tedge_mqtt_ext::MqttMessage;
6+
7+
impl CumulocityConverter {
8+
pub fn process_signal_message(
9+
&mut self,
10+
source: &EntityTopicId,
11+
signal_type: &SignalType,
12+
) -> Result<Vec<MqttMessage>, ConversionError> {
13+
let mut messages = Vec::new();
14+
15+
if source.default_service_name() != Some("tedge-mapper-c8y") {
16+
return Ok(messages);
17+
}
18+
19+
match signal_type {
20+
SignalType::SyncOperations => {
21+
for external_id in self.entity_cache.get_all_external_ids() {
22+
if let Ok(message) =
23+
self.load_and_create_supported_operations_messages(external_id.as_ref())
24+
{
25+
messages.push(message);
26+
}
27+
}
28+
}
29+
SignalType::Custom(_) => {}
30+
}
31+
32+
Ok(messages)
33+
}
34+
}

crates/extensions/c8y_mapper_ext/src/supported_operations/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type OperationName = String;
4848
type OperationNameRef = str;
4949

5050
/// Used to hold and query supported operations for all devices.
51+
#[derive(Debug)]
5152
pub struct SupportedOperations {
5253
/// External ID of the main device.
5354
///

crates/extensions/c8y_mapper_ext/src/tests.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use tedge_actors::NoMessage;
3333
use tedge_actors::Sender;
3434
use tedge_actors::SimpleMessageBox;
3535
use tedge_actors::SimpleMessageBoxBuilder;
36+
use tedge_api::mqtt_topics::ChannelFilter::AnySignal;
37+
use tedge_api::mqtt_topics::EntityFilter::AnyEntity;
3638
use tedge_api::mqtt_topics::EntityTopicId;
3739
use tedge_api::mqtt_topics::MqttSchema;
3840
use tedge_config::models::AutoLogUpload;
@@ -3113,6 +3115,87 @@ async fn mapper_converts_config_cmd_to_supported_op_and_types_for_child_device()
31133115
.await;
31143116
}
31153117

3118+
#[tokio::test]
3119+
async fn mapper_publishes_all_supported_operations_on_signal() {
3120+
let ttd = TempTedgeDir::new();
3121+
ttd.dir("operations").dir("c8y").file("c8y_Restart");
3122+
ttd.dir("operations")
3123+
.dir("c8y")
3124+
.dir("test-device:device:child01");
3125+
ttd.dir("operations")
3126+
.dir("c8y")
3127+
.dir("test-device:device:child02")
3128+
.file("c8y_Restart");
3129+
ttd.dir("operations")
3130+
.dir("c8y")
3131+
.dir("test-device:device:child02")
3132+
.file("c8y_UploadConfigFile");
3133+
ttd.dir("operations")
3134+
.dir("c8y")
3135+
.dir("test-device:device:main:service:collectd")
3136+
.file("c8y_Restart");
3137+
3138+
let test_handle = spawn_c8y_mapper_actor(&ttd, true).await;
3139+
let TestHandle { mqtt, .. } = test_handle;
3140+
let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
3141+
3142+
skip_init_messages(&mut mqtt).await;
3143+
3144+
// Register the tedge-mapper-c8y service upfront
3145+
mqtt.send(MqttMessage::new(
3146+
&Topic::new_unchecked("te/device/main/service/tedge-mapper-c8y"),
3147+
r#"{"@parent":"device/main//","@type":"service","name":"tedge-mapper-c8y","type":"service"}"#,
3148+
))
3149+
.await
3150+
.expect("Send failed");
3151+
mqtt.skip(1).await; // Skip 102 message
3152+
3153+
// Also register other entities
3154+
mqtt.send(MqttMessage::new(
3155+
&Topic::new_unchecked("te/device/child01//"),
3156+
r#"{"@parent":"device/main//","@type":"child-device"}"#,
3157+
))
3158+
.await
3159+
.expect("Send failed");
3160+
mqtt.send(MqttMessage::new(
3161+
&Topic::new_unchecked("te/device/child02//"),
3162+
r#"{"@parent":"device/main//","@type":"child-device"}"#,
3163+
))
3164+
.await
3165+
.expect("Send failed");
3166+
mqtt.send(MqttMessage::new(
3167+
&Topic::new_unchecked("te/device/main/service/collectd"),
3168+
r#"{"@parent":"device/main//","@type":"service"}"#,
3169+
))
3170+
.await
3171+
.expect("Send failed");
3172+
mqtt.skip(3).await; // Skip registration messages
3173+
3174+
mqtt.send(MqttMessage::new(
3175+
&Topic::new_unchecked("te/device/main/service/tedge-mapper-c8y/signal/sync_operations"),
3176+
"{}",
3177+
))
3178+
.await
3179+
.expect("Send failed");
3180+
3181+
assert_received_contains_str(
3182+
&mut mqtt,
3183+
[
3184+
("c8y/s/us", "114,c8y_Restart"),
3185+
("c8y/s/us/test-device:device:child01", "114"),
3186+
(
3187+
"c8y/s/us/test-device:device:child02",
3188+
"114,c8y_Restart,c8y_UploadConfigFile",
3189+
),
3190+
(
3191+
"c8y/s/us/test-device:device:main:service:collectd",
3192+
"114,c8y_Restart",
3193+
),
3194+
],
3195+
)
3196+
.await;
3197+
}
3198+
31163199
fn assert_command_exec_log_content(cfg_dir: TempTedgeDir, expected_contents: &str) {
31173200
let paths = fs::read_dir(cfg_dir.to_path_buf().join("agent")).unwrap();
31183201
for path in paths {
@@ -3283,6 +3366,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig {
32833366
topics.add_all(operation_topics);
32843367

32853368
topics.add_all(C8yMapperConfig::default_external_topic_filter());
3369+
topics.add_all(mqtt_schema.topics(AnyEntity, AnySignal));
32863370

32873371
topics.remove_overlapping_patterns();
32883372

0 commit comments

Comments
 (0)