Skip to content

Commit 2491098

Browse files
authored
Merge pull request #2739 from fermyon/integrate-mqtt
[Factors] Integrate mqtt in trigger2
2 parents cd64556 + 6ee514d commit 2491098

File tree

7 files changed

+59
-14
lines changed

7 files changed

+59
-14
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-mqtt/src/host.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,7 @@ use spin_factor_outbound_networking::OutboundAllowedHosts;
66
use spin_world::v2::mqtt::{self as v2, Connection, Error, Qos};
77
use tracing::{instrument, Level};
88

9-
#[async_trait]
10-
pub trait ClientCreator: Send + Sync {
11-
fn create(
12-
&self,
13-
address: String,
14-
username: String,
15-
password: String,
16-
keep_alive_interval: Duration,
17-
) -> Result<Arc<dyn MqttClient>, Error>;
18-
}
9+
use crate::ClientCreator;
1910

2011
pub struct InstanceState {
2112
allowed_hosts: OutboundAllowedHosts,

crates/factor-outbound-mqtt/src/lib.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use spin_factors::{
1515
use spin_world::v2::mqtt::{self as v2, Error, Qos};
1616
use tokio::sync::Mutex;
1717

18-
pub use host::{ClientCreator, MqttClient};
18+
pub use host::MqttClient;
1919

2020
pub struct OutboundMqttFactor {
2121
create_client: Arc<dyn ClientCreator>,
@@ -73,6 +73,19 @@ pub struct NetworkedMqttClient {
7373
const MQTT_CHANNEL_CAP: usize = 1000;
7474

7575
impl NetworkedMqttClient {
76+
/// Create a [`ClientCreator`] that creates a [`NetworkedMqttClient`].
77+
pub fn creator() -> Arc<dyn ClientCreator> {
78+
Arc::new(|address, username, password, keep_alive_interval| {
79+
Ok(Arc::new(NetworkedMqttClient::create(
80+
address,
81+
username,
82+
password,
83+
keep_alive_interval,
84+
)?) as _)
85+
})
86+
}
87+
88+
/// Create a new [`NetworkedMqttClient`] with the given address, username, password, and keep alive interval.
7689
pub fn create(
7790
address: String,
7891
username: String,
@@ -127,3 +140,30 @@ impl MqttClient for NetworkedMqttClient {
127140
Ok(())
128141
}
129142
}
143+
144+
/// A trait for creating MQTT client.
145+
#[async_trait]
146+
pub trait ClientCreator: Send + Sync {
147+
fn create(
148+
&self,
149+
address: String,
150+
username: String,
151+
password: String,
152+
keep_alive_interval: Duration,
153+
) -> Result<Arc<dyn MqttClient>, Error>;
154+
}
155+
156+
impl<F> ClientCreator for F
157+
where
158+
F: Fn(String, String, String, Duration) -> Result<Arc<dyn MqttClient>, Error> + Send + Sync,
159+
{
160+
fn create(
161+
&self,
162+
address: String,
163+
username: String,
164+
password: String,
165+
keep_alive_interval: Duration,
166+
) -> Result<Arc<dyn MqttClient>, Error> {
167+
self(address, username, password, keep_alive_interval)
168+
}
169+
}

crates/runtime-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spin-factor-key-value-spin = { path = "../factor-key-value-spin" }
1616
spin-factor-key-value-redis = { path = "../factor-key-value-redis" }
1717
spin-factor-key-value-azure = { path = "../factor-key-value-azure" }
1818
spin-factor-outbound-http = { path = "../factor-outbound-http" }
19+
spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" }
1920
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
2021
spin-factor-outbound-redis = { path = "../factor-outbound-redis" }
2122
spin-factor-sqlite = { path = "../factor-sqlite" }

crates/runtime-config/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use anyhow::Context as _;
44
use spin_factor_key_value::runtime_config::spin::{self as key_value, MakeKeyValueStore};
55
use spin_factor_key_value::{DefaultLabelResolver as _, KeyValueFactor};
66
use spin_factor_outbound_http::OutboundHttpFactor;
7+
use spin_factor_outbound_mqtt::OutboundMqttFactor;
78
use spin_factor_outbound_networking::runtime_config::spin::SpinTlsRuntimeConfig;
89
use spin_factor_outbound_networking::OutboundNetworkingFactor;
910
use spin_factor_outbound_redis::OutboundRedisFactor;
@@ -176,6 +177,12 @@ impl FactorRuntimeConfigSource<OutboundHttpFactor> for TomlRuntimeConfigSource<'
176177
}
177178
}
178179

180+
impl FactorRuntimeConfigSource<OutboundMqttFactor> for TomlRuntimeConfigSource<'_> {
181+
fn get_runtime_config(&mut self) -> anyhow::Result<Option<()>> {
182+
Ok(None)
183+
}
184+
}
185+
179186
impl FactorRuntimeConfigSource<SqliteFactor> for TomlRuntimeConfigSource<'_> {
180187
fn get_runtime_config(&mut self) -> anyhow::Result<Option<spin_factor_sqlite::RuntimeConfig>> {
181188
self.sqlite.resolve_from_toml(self.table.as_ref())

crates/trigger2/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ spin-app = { path = "../app" }
2121
spin-common = { path = "../common" }
2222
spin-componentize = { path = "../componentize" }
2323
spin-core = { path = "../core" }
24+
spin-factor-key-value = { path = "../factor-key-value" }
2425
spin-factor-outbound-http = { path = "../factor-outbound-http" }
26+
spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" }
2527
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
28+
spin-factor-outbound-redis = { path = "../factor-outbound-redis" }
29+
spin-factor-sqlite = { path = "../factor-sqlite" }
2630
spin-factor-variables = { path = "../factor-variables" }
2731
spin-factor-wasi = { path = "../factor-wasi" }
28-
spin-factor-key-value = { path = "../factor-key-value" }
29-
spin-factor-sqlite = { path = "../factor-sqlite" }
30-
spin-factor-outbound-redis = { path = "../factor-outbound-redis" }
3132
spin-factors = { path = "../factors" }
3233
spin-factors-executor = { path = "../factors-executor" }
3334
spin-telemetry = { path = "../telemetry" }

crates/trigger2/src/factors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::path::PathBuf;
22

33
use spin_factor_key_value::KeyValueFactor;
44
use spin_factor_outbound_http::OutboundHttpFactor;
5+
use spin_factor_outbound_mqtt::{NetworkedMqttClient, OutboundMqttFactor};
56
use spin_factor_outbound_networking::OutboundNetworkingFactor;
67
use spin_factor_outbound_redis::OutboundRedisFactor;
78
use spin_factor_sqlite::SqliteFactor;
@@ -19,6 +20,7 @@ pub struct TriggerFactors {
1920
pub outbound_http: OutboundHttpFactor,
2021
pub sqlite: SqliteFactor,
2122
pub redis: OutboundRedisFactor,
23+
pub mqtt: OutboundMqttFactor,
2224
}
2325

2426
impl TriggerFactors {
@@ -36,6 +38,7 @@ impl TriggerFactors {
3638
outbound_http: OutboundHttpFactor::new(),
3739
sqlite: SqliteFactor::new(default_sqlite_label_resolver),
3840
redis: OutboundRedisFactor::new(),
41+
mqtt: OutboundMqttFactor::new(NetworkedMqttClient::creator()),
3942
}
4043
}
4144
}

0 commit comments

Comments
 (0)