Skip to content

Commit d336550

Browse files
Merge pull request #3926 from didier-wenzek/feat/flow-enabled-aws-mapper
feat: AWS mapper is configurable using flows
2 parents c976ee7 + ceeb6f5 commit d336550

File tree

14 files changed

+944
-970
lines changed

14 files changed

+944
-970
lines changed

Cargo.lock

Lines changed: 4 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/tedge_mapper/src/aws/mapper.rs

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
11
use crate::core::component::TEdgeComponent;
22
use crate::core::mapper::start_basic_actors;
33
use crate::core::mqtt::configure_proxy;
4+
use crate::core::mqtt::flows_status_topic;
45
use anyhow::Context;
56
use async_trait::async_trait;
6-
use aws_mapper_ext::converter::AwsConverter;
7-
use clock::WallClock;
8-
use mqtt_channel::TopicFilter;
9-
use tedge_actors::ConvertingActor;
10-
use tedge_actors::MessageSink;
11-
use tedge_actors::MessageSource;
12-
use tedge_actors::NoConfig;
7+
use aws_mapper_ext::AwsConverter;
8+
use tedge_api::mqtt_topics::EntityTopicId;
139
use tedge_api::mqtt_topics::MqttSchema;
1410
use tedge_api::service_health_topic;
1511
use tedge_config::models::TopicPrefix;
16-
use tedge_config::tedge_toml::mapper_config::AwsMapperConfig;
1712
use tedge_config::tedge_toml::mapper_config::AwsMapperSpecificConfig;
1813
use tedge_config::tedge_toml::ProfileName;
1914
use tedge_config::TEdgeConfig;
15+
use tedge_file_system_ext::FsWatchActorBuilder;
16+
use tedge_flows::FlowsMapperBuilder;
2017
use tedge_mqtt_bridge::rumqttc::Transport;
2118
use tedge_mqtt_bridge::BridgeConfig;
2219
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
20+
use tedge_watch_ext::WatchActorBuilder;
2321
use tracing::warn;
2422
use yansi::Paint;
2523

