Skip to content

Commit 7c44d81

Browse files
committed
feat: sync signal support for agent
1 parent dad2734 commit 7c44d81

File tree

4 files changed

+73
-4
lines changed

4 files changed

+73
-4
lines changed

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tedge_api::mqtt_topics::EntityTopicError;
2222
use tedge_api::mqtt_topics::EntityTopicId;
2323
use tedge_api::mqtt_topics::MqttSchema;
2424
use tedge_api::mqtt_topics::OperationType;
25+
use tedge_api::mqtt_topics::SignalType;
2526
use tedge_api::workflow::extract_json_output;
2627
use tedge_api::workflow::CommandBoard;
2728
use tedge_api::workflow::CommandId;
@@ -139,11 +140,49 @@ impl WorkflowActor {
139140
/// but also from *this* actor as all its state transitions are published over MQTT.
140141
/// Only the former will be actually processed with [Self::process_command_update].
141142
async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
142-
let Ok((operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name) else {
143-
log::error!("Unknown command channel: {}", &message.topic.name);
143+
let Ok((_, channel)) = self.mqtt_schema.entity_channel_of(&message.topic) else {
144+
log::error!("Unknown topic: {}", &message.topic.name);
144145
return Ok(());
145146
};
147+
match channel {
148+
Channel::Command { operation, cmd_id } => {
149+
self.process_command_message(message, operation, cmd_id)
150+
.await
151+
}
152+
Channel::Signal { signal_type } => {
153+
self.process_signal_message(message, signal_type).await
154+
}
155+
_ => {
156+
error!("Unsupported channel: {}", channel);
157+
return Ok(());
158+
}
159+
}
160+
}
146161

162+
async fn process_signal_message(
163+
&mut self,
164+
_message: MqttMessage,
165+
signal_type: SignalType,
166+
) -> Result<(), RuntimeError> {
167+
match signal_type {
168+
SignalType::Sync => {
169+
info!("Received sync signal, requesting all builtin actors to sync");
170+
self.sync_signal_dispatcher.sync_all().await?;
171+
Ok(())
172+
}
173+
SignalType::Custom(_) => {
174+
// Custom signal types are not handled yet
175+
Ok(())
176+
}
177+
}
178+
}
179+
180+
async fn process_command_message(
181+
&mut self,
182+
message: MqttMessage,
183+
operation: OperationType,
184+
cmd_id: String,
185+
) -> Result<(), RuntimeError> {
147186
let Ok(state) = GenericCommandState::from_command_message(&message) else {
148187
log::error!("Invalid command payload: {}", &message.topic.name);
149188
return Ok(());

crates/core/tedge_agent/src/operation_workflows/builder.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ use tedge_actors::RuntimeRequestSink;
2424
use tedge_actors::Service;
2525
use tedge_actors::UnboundedLoggingReceiver;
2626
use tedge_api::commands::CmdMetaSyncSignal;
27-
use tedge_api::mqtt_topics::ChannelFilter::AnyCommand;
27+
use tedge_api::mqtt_topics::ChannelFilter;
2828
use tedge_api::mqtt_topics::EntityFilter;
2929
use tedge_api::mqtt_topics::EntityTopicId;
3030
use tedge_api::mqtt_topics::MqttSchema;
31+
use tedge_api::mqtt_topics::SignalType;
3132
use tedge_api::workflow::GenericCommandData;
3233
use tedge_api::workflow::GenericCommandState;
3334
use tedge_api::workflow::OperationName;
@@ -122,7 +123,15 @@ impl WorkflowActorBuilder {
122123
}
123124

124125
pub fn subscriptions(mqtt_schema: &MqttSchema, device_topic_id: &EntityTopicId) -> TopicFilter {
125-
mqtt_schema.topics(EntityFilter::Entity(device_topic_id), AnyCommand)
126+
let mut topics = mqtt_schema.topics(
127+
EntityFilter::Entity(device_topic_id),
128+
ChannelFilter::AnyCommand,
129+
);
130+
topics.add_all(mqtt_schema.topics(
131+
EntityFilter::Entity(device_topic_id),
132+
ChannelFilter::Signal(SignalType::Sync),
133+
));
134+
topics
126135
}
127136
}
128137

crates/core/tedge_agent/src/operation_workflows/message_box.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,14 @@ impl SyncSignalDispatcher {
7171
}
7272
Ok(())
7373
}
74+
75+
/// Send sync signal to all registered actors
76+
pub(crate) async fn sync_all(&mut self) -> Result<(), ChannelError> {
77+
for senders in self.senders.values_mut() {
78+
for sender in senders {
79+
sender.send(()).await?;
80+
}
81+
}
82+
Ok(())
83+
}
7484
}

tests/RobotFramework/tests/cumulocity/log/log_operation_plugins.robot

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ Supported log types updated on config update
5252
... date_from=${start_time}
5353
... message_contains=software-management
5454

55+
Supported log types updated on sync signal
56+
Install Package Using APT haveged
57+
58+
${start_time}= Get Unix Timestamp
59+
Execute Command tedge mqtt pub te/device/main///signal/sync '{}'
60+
61+
Should Have MQTT Messages
62+
... topic=te/device/main///cmd/log_upload
63+
... date_from=${start_time}
64+
... message_contains=haveged::journald
65+
5566
Log operation journald plugin can return logs for all units
5667
Should Contain Supported Log Types all-units::journald
5768
${start_timestamp}= Get Current Date UTC -1 hours result_format=%Y-%m-%dT%H:%M:%S+0000

0 commit comments

Comments
 (0)