Skip to content

Commit 54281c6

Browse files
authored
Merge pull request #3834 from albinsuresh/feat/tedge-agent-sync-signal
feat: sync signal support for agent
2 parents dad2734 + ef07467 commit 54281c6

File tree

9 files changed

+172
-24
lines changed

9 files changed

+172
-24
lines changed

crates/core/tedge_agent/src/agent.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ use tedge_api::entity_store::EntityRegistrationMessage;
4747
use tedge_api::mqtt_topics::DeviceTopicId;
4848
use tedge_api::mqtt_topics::EntityTopicId;
4949
use tedge_api::mqtt_topics::MqttSchema;
50+
use tedge_api::mqtt_topics::OperationType;
5051
use tedge_api::mqtt_topics::Service;
52+
use tedge_api::mqtt_topics::ServiceTopicId;
5153
use tedge_api::path::DataDir;
5254
use tedge_api::EntityStore;
5355
use tedge_config::tedge_toml::TEdgeConfigReaderService;
@@ -90,6 +92,7 @@ pub(crate) struct AgentConfig {
9092
pub state_dir: Utf8PathBuf,
9193
pub operations_dir: Utf8PathBuf,
9294
pub mqtt_device_topic_id: EntityTopicId,
95+
pub service_topic_id: ServiceTopicId,
9396
pub mqtt_topic_root: Arc<str>,
9497
pub tedge_http_host: Arc<str>,
9598
pub service: TEdgeConfigReaderService,
@@ -116,11 +119,13 @@ impl AgentConfig {
116119
.mqtt_topic_root
117120
.unwrap_or(tedge_config.mqtt.topic_root.clone().into());
118121

119-
let mqtt_device_topic_id = cliopts
122+
let mqtt_device_topic_id: EntityTopicId = cliopts
120123
.mqtt_device_topic_id
121124
.unwrap_or(tedge_config.mqtt.device_topic_id.clone().into())
122125
.parse()
123126
.context("Could not parse the device MQTT topic")?;
127+
let service_topic_id = mqtt_device_topic_id.to_default_service_topic_id("tedge-agent")
128+
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", mqtt_device_topic_id))?;
124129

125130
let mqtt_session_name = format!("{TEDGE_AGENT}#{mqtt_topic_root}/{mqtt_device_topic_id}");
126131

@@ -157,6 +162,7 @@ impl AgentConfig {
157162
let operation_config = OperationConfig::from_tedge_config(
158163
mqtt_topic_root.to_string(),
159164
&mqtt_device_topic_id,
165+
service_topic_id.clone().into(),
160166
&tedge_config,
161167
)
162168
.await?;
@@ -213,6 +219,7 @@ impl AgentConfig {
213219
state_dir,
214220
mqtt_topic_root,
215221
mqtt_device_topic_id,
222+
service_topic_id,
216223
tedge_http_host,
217224
identity,
218225
cloud_root_certs,
@@ -307,8 +314,7 @@ impl Agent {
307314
// Health actor
308315
// TODO: take a user-configurable service topic id
309316
let device_topic_id = self.config.mqtt_device_topic_id.clone();
310-
let service_topic_id = device_topic_id.to_default_service_topic_id("tedge-agent")
311-
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", device_topic_id))?;
317+
let service_topic_id = self.config.service_topic_id.clone();
312318
let service = Service {
313319
service_topic_id: service_topic_id.clone(),
314320
device_topic_id: DeviceTopicId::new(device_topic_id.clone()),
@@ -384,7 +390,7 @@ impl Agent {
384390
)
385391
.await?;
386392
converter_actor_builder.register_builtin_operation(&mut log_actor);
387-
converter_actor_builder.register_sync_signal_sink(&log_actor);
393+
converter_actor_builder.register_sync_signal_sink(OperationType::LogUpload, &log_actor);
388394
Some(log_actor)
389395
} else {
390396
None

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tedge_api::mqtt_topics::EntityTopicError;
2222
use tedge_api::mqtt_topics::EntityTopicId;
2323
use tedge_api::mqtt_topics::MqttSchema;
2424
use tedge_api::mqtt_topics::OperationType;
25+
use tedge_api::mqtt_topics::SignalType;
2526
use tedge_api::workflow::extract_json_output;
2627
use tedge_api::workflow::CommandBoard;
2728
use tedge_api::workflow::CommandId;
@@ -139,11 +140,55 @@ impl WorkflowActor {
139140
/// but also from *this* actor as all its state transitions are published over MQTT.
140141
/// Only the former will be actually processed with [Self::process_command_update].
141142
async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
142-
let Ok((operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name) else {
143-
log::error!("Unknown command channel: {}", &message.topic.name);
143+
let Ok((_, channel)) = self.mqtt_schema.entity_channel_of(&message.topic) else {
144+
log::error!("Unknown topic: {}", &message.topic.name);
144145
return Ok(());
145146
};
147+
match channel {
148+
Channel::Command { operation, cmd_id } => {
149+
self.process_command_message(message, operation, cmd_id)
150+
.await
151+
}
152+
Channel::Signal { signal_type } => {
153+
self.process_signal_message(message, signal_type).await
154+
}
155+
_ => {
156+
error!("Unsupported channel: {}", channel);
157+
Ok(())
158+
}
159+
}
160+
}
146161

162+
async fn process_signal_message(
163+
&mut self,
164+
_message: MqttMessage,
165+
signal_type: SignalType,
166+
) -> Result<(), RuntimeError> {
167+
match signal_type {
168+
SignalType::Sync => {
169+
info!("Received sync signal, requesting all builtin actors to sync");
170+
self.sync_signal_dispatcher.sync_all().await?;
171+
}
172+
SignalType::SyncOperation(operation) => {
173+
info!(
174+
"Received sync signal for {}, requesting the corresponding actor to sync",
175+
operation
176+
);
177+
self.sync_signal_dispatcher.sync(operation).await?;
178+
}
179+
SignalType::Custom(_) => {
180+
// Custom signal types are not handled yet
181+
}
182+
}
183+
Ok(())
184+
}
185+
186+
async fn process_command_message(
187+
&mut self,
188+
message: MqttMessage,
189+
operation: OperationType,
190+
cmd_id: String,
191+
) -> Result<(), RuntimeError> {
147192
let Ok(state) = GenericCommandState::from_command_message(&message) else {
148193
log::error!("Invalid command payload: {}", &message.topic.name);
149194
return Ok(());
@@ -430,7 +475,7 @@ impl WorkflowActor {
430475
new_state: GenericCommandState,
431476
) -> Result<(), RuntimeError> {
432477
if new_state.is_finished() {
433-
self.sync_dependent_actors(&new_state).await?;
478+
self.sync_listener_actors(&new_state).await?;
434479
self.finalize_builtin_command_update(new_state).await?;
435480

436481
Ok(())
@@ -463,13 +508,13 @@ impl WorkflowActor {
463508
self.process_command_update(adapted_state).await
464509
}
465510

466-
async fn sync_dependent_actors(
511+
async fn sync_listener_actors(
467512
&mut self,
468513
command: &GenericCommandState,
469514
) -> Result<(), RuntimeError> {
470515
if let Some(command) = command.operation() {
471516
self.sync_signal_dispatcher
472-
.send(command.as_str().into())
517+
.sync_listener(command.as_str().into())
473518
.await?;
474519
}
475520
Ok(())

crates/core/tedge_agent/src/operation_workflows/builder.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ use tedge_actors::RuntimeRequestSink;
2424
use tedge_actors::Service;
2525
use tedge_actors::UnboundedLoggingReceiver;
2626
use tedge_api::commands::CmdMetaSyncSignal;
27-
use tedge_api::mqtt_topics::ChannelFilter::AnyCommand;
27+
use tedge_api::mqtt_topics::ChannelFilter;
2828
use tedge_api::mqtt_topics::EntityFilter;
2929
use tedge_api::mqtt_topics::EntityTopicId;
3030
use tedge_api::mqtt_topics::MqttSchema;
31+
use tedge_api::mqtt_topics::OperationType;
3132
use tedge_api::workflow::GenericCommandData;
3233
use tedge_api::workflow::GenericCommandState;
3334
use tedge_api::workflow::OperationName;
@@ -73,7 +74,11 @@ impl WorkflowActorBuilder {
7374

7475
let mqtt_publisher = mqtt_actor.get_sender();
7576
mqtt_actor.connect_sink(
76-
Self::subscriptions(&config.mqtt_schema, &config.device_topic_id),
77+
Self::subscriptions(
78+
&config.mqtt_schema,
79+
&config.device_topic_id,
80+
&config.service_topic_id,
81+
),
7782
&input_sender,
7883
);
7984
let mqtt_publisher = LoggingSender::new("MqttPublisher".into(), mqtt_publisher);
@@ -110,19 +115,36 @@ impl WorkflowActorBuilder {
110115
}
111116

112117
/// Register an actor to receive sync signals on completion of other commands
113-
pub fn register_sync_signal_sink<OperationActor>(&mut self, actor: &OperationActor)
114-
where
118+
pub fn register_sync_signal_sink<OperationActor>(
119+
&mut self,
120+
op_type: OperationType,
121+
actor: &OperationActor,
122+
) where
115123
OperationActor: MessageSink<CmdMetaSyncSignal> + SyncOnCommand,
116124
{
117125
let sender = actor.get_sender();
126+
self.sync_signal_dispatcher
127+
.register_operation_handler(op_type, sender.sender_clone());
118128
for operation in actor.sync_on_commands() {
119129
self.sync_signal_dispatcher
120-
.register_sync_signal_sender(operation, sender.sender_clone());
130+
.register_operation_listener(operation, sender.sender_clone());
121131
}
122132
}
123133

124-
pub fn subscriptions(mqtt_schema: &MqttSchema, device_topic_id: &EntityTopicId) -> TopicFilter {
125-
mqtt_schema.topics(EntityFilter::Entity(device_topic_id), AnyCommand)
134+
pub fn subscriptions(
135+
mqtt_schema: &MqttSchema,
136+
device_topic_id: &EntityTopicId,
137+
service_topic_id: &EntityTopicId,
138+
) -> TopicFilter {
139+
let mut topics = mqtt_schema.topics(
140+
EntityFilter::Entity(device_topic_id),
141+
ChannelFilter::AnyCommand,
142+
);
143+
topics.add_all(mqtt_schema.topics(
144+
EntityFilter::Entity(service_topic_id),
145+
ChannelFilter::AnySignal,
146+
));
147+
topics
126148
}
127149
}
128150

crates/core/tedge_agent/src/operation_workflows/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tedge_config::TEdgeConfig;
77
pub struct OperationConfig {
88
pub mqtt_schema: MqttSchema,
99
pub device_topic_id: EntityTopicId,
10+
pub service_topic_id: EntityTopicId,
1011
pub log_dir: Utf8PathBuf,
1112
pub config_dir: Utf8PathBuf,
1213
pub state_dir: Utf8PathBuf,
@@ -17,13 +18,15 @@ impl OperationConfig {
1718
pub async fn from_tedge_config(
1819
topic_root: String,
1920
device_topic_id: &EntityTopicId,
21+
service_topic_id: EntityTopicId,
2022
tedge_config: &TEdgeConfig,
2123
) -> Result<OperationConfig, tedge_config::TEdgeConfigError> {
2224
let config_dir = tedge_config.root_dir();
2325

2426
Ok(OperationConfig {
2527
mqtt_schema: MqttSchema::with_root(topic_root),
2628
device_topic_id: device_topic_id.clone(),
29+
service_topic_id,
2730
log_dir: tedge_config.logs.path.join("agent"),
2831
config_dir: config_dir.to_owned(),
2932
state_dir: tedge_config.agent.state.path.clone().into(),

crates/core/tedge_agent/src/operation_workflows/message_box.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,59 @@ impl CommandDispatcher {
4949

5050
#[derive(Default)]
5151
pub(crate) struct SyncSignalDispatcher {
52-
senders: HashMap<OperationType, Vec<DynSender<CmdMetaSyncSignal>>>,
52+
op_handler_senders: HashMap<OperationType, DynSender<CmdMetaSyncSignal>>,
53+
op_listener_senders: HashMap<OperationType, Vec<DynSender<CmdMetaSyncSignal>>>,
5354
}
5455

5556
impl SyncSignalDispatcher {
56-
/// Register where to send sync signals for the given command type
57-
pub(crate) fn register_sync_signal_sender(
57+
/// Register sender for the operation handler to receive sync signals for that operation
58+
pub(crate) fn register_operation_handler(
5859
&mut self,
5960
operation: OperationType,
6061
sender: DynSender<CmdMetaSyncSignal>,
6162
) {
62-
self.senders.entry(operation).or_default().push(sender);
63+
self.op_handler_senders.insert(operation, sender);
6364
}
6465

65-
pub(crate) async fn send(&mut self, operation: OperationType) -> Result<(), ChannelError> {
66-
let Some(senders) = self.senders.get_mut(&operation) else {
66+
/// Register senders for operation listeners that must be notified when the given operation completes
67+
pub(crate) fn register_operation_listener(
68+
&mut self,
69+
operation: OperationType,
70+
sender: DynSender<CmdMetaSyncSignal>,
71+
) {
72+
self.op_listener_senders
73+
.entry(operation)
74+
.or_default()
75+
.push(sender);
76+
}
77+
78+
/// Send sync signal to all registered listeners for the given operation
79+
pub(crate) async fn sync_listener(
80+
&mut self,
81+
operation: OperationType,
82+
) -> Result<(), ChannelError> {
83+
let Some(senders) = self.op_listener_senders.get_mut(&operation) else {
6784
return Ok(());
6885
};
6986
for sender in senders {
7087
sender.send(()).await?;
7188
}
7289
Ok(())
7390
}
91+
92+
/// Send sync signal to the operation handler actor registered for the given operation
93+
pub(crate) async fn sync(&mut self, operation: OperationType) -> Result<(), ChannelError> {
94+
if let Some(sender) = self.op_handler_senders.get_mut(&operation) {
95+
sender.send(()).await?;
96+
};
97+
Ok(())
98+
}
99+
100+
/// Send sync signal to all registered operation handler actors
101+
pub(crate) async fn sync_all(&mut self) -> Result<(), ChannelError> {
102+
for sender in self.op_handler_senders.values_mut() {
103+
sender.send(()).await?;
104+
}
105+
Ok(())
106+
}
74107
}

crates/core/tedge_agent/src/operation_workflows/tests.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,16 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result<TestHan
361361

362362
let tmp_dir = tempfile::TempDir::new().unwrap();
363363
let tmp_path = Utf8Path::from_path(tmp_dir.path()).unwrap();
364+
let device_topic_id = device_topic_id
365+
.parse::<EntityTopicId>()
366+
.expect("Invalid topic id");
367+
let service_topic_id = device_topic_id
368+
.default_service_for_device("tedge-agent")
369+
.expect("Invalid service topic id");
364370
let config = OperationConfig {
365371
mqtt_schema: MqttSchema::new(),
366-
device_topic_id: device_topic_id.parse().expect("Invalid topic id"),
372+
device_topic_id,
373+
service_topic_id,
367374
log_dir: tmp_path.into(),
368375
config_dir: tmp_path.into(),
369376
state_dir: tmp_path.join("running-operations"),

crates/core/tedge_api/src/mqtt_topics.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@ impl OperationType {
798798
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
799799
pub enum SignalType {
800800
Sync,
801+
SyncOperation(OperationType),
801802
Custom(String),
802803
}
803804

@@ -833,6 +834,14 @@ impl<'a> From<&'a str> for SignalType {
833834
fn from(s: &'a str) -> SignalType {
834835
match s {
835836
"sync" => SignalType::Sync,
837+
"sync_restart" => SignalType::SyncOperation(OperationType::Restart),
838+
"sync_firmware_update" => SignalType::SyncOperation(OperationType::FirmwareUpdate),
839+
"sync_software_update" => SignalType::SyncOperation(OperationType::SoftwareUpdate),
840+
"sync_software_list" => SignalType::SyncOperation(OperationType::SoftwareList),
841+
"sync_config_update" => SignalType::SyncOperation(OperationType::ConfigUpdate),
842+
"sync_config_snapshot" => SignalType::SyncOperation(OperationType::ConfigSnapshot),
843+
"sync_log_upload" => SignalType::SyncOperation(OperationType::LogUpload),
844+
"sync_device_profile" => SignalType::SyncOperation(OperationType::DeviceProfile),
836845
custom => SignalType::Custom(custom.to_string()),
837846
}
838847
}
@@ -842,6 +851,7 @@ impl Display for SignalType {
842851
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
843852
match self {
844853
SignalType::Sync => write!(f, "sync"),
854+
SignalType::SyncOperation(operation) => write!(f, "sync_{}", operation),
845855
SignalType::Custom(custom) => write!(f, "{custom}"),
846856
}
847857
}

crates/extensions/c8y_mapper_ext/src/signals.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl CumulocityConverter {
2626
}
2727
}
2828
}
29-
SignalType::Custom(_) => {}
29+
SignalType::SyncOperation(_) | SignalType::Custom(_) => {}
3030
}
3131

3232
Ok(messages)

0 commit comments

Comments
 (0)