Skip to content

Commit 3e3e1a3

Browse files
committed
Delete entity on clearing MQTT registration topic
Signed-off-by: Didier Wenzek <[email protected]>
1 parent ea5dbec commit 3e3e1a3

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl EntityStoreServer {
110110
async fn process_mqtt_message(&mut self, message: MqttMessage) {
111111
if let Ok((topic_id, channel)) = self.mqtt_schema.entity_channel_of(&message.topic) {
112112
if let Channel::EntityMetadata = channel {
113-
self.process_entity_registration(message);
113+
self.process_entity_registration(topic_id, message).await;
114114
} else {
115115
self.process_entity_data(topic_id).await;
116116
}
@@ -119,9 +119,9 @@ impl EntityStoreServer {
119119
}
120120
}
121121

122-
fn process_entity_registration(&mut self, message: MqttMessage) {
122+
async fn process_entity_registration(&mut self, topic_id: EntityTopicId, message: MqttMessage) {
123123
if message.payload().is_empty() {
124-
// Nothing to do on entity clear messages
124+
let _ = self.deregister_entity(topic_id).await;
125125
return;
126126
}
127127

crates/core/tedge_agent/src/entity_manager/tests.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::entity_manager::server::EntityStoreResponse;
22
use crate::entity_manager::tests::model::Action;
33
use crate::entity_manager::tests::model::Command;
44
use crate::entity_manager::tests::model::Commands;
5+
use crate::entity_manager::tests::model::Protocol::HTTP;
56
use crate::entity_manager::tests::model::Protocol::MQTT;
67
use proptest::proptest;
78
use std::collections::HashSet;
@@ -32,6 +33,28 @@ async fn removing_an_unknown_child_using_mqtt() {
3233
check_registrations(Commands(registrations)).await
3334
}
3435

36+
#[tokio::test]
37+
async fn removing_a_child_using_mqtt() {
38+
let registrations = vec![
39+
// tedge http post /tedge/entity-store/v1/entities '{"@parent":"device/main//","@topic-id":"device/a//","@type":"child-device"}'
40+
Command {
41+
protocol: HTTP,
42+
action: Action::AddDevice {
43+
topic: "a".to_string(),
44+
props: vec![],
45+
},
46+
},
47+
// tedge mqtt pub -r te/device/a// ''
48+
Command {
49+
protocol: MQTT,
50+
action: Action::RemDevice {
51+
topic: "a".to_string(),
52+
},
53+
},
54+
];
55+
check_registrations(Commands(registrations)).await
56+
}
57+
3558
proptest! {
3659
//#![proptest_config(proptest::prelude::ProptestConfig::with_cases(1000))]
3760
#[test]
@@ -151,8 +174,10 @@ mod model {
151174
use std::fmt::Formatter;
152175
use tedge_api::entity::EntityType;
153176
use tedge_api::entity_store::EntityRegistrationMessage;
177+
use tedge_api::mqtt_topics::Channel;
154178
use tedge_api::mqtt_topics::EntityTopicId;
155179
use tedge_api::mqtt_topics::MqttSchema;
180+
use tedge_mqtt_ext::MqttMessage;
156181

157182
#[derive(Clone)]
158183
pub struct Commands(pub Vec<Command>);
@@ -364,15 +389,27 @@ mod model {
364389
}
365390
}
366391

392+
impl From<Action> for MqttMessage {
393+
fn from(action: Action) -> Self {
394+
let schema = MqttSchema::default();
395+
match &action {
396+
Action::AddDevice { .. } | Action::AddService { .. } => {
397+
EntityRegistrationMessage::from(action).to_mqtt_message(&schema)
398+
}
399+
400+
Action::RemDevice { .. } | Action::RemService { .. } => {
401+
let topic = schema.topic_for(&action.topic_id(), &Channel::EntityMetadata);
402+
MqttMessage::new(&topic, "")
403+
}
404+
}
405+
}
406+
}
407+
367408
impl From<(Protocol, Action)> for EntityStoreRequest {
368409
fn from((protocol, action): (Protocol, Action)) -> Self {
369410
match protocol {
370411
Protocol::HTTP => EntityStoreRequest::from(action),
371-
Protocol::MQTT => {
372-
let registration = EntityRegistrationMessage::from(action);
373-
let message = registration.to_mqtt_message(&MqttSchema::default());
374-
EntityStoreRequest::MqttMessage(message)
375-
}
412+
Protocol::MQTT => EntityStoreRequest::MqttMessage(MqttMessage::from(action)),
376413
}
377414
}
378415
}
@@ -451,10 +488,11 @@ mod model {
451488
}
452489

453490
Action::RemDevice { .. } | Action::RemService { .. } => {
454-
if self.entities.contains_key(&topic) {
491+
if self.registered.contains(&topic) {
455492
self.entities.remove(&topic);
493+
self.registered.remove(&topic);
456494

457-
let old_entities = self.deregister(topic);
495+
let old_entities = self.cascade_deregistration(HashSet::from([topic]));
458496
if protocol == Protocol::HTTP {
459497
old_entities
460498
} else {
@@ -485,16 +523,6 @@ mod model {
485523
}
486524
}
487525

488-
fn deregister(&mut self, old_entity: EntityTopicId) -> HashSet<EntityTopicId> {
489-
if self.registered.contains(&old_entity) {
490-
self.registered.remove(&old_entity);
491-
let old_entity = HashSet::from([old_entity]);
492-
self.cascade_deregistration(old_entity)
493-
} else {
494-
HashSet::new()
495-
}
496-
}
497-
498526
fn cascade_registration(
499527
&mut self,
500528
mut new_entities: HashSet<EntityTopicId>,

0 commit comments

Comments
 (0)