Skip to content

Commit ef07467

Browse files
committed
feat: log_upload sync signal support for agent
1 parent 7c44d81 commit ef07467

File tree

9 files changed

+111
-32
lines changed

9 files changed

+111
-32
lines changed

crates/core/tedge_agent/src/agent.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ use tedge_api::entity_store::EntityRegistrationMessage;
4747
use tedge_api::mqtt_topics::DeviceTopicId;
4848
use tedge_api::mqtt_topics::EntityTopicId;
4949
use tedge_api::mqtt_topics::MqttSchema;
50+
use tedge_api::mqtt_topics::OperationType;
5051
use tedge_api::mqtt_topics::Service;
52+
use tedge_api::mqtt_topics::ServiceTopicId;
5153
use tedge_api::path::DataDir;
5254
use tedge_api::EntityStore;
5355
use tedge_config::tedge_toml::TEdgeConfigReaderService;
@@ -90,6 +92,7 @@ pub(crate) struct AgentConfig {
9092
pub state_dir: Utf8PathBuf,
9193
pub operations_dir: Utf8PathBuf,
9294
pub mqtt_device_topic_id: EntityTopicId,
95+
pub service_topic_id: ServiceTopicId,
9396
pub mqtt_topic_root: Arc<str>,
9497
pub tedge_http_host: Arc<str>,
9598
pub service: TEdgeConfigReaderService,
@@ -116,11 +119,13 @@ impl AgentConfig {
116119
.mqtt_topic_root
117120
.unwrap_or(tedge_config.mqtt.topic_root.clone().into());
118121

119-
let mqtt_device_topic_id = cliopts
122+
let mqtt_device_topic_id: EntityTopicId = cliopts
120123
.mqtt_device_topic_id
121124
.unwrap_or(tedge_config.mqtt.device_topic_id.clone().into())
122125
.parse()
123126
.context("Could not parse the device MQTT topic")?;
127+
let service_topic_id = mqtt_device_topic_id.to_default_service_topic_id("tedge-agent")
128+
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", mqtt_device_topic_id))?;
124129

125130
let mqtt_session_name = format!("{TEDGE_AGENT}#{mqtt_topic_root}/{mqtt_device_topic_id}");
126131

@@ -157,6 +162,7 @@ impl AgentConfig {
157162
let operation_config = OperationConfig::from_tedge_config(
158163
mqtt_topic_root.to_string(),
159164
&mqtt_device_topic_id,
165+
service_topic_id.clone().into(),
160166
&tedge_config,
161167
)
162168
.await?;
@@ -213,6 +219,7 @@ impl AgentConfig {
213219
state_dir,
214220
mqtt_topic_root,
215221
mqtt_device_topic_id,
222+
service_topic_id,
216223
tedge_http_host,
217224
identity,
218225
cloud_root_certs,
@@ -307,8 +314,7 @@ impl Agent {
307314
// Health actor
308315
// TODO: take a user-configurable service topic id
309316
let device_topic_id = self.config.mqtt_device_topic_id.clone();
310-
let service_topic_id = device_topic_id.to_default_service_topic_id("tedge-agent")
311-
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", device_topic_id))?;
317+
let service_topic_id = self.config.service_topic_id.clone();
312318
let service = Service {
313319
service_topic_id: service_topic_id.clone(),
314320
device_topic_id: DeviceTopicId::new(device_topic_id.clone()),
@@ -384,7 +390,7 @@ impl Agent {
384390
)
385391
.await?;
386392
converter_actor_builder.register_builtin_operation(&mut log_actor);
387-
converter_actor_builder.register_sync_signal_sink(&log_actor);
393+
converter_actor_builder.register_sync_signal_sink(OperationType::LogUpload, &log_actor);
388394
Some(log_actor)
389395
} else {
390396
None

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl WorkflowActor {
154154
}
155155
_ => {
156156
error!("Unsupported channel: {}", channel);
157-
return Ok(());
157+
Ok(())
158158
}
159159
}
160160
}
@@ -168,13 +168,19 @@ impl WorkflowActor {
168168
SignalType::Sync => {
169169
info!("Received sync signal, requesting all builtin actors to sync");
170170
self.sync_signal_dispatcher.sync_all().await?;
171-
Ok(())
171+
}
172+
SignalType::SyncOperation(operation) => {
173+
info!(
174+
"Received sync signal for {}, requesting the corresponding actor to sync",
175+
operation
176+
);
177+
self.sync_signal_dispatcher.sync(operation).await?;
172178
}
173179
SignalType::Custom(_) => {
174180
// Custom signal types are not handled yet
175-
Ok(())
176181
}
177182
}
183+
Ok(())
178184
}
179185

180186
async fn process_command_message(
@@ -469,7 +475,7 @@ impl WorkflowActor {
469475
new_state: GenericCommandState,
470476
) -> Result<(), RuntimeError> {
471477
if new_state.is_finished() {
472-
self.sync_dependent_actors(&new_state).await?;
478+
self.sync_listener_actors(&new_state).await?;
473479
self.finalize_builtin_command_update(new_state).await?;
474480

475481
Ok(())
@@ -502,13 +508,13 @@ impl WorkflowActor {
502508
self.process_command_update(adapted_state).await
503509
}
504510

505-
async fn sync_dependent_actors(
511+
async fn sync_listener_actors(
506512
&mut self,
507513
command: &GenericCommandState,
508514
) -> Result<(), RuntimeError> {
509515
if let Some(command) = command.operation() {
510516
self.sync_signal_dispatcher
511-
.send(command.as_str().into())
517+
.sync_listener(command.as_str().into())
512518
.await?;
513519
}
514520
Ok(())

crates/core/tedge_agent/src/operation_workflows/builder.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tedge_api::mqtt_topics::ChannelFilter;
2828
use tedge_api::mqtt_topics::EntityFilter;
2929
use tedge_api::mqtt_topics::EntityTopicId;
3030
use tedge_api::mqtt_topics::MqttSchema;
31-
use tedge_api::mqtt_topics::SignalType;
31+
use tedge_api::mqtt_topics::OperationType;
3232
use tedge_api::workflow::GenericCommandData;
3333
use tedge_api::workflow::GenericCommandState;
3434
use tedge_api::workflow::OperationName;
@@ -74,7 +74,11 @@ impl WorkflowActorBuilder {
7474

7575
let mqtt_publisher = mqtt_actor.get_sender();
7676
mqtt_actor.connect_sink(
77-
Self::subscriptions(&config.mqtt_schema, &config.device_topic_id),
77+
Self::subscriptions(
78+
&config.mqtt_schema,
79+
&config.device_topic_id,
80+
&config.service_topic_id,
81+
),
7882
&input_sender,
7983
);
8084
let mqtt_publisher = LoggingSender::new("MqttPublisher".into(), mqtt_publisher);
@@ -111,25 +115,34 @@ impl WorkflowActorBuilder {
111115
}
112116

113117
/// Register an actor to receive sync signals on completion of other commands
114-
pub fn register_sync_signal_sink<OperationActor>(&mut self, actor: &OperationActor)
115-
where
118+
pub fn register_sync_signal_sink<OperationActor>(
119+
&mut self,
120+
op_type: OperationType,
121+
actor: &OperationActor,
122+
) where
116123
OperationActor: MessageSink<CmdMetaSyncSignal> + SyncOnCommand,
117124
{
118125
let sender = actor.get_sender();
126+
self.sync_signal_dispatcher
127+
.register_operation_handler(op_type, sender.sender_clone());
119128
for operation in actor.sync_on_commands() {
120129
self.sync_signal_dispatcher
121-
.register_sync_signal_sender(operation, sender.sender_clone());
130+
.register_operation_listener(operation, sender.sender_clone());
122131
}
123132
}
124133

125-
pub fn subscriptions(mqtt_schema: &MqttSchema, device_topic_id: &EntityTopicId) -> TopicFilter {
134+
pub fn subscriptions(
135+
mqtt_schema: &MqttSchema,
136+
device_topic_id: &EntityTopicId,
137+
service_topic_id: &EntityTopicId,
138+
) -> TopicFilter {
126139
let mut topics = mqtt_schema.topics(
127140
EntityFilter::Entity(device_topic_id),
128141
ChannelFilter::AnyCommand,
129142
);
130143
topics.add_all(mqtt_schema.topics(
131-
EntityFilter::Entity(device_topic_id),
132-
ChannelFilter::Signal(SignalType::Sync),
144+
EntityFilter::Entity(service_topic_id),
145+
ChannelFilter::AnySignal,
133146
));
134147
topics
135148
}

crates/core/tedge_agent/src/operation_workflows/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tedge_config::TEdgeConfig;
77
pub struct OperationConfig {
88
pub mqtt_schema: MqttSchema,
99
pub device_topic_id: EntityTopicId,
10+
pub service_topic_id: EntityTopicId,
1011
pub log_dir: Utf8PathBuf,
1112
pub config_dir: Utf8PathBuf,
1213
pub state_dir: Utf8PathBuf,
@@ -17,13 +18,15 @@ impl OperationConfig {
1718
pub async fn from_tedge_config(
1819
topic_root: String,
1920
device_topic_id: &EntityTopicId,
21+
service_topic_id: EntityTopicId,
2022
tedge_config: &TEdgeConfig,
2123
) -> Result<OperationConfig, tedge_config::TEdgeConfigError> {
2224
let config_dir = tedge_config.root_dir();
2325

2426
Ok(OperationConfig {
2527
mqtt_schema: MqttSchema::with_root(topic_root),
2628
device_topic_id: device_topic_id.clone(),
29+
service_topic_id,
2730
log_dir: tedge_config.logs.path.join("agent"),
2831
config_dir: config_dir.to_owned(),
2932
state_dir: tedge_config.agent.state.path.clone().into(),

crates/core/tedge_agent/src/operation_workflows/message_box.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,38 @@ impl CommandDispatcher {
4949

5050
#[derive(Default)]
5151
pub(crate) struct SyncSignalDispatcher {
52-
senders: HashMap<OperationType, Vec<DynSender<CmdMetaSyncSignal>>>,
52+
op_handler_senders: HashMap<OperationType, DynSender<CmdMetaSyncSignal>>,
53+
op_listener_senders: HashMap<OperationType, Vec<DynSender<CmdMetaSyncSignal>>>,
5354
}
5455

5556
impl SyncSignalDispatcher {
56-
/// Register where to send sync signals for the given command type
57-
pub(crate) fn register_sync_signal_sender(
57+
/// Register sender for the operation handler to receive sync signals for that operation
58+
pub(crate) fn register_operation_handler(
5859
&mut self,
5960
operation: OperationType,
6061
sender: DynSender<CmdMetaSyncSignal>,
6162
) {
62-
self.senders.entry(operation).or_default().push(sender);
63+
self.op_handler_senders.insert(operation, sender);
6364
}
6465

65-
pub(crate) async fn send(&mut self, operation: OperationType) -> Result<(), ChannelError> {
66-
let Some(senders) = self.senders.get_mut(&operation) else {
66+
/// Register senders for operation listeners that must be notified when the given operation completes
67+
pub(crate) fn register_operation_listener(
68+
&mut self,
69+
operation: OperationType,
70+
sender: DynSender<CmdMetaSyncSignal>,
71+
) {
72+
self.op_listener_senders
73+
.entry(operation)
74+
.or_default()
75+
.push(sender);
76+
}
77+
78+
/// Send sync signal to all registered listeners for the given operation
79+
pub(crate) async fn sync_listener(
80+
&mut self,
81+
operation: OperationType,
82+
) -> Result<(), ChannelError> {
83+
let Some(senders) = self.op_listener_senders.get_mut(&operation) else {
6784
return Ok(());
6885
};
6986
for sender in senders {
@@ -72,12 +89,18 @@ impl SyncSignalDispatcher {
7289
Ok(())
7390
}
7491

75-
/// Send sync signal to all registered actors
92+
/// Send sync signal to the operation handler actor registered for the given operation
93+
pub(crate) async fn sync(&mut self, operation: OperationType) -> Result<(), ChannelError> {
94+
if let Some(sender) = self.op_handler_senders.get_mut(&operation) {
95+
sender.send(()).await?;
96+
};
97+
Ok(())
98+
}
99+
100+
/// Send sync signal to all registered operation handler actors
76101
pub(crate) async fn sync_all(&mut self) -> Result<(), ChannelError> {
77-
for senders in self.senders.values_mut() {
78-
for sender in senders {
79-
sender.send(()).await?;
80-
}
102+
for sender in self.op_handler_senders.values_mut() {
103+
sender.send(()).await?;
81104
}
82105
Ok(())
83106
}

crates/core/tedge_agent/src/operation_workflows/tests.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,16 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result<TestHan
361361

362362
let tmp_dir = tempfile::TempDir::new().unwrap();
363363
let tmp_path = Utf8Path::from_path(tmp_dir.path()).unwrap();
364+
let device_topic_id = device_topic_id
365+
.parse::<EntityTopicId>()
366+
.expect("Invalid topic id");
367+
let service_topic_id = device_topic_id
368+
.default_service_for_device("tedge-agent")
369+
.expect("Invalid service topic id");
364370
let config = OperationConfig {
365371
mqtt_schema: MqttSchema::new(),
366-
device_topic_id: device_topic_id.parse().expect("Invalid topic id"),
372+
device_topic_id,
373+
service_topic_id,
367374
log_dir: tmp_path.into(),
368375
config_dir: tmp_path.into(),
369376
state_dir: tmp_path.join("running-operations"),

crates/core/tedge_api/src/mqtt_topics.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@ impl OperationType {
798798
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
799799
pub enum SignalType {
800800
Sync,
801+
SyncOperation(OperationType),
801802
Custom(String),
802803
}
803804

@@ -833,6 +834,14 @@ impl<'a> From<&'a str> for SignalType {
833834
fn from(s: &'a str) -> SignalType {
834835
match s {
835836
"sync" => SignalType::Sync,
837+
"sync_restart" => SignalType::SyncOperation(OperationType::Restart),
838+
"sync_firmware_update" => SignalType::SyncOperation(OperationType::FirmwareUpdate),
839+
"sync_software_update" => SignalType::SyncOperation(OperationType::SoftwareUpdate),
840+
"sync_software_list" => SignalType::SyncOperation(OperationType::SoftwareList),
841+
"sync_config_update" => SignalType::SyncOperation(OperationType::ConfigUpdate),
842+
"sync_config_snapshot" => SignalType::SyncOperation(OperationType::ConfigSnapshot),
843+
"sync_log_upload" => SignalType::SyncOperation(OperationType::LogUpload),
844+
"sync_device_profile" => SignalType::SyncOperation(OperationType::DeviceProfile),
836845
custom => SignalType::Custom(custom.to_string()),
837846
}
838847
}
@@ -842,6 +851,7 @@ impl Display for SignalType {
842851
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
843852
match self {
844853
SignalType::Sync => write!(f, "sync"),
854+
SignalType::SyncOperation(operation) => write!(f, "sync_{}", operation),
845855
SignalType::Custom(custom) => write!(f, "{custom}"),
846856
}
847857
}

crates/extensions/c8y_mapper_ext/src/signals.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl CumulocityConverter {
2626
}
2727
}
2828
}
29-
SignalType::Custom(_) => {}
29+
SignalType::SyncOperation(_) | SignalType::Custom(_) => {}
3030
}
3131

3232
Ok(messages)

tests/RobotFramework/tests/cumulocity/log/log_operation_plugins.robot

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,24 @@ Supported log types updated on sync signal
5656
Install Package Using APT haveged
5757

5858
${start_time}= Get Unix Timestamp
59-
Execute Command tedge mqtt pub te/device/main///signal/sync '{}'
59+
Execute Command tedge mqtt pub te/device/main/service/tedge-agent/signal/sync '{}'
6060

6161
Should Have MQTT Messages
6262
... topic=te/device/main///cmd/log_upload
6363
... date_from=${start_time}
6464
... message_contains=haveged::journald
6565

66+
Supported log types updated on sync log_upload signal
67+
Install Package Using APT anacron
68+
69+
${start_time}= Get Unix Timestamp
70+
Execute Command tedge mqtt pub te/device/main/service/tedge-agent/signal/sync_log_upload '{}'
71+
72+
Should Have MQTT Messages
73+
... topic=te/device/main///cmd/log_upload
74+
... date_from=${start_time}
75+
... message_contains=anacron::journald
76+
6677
Log operation journald plugin can return logs for all units
6778
Should Contain Supported Log Types all-units::journald
6879
${start_timestamp}= Get Current Date UTC -1 hours result_format=%Y-%m-%dT%H:%M:%S+0000

0 commit comments

Comments
 (0)