@@ -32,15 +30,16 @@ impl TEdgeComponent for AwsMapper {
3230
async fn start(
3331
&self,
3432
tedge_config: TEdgeConfig,
35-
_config_dir: &tedge_config::Path,
33+
config_dir: &tedge_config::Path,
3634
) -> Result<(), anyhow::Error> {
3735
let aws_config = tedge_config.mapper_config::<AwsMapperSpecificConfig>(&self.profile)?;
3836
let prefix = &aws_config.bridge.topic_prefix;
3937
let aws_mapper_name = format!("tedge-mapper-{prefix}");
4038
let (mut runtime, mut mqtt_actor) =
4139
start_basic_actors(&aws_mapper_name, &tedge_config).await?;
42-
4340
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
41+
let service_topic_id = EntityTopicId::default_main_service(&aws_mapper_name)?;
42+
4443
if tedge_config.mqtt.bridge.built_in {
4544
let device_id = aws_config.device.id()?;
4645
let device_topic_id = tedge_config.mqtt.device_topic_id.clone();
@@ -78,37 +77,36 @@ impl TEdgeComponent for AwsMapper {
7877
} else if tedge_config.proxy.address.or_none().is_some() {
7978
warn!("`proxy.address` is configured without the built-in bridge enabled. The bridge MQTT connection to the cloud will {} communicate via the configured proxy.", "not".bold())
8079
}
81-
let clock = Box::new(WallClock);
8280
let aws_converter = AwsConverter::new(
8381
aws_config.cloud_specific.mapper.timestamp,
84-
clock,
85-
mqtt_schema,
82+
&mqtt_schema,
8683
aws_config.cloud_specific.mapper.timestamp_format,
8784
prefix.value().clone(),
8885
aws_config.mapper.mqtt.max_payload_size.0,
86+
aws_config.topics.to_string(),
8987
);
90-
let mut aws_converting_actor = ConvertingActor::builder("AwsConverter", aws_converter);
91-
92-
aws_converting_actor.connect_source(get_topic_filter(&aws_config), &mut mqtt_actor);
93-
aws_converting_actor.connect_sink(NoConfig, &mqtt_actor);
94-
95-
runtime.spawn(aws_converting_actor).await?;
88+
let flows_dir =
89+
tedge_flows::flows_dir(config_dir, "aws", self.profile.as_ref().map(|p| p.as_ref()));
90+
let flows = aws_converter.flow_registry(flows_dir).await?;
91+
let flows_status = flows_status_topic(&mqtt_schema, &service_topic_id);
92+
93+
let mut fs_actor = FsWatchActorBuilder::new();
94+
let mut cmd_watcher_actor = WatchActorBuilder::new();
95+
96+
let mut flows_mapper = FlowsMapperBuilder::try_new(flows, flows_status).await?;
97+
flows_mapper.connect(&mut mqtt_actor);
98+
flows_mapper.connect_fs(&mut fs_actor);
99+
flows_mapper.connect_cmd(&mut cmd_watcher_actor);
100+
101+
runtime.spawn(flows_mapper).await?;
102+
runtime.spawn(fs_actor).await?;
103+
runtime.spawn(cmd_watcher_actor).await?;
96104
runtime.spawn(mqtt_actor).await?;
97105
runtime.run_to_completion().await?;
98106
Ok(())
99107
}
100108
}
101109

102-
fn get_topic_filter(aws_config: &AwsMapperConfig) -> TopicFilter {
103-
let mut topics = TopicFilter::empty();
104-
for topic in aws_config.topics.0.clone() {
105-
if topics.try_add(&topic).is_err() {
106-
warn!("The configured topic '{topic}' is invalid and ignored.");
107-
}
108-
}
109-
topics
110-
}
111-
112110
fn built_in_bridge_rules(
113111
remote_client_id: &str,
114112
topic_prefix: &TopicPrefix,

crates/core/tedge_mapper/src/az/mapper.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::az::SkipMosquittoHealthStatus;
21
use crate::core::component::TEdgeComponent;
32
use crate::core::mapper::start_basic_actors;
43
use crate::core::mqtt::configure_proxy;
@@ -100,7 +99,6 @@ impl TEdgeComponent for AzureMapper {
10099
let flows_dir =
101100
tedge_flows::flows_dir(config_dir, "az", self.profile.as_ref().map(|p| p.as_ref()));
102101
let mut flows = ConnectedFlowRegistry::new(flows_dir);
103-
flows.register_builtin(SkipMosquittoHealthStatus);
104102
flows
105103
.persist_builtin_flow("mea", az_converter.builtin_flow().as_str())
106104
.await?;
Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1 @@
1-
use std::time::SystemTime;
2-
use tedge_flows::ConfigError;
3-
use tedge_flows::FlowError;
4-
use tedge_flows::JsonValue;
5-
use tedge_flows::Message;
6-
use tedge_flows::Transformer;
7-
81
pub mod mapper;
9-
10-
#[derive(Clone, Default)]
11-
struct SkipMosquittoHealthStatus;
12-
13-
impl Transformer for SkipMosquittoHealthStatus {
14-
fn name(&self) -> &str {
15-
"skip-mosquitto-health-status"
16-
}
17-
18-
fn set_config(&mut self, _config: JsonValue) -> Result<(), ConfigError> {
19-
Ok(())
20-
}
21-
22-
fn on_message(
23-
&self,
24-
_timestamp: SystemTime,
25-
message: &Message,
26-
) -> Result<Vec<Message>, FlowError> {
27-
// don't convert mosquitto bridge notification topic
28-
// https://github.com/thin-edge/thin-edge.io/issues/2236
29-
if let [_, _, _, _, _, "status", "health"] =
30-
message.topic.split('/').collect::<Vec<_>>()[..]
31-
{
32-
if message
33-
.payload_str()
34-
.map(|s| s == "0" || s == "1")
35-
.unwrap_or(false)
36-
{
37-
return Ok(vec![]);
38-
}
39-
}
40-
Ok(vec![message.clone()])
41-
}
42-
}

crates/extensions/aws_mapper_ext/Cargo.toml

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,21 @@ homepage = { workspace = true }
1010
repository = { workspace = true }
1111

1212
[dependencies]
13-
clock = { workspace = true }
14-
log = { workspace = true }
15-
serde = { workspace = true }
16-
serde_json = { workspace = true }
17-
tedge_actors = { workspace = true }
13+
camino = { workspace = true }
1814
tedge_api = { workspace = true }
1915
tedge_config = { workspace = true }
16+
tedge_flows = { workspace = true }
2017
tedge_mqtt_ext = { workspace = true }
2118
tedge_utils = { workspace = true }
22-
thiserror = { workspace = true }
23-
time = { workspace = true }
2419

2520
[dev-dependencies]
2621
assert-json-diff = { workspace = true }
2722
assert_matches = { workspace = true }
2823
serde = { workspace = true }
2924
serde_json = { workspace = true }
25+
tempfile = { workspace = true }
3026
time = { workspace = true, features = ["macros"] }
27+
tokio = { workspace = true, features = ["test-util"] }
3128

3229
[lints]
3330
workspace = true

0 commit comments

Comments
 (0)