Skip to content

Commit 00000fe

Browse files
authored
Merge pull request #3817 from albinsuresh/feat/update-supported-log-list-on-software-update
feat: publish supported log types when software_update or config_update finishes
2 parents a8fd42e + 5478d39 commit 00000fe

File tree

11 files changed

+378
-46
lines changed

11 files changed

+378
-46
lines changed

crates/core/tedge_agent/src/agent.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ impl Agent {
384384
)
385385
.await?;
386386
converter_actor_builder.register_builtin_operation(&mut log_actor);
387+
converter_actor_builder.register_sync_signal_sink(&log_actor);
387388
Some(log_actor)
388389
} else {
389390
None

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::operation_workflows::message_box::CommandDispatcher;
2+
use crate::operation_workflows::message_box::SyncSignalDispatcher;
23
use crate::operation_workflows::persist::WorkflowRepository;
34
use crate::state_repository::state::AgentStateRepository;
45
use async_trait::async_trait;
@@ -53,6 +54,7 @@ pub struct WorkflowActor {
5354
pub(crate) log_dir: Utf8PathBuf,
5455
pub(crate) input_receiver: UnboundedLoggingReceiver<AgentInput>,
5556
pub(crate) builtin_command_dispatcher: CommandDispatcher,
57+
pub(crate) sync_signal_dispatcher: SyncSignalDispatcher,
5658
pub(crate) command_sender: DynSender<InternalCommandState>,
5759
pub(crate) mqtt_publisher: LoggingSender<MqttMessage>,
5860
pub(crate) script_runner: ClientMessageBox<Execute, std::io::Result<Output>>,
@@ -428,7 +430,10 @@ impl WorkflowActor {
428430
new_state: GenericCommandState,
429431
) -> Result<(), RuntimeError> {
430432
if new_state.is_finished() {
431-
self.finalize_builtin_command_update(new_state).await
433+
self.sync_dependent_actors(&new_state).await?;
434+
self.finalize_builtin_command_update(new_state).await?;
435+
436+
Ok(())
432437
} else {
433438
// As not finalized, the builtin state is sent back
434439
// to the builtin operation actor for further processing.
@@ -458,6 +463,18 @@ impl WorkflowActor {
458463
self.process_command_update(adapted_state).await
459464
}
460465

466+
async fn sync_dependent_actors(
467+
&mut self,
468+
command: &GenericCommandState,
469+
) -> Result<(), RuntimeError> {
470+
if let Some(command) = command.operation() {
471+
self.sync_signal_dispatcher
472+
.send(command.as_str().into())
473+
.await?;
474+
}
475+
Ok(())
476+
}
477+
461478
fn open_command_log(
462479
&mut self,
463480
state: &GenericCommandState,

crates/core/tedge_agent/src/operation_workflows/builder.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::operation_workflows::actor::InternalCommandState;
33
use crate::operation_workflows::actor::WorkflowActor;
44
use crate::operation_workflows::config::OperationConfig;
55
use crate::operation_workflows::message_box::CommandDispatcher;
6+
use crate::operation_workflows::message_box::SyncSignalDispatcher;
67
use crate::operation_workflows::persist::WorkflowRepository;
78
use crate::state_repository::state::agent_state_dir;
89
use crate::state_repository::state::AgentStateRepository;
@@ -22,13 +23,15 @@ use tedge_actors::RuntimeRequest;
2223
use tedge_actors::RuntimeRequestSink;
2324
use tedge_actors::Service;
2425
use tedge_actors::UnboundedLoggingReceiver;
26+
use tedge_api::commands::CmdMetaSyncSignal;
2527
use tedge_api::mqtt_topics::ChannelFilter::AnyCommand;
2628
use tedge_api::mqtt_topics::EntityFilter;
2729
use tedge_api::mqtt_topics::EntityTopicId;
2830
use tedge_api::mqtt_topics::MqttSchema;
2931
use tedge_api::workflow::GenericCommandData;
3032
use tedge_api::workflow::GenericCommandState;
3133
use tedge_api::workflow::OperationName;
34+
use tedge_api::workflow::SyncOnCommand;
3235
use tedge_file_system_ext::FsWatchEvent;
3336
use tedge_mqtt_ext::MqttMessage;
3437
use tedge_mqtt_ext::TopicFilter;
@@ -39,6 +42,7 @@ pub struct WorkflowActorBuilder {
3942
input_sender: DynSender<AgentInput>,
4043
input_receiver: UnboundedLoggingReceiver<AgentInput>,
4144
command_dispatcher: CommandDispatcher,
45+
sync_signal_dispatcher: SyncSignalDispatcher,
4246
command_sender: DynSender<InternalCommandState>,
4347
mqtt_publisher: LoggingSender<MqttMessage>,
4448
script_runner: ClientMessageBox<Execute, std::io::Result<Output>>,
@@ -65,6 +69,8 @@ impl WorkflowActorBuilder {
6569
let command_dispatcher = CommandDispatcher::default();
6670
let command_sender = input_sender.sender_clone();
6771

72+
let sync_signal_dispatcher = SyncSignalDispatcher::default();
73+
6874
let mqtt_publisher = mqtt_actor.get_sender();
6975
mqtt_actor.connect_sink(
7076
Self::subscriptions(&config.mqtt_schema, &config.device_topic_id),
@@ -81,6 +87,7 @@ impl WorkflowActorBuilder {
8187
input_sender,
8288
input_receiver,
8389
command_dispatcher,
90+
sync_signal_dispatcher,
8491
command_sender,
8592
mqtt_publisher,
8693
signal_sender,
@@ -98,7 +105,19 @@ impl WorkflowActorBuilder {
98105
actor.connect_sink(NoConfig, &self.input_sender);
99106
for (operation, sender) in actor.into_iter() {
100107
self.command_dispatcher
101-
.register_operation_handler(operation, sender)
108+
.register_operation_handler(operation, sender);
109+
}
110+
}
111+
112+
/// Register an actor to receive sync signals on completion of other commands
113+
pub fn register_sync_signal_sink<OperationActor>(&mut self, actor: &OperationActor)
114+
where
115+
OperationActor: MessageSink<CmdMetaSyncSignal> + SyncOnCommand,
116+
{
117+
let sender = actor.get_sender();
118+
for operation in actor.sync_on_commands() {
119+
self.sync_signal_dispatcher
120+
.register_sync_signal_sender(operation, sender.sender_clone());
102121
}
103122
}
104123

@@ -136,6 +155,7 @@ impl Builder<WorkflowActor> for WorkflowActorBuilder {
136155
log_dir: self.config.log_dir,
137156
input_receiver: self.input_receiver,
138157
builtin_command_dispatcher: self.command_dispatcher,
158+
sync_signal_dispatcher: self.sync_signal_dispatcher,
139159
mqtt_publisher: self.mqtt_publisher,
140160
command_sender: self.command_sender,
141161
script_runner: self.script_runner,

crates/core/tedge_agent/src/operation_workflows/message_box.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::collections::HashMap;
44
use tedge_actors::ChannelError;
55
use tedge_actors::DynSender;
66
use tedge_actors::Sender;
7+
use tedge_api::commands::CmdMetaSyncSignal;
8+
use tedge_api::mqtt_topics::OperationType;
79
use tedge_api::workflow::GenericCommandState;
810
use tedge_api::workflow::OperationName;
911

@@ -44,3 +46,29 @@ impl CommandDispatcher {
4446
self.senders.keys().cloned().collect()
4547
}
4648
}
49+
50+
#[derive(Default)]
51+
pub(crate) struct SyncSignalDispatcher {
52+
senders: HashMap<OperationType, Vec<DynSender<CmdMetaSyncSignal>>>,
53+
}
54+
55+
impl SyncSignalDispatcher {
56+
/// Register where to send sync signals for the given command type
57+
pub(crate) fn register_sync_signal_sender(
58+
&mut self,
59+
operation: OperationType,
60+
sender: DynSender<CmdMetaSyncSignal>,
61+
) {
62+
self.senders.entry(operation).or_default().push(sender);
63+
}
64+
65+
pub(crate) async fn send(&mut self, operation: OperationType) -> Result<(), ChannelError> {
66+
let Some(senders) = self.senders.get_mut(&operation) else {
67+
return Ok(());
68+
};
69+
for sender in senders {
70+
sender.send(()).await?;
71+
}
72+
Ok(())
73+
}
74+
}

crates/core/tedge_api/src/commands.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ pub trait CommandPayload {
298298
}
299299
}
300300

301+
pub type CmdMetaSyncSignal = ();
302+
301303
/// All the messages are serialized using json.
302304
pub trait Jsonify {
303305
fn from_json(json_str: &str) -> Result<Self, serde_json::Error>

crates/core/tedge_api/src/workflow/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,12 @@ pub enum IterationError {
506506
IndexOutOfBounds(usize),
507507
}
508508

509+
/// An actor builder must implement this if it to receive sync signals on completion of other commands
510+
pub trait SyncOnCommand {
511+
/// Return the list of operations for which this actor wants to receive sync signals
512+
fn sync_on_commands(&self) -> Vec<OperationType>;
513+
}
514+
509515
#[cfg(test)]
510516
mod tests {
511517
use super::GenericCommandState;

crates/extensions/tedge_log_manager/src/actor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tedge_actors::MessageReceiver;
1212
use tedge_actors::RuntimeError;
1313
use tedge_actors::Sender;
1414
use tedge_actors::SimpleMessageBox;
15+
use tedge_api::commands::CmdMetaSyncSignal;
1516
use tedge_api::commands::CommandStatus;
1617
use tedge_api::commands::LogUploadCmd;
1718
use tedge_api::commands::LogUploadCmdMetadata;
@@ -34,7 +35,7 @@ type MqttTopic = String;
3435
pub type LogUploadRequest = (MqttTopic, UploadRequest);
3536
pub type LogUploadResult = (MqttTopic, UploadResult);
3637

37-
fan_in_message_type!(LogInput[LogUploadCmd, FsWatchEvent, LogUploadResult] : Debug);
38+
fan_in_message_type!(LogInput[LogUploadCmd, CmdMetaSyncSignal, FsWatchEvent, LogUploadResult] : Debug);
3839
fan_in_message_type!(LogOutput[LogUploadCmd, LogUploadCmdMetadata] : Debug);
3940

4041
impl LogOutput {
@@ -80,6 +81,9 @@ impl Actor for LogManagerActor {
8081
LogInput::LogUploadResult((topic, result)) => {
8182
self.process_uploaded_log(&topic, result).await?;
8283
}
84+
LogInput::CmdMetaSyncSignal(_) => {
85+
self.reload_supported_log_types().await?;
86+
}
8387
}
8488
}
8589
Ok(())

crates/extensions/tedge_log_manager/src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub struct LogManagerConfig {
2727
pub plugin_config_path: PathBuf,
2828
pub logtype_reload_topic: Topic,
2929
pub logfile_request_topic: TopicFilter,
30+
pub log_metadata_sync_topics: TopicFilter,
3031
pub sudo_enabled: bool,
3132
}
3233

@@ -62,6 +63,15 @@ impl LogManagerConfig {
6263
ChannelFilter::Command(OperationType::LogUpload),
6364
);
6465

66+
let mut log_metadata_sync_topics = mqtt_schema.topics(
67+
EntityFilter::Entity(&mqtt_device_topic_id),
68+
ChannelFilter::Command(OperationType::SoftwareUpdate),
69+
);
70+
log_metadata_sync_topics.add_all(mqtt_schema.topics(
71+
EntityFilter::Entity(&mqtt_device_topic_id),
72+
ChannelFilter::Command(OperationType::ConfigUpdate),
73+
));
74+
6575
Ok(Self {
6676
mqtt_schema,
6777
config_dir,
@@ -72,6 +82,7 @@ impl LogManagerConfig {
7282
plugin_config_path,
7383
logtype_reload_topic,
7484
logfile_request_topic,
85+
log_metadata_sync_topics,
7586
sudo_enabled: true,
7687
})
7788
}

0 commit comments

Comments
 (0)