Skip to content
Merged
1 change: 1 addition & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ jobs:
kubernetes_logs source
logstash source
mongodb_metrics source
mqtt source
nats source
new source
nginx_metrics source
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `mqtt` source config field `topic` can now be a list of mqtt topic strings instead of just a string. If a list is provided, the `mqtt` source client will subscribe to all the topics.

authors: december1981
10 changes: 5 additions & 5 deletions src/sources/mqtt/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
TlsSnafu,
},
config::{SourceConfig, SourceContext, SourceOutput},
serde::{default_decoding, default_framing_message_based},
serde::{OneOrMany, default_decoding, default_framing_message_based},
};

use super::source::MqttSource;
Expand All @@ -34,11 +34,11 @@ pub struct MqttSourceConfig {
#[serde(flatten)]
pub common: MqttCommonConfig,

/// MQTT topic from which messages are to be read.
/// MQTT topic or topics from which messages are to be read.
#[configurable(derived)]
#[serde(default = "default_topic")]
#[derivative(Default(value = "default_topic()"))]
pub topic: String,
pub topic: OneOrMany<String>,

#[configurable(derived)]
#[serde(default = "default_framing_message_based")]
Expand All @@ -65,8 +65,8 @@ pub struct MqttSourceConfig {
pub topic_key: OptionalValuePath,
}

fn default_topic() -> String {
"vector".to_owned()
fn default_topic() -> OneOrMany<String> {
OneOrMany::One("vector".into())
}

fn default_topic_key() -> OptionalValuePath {
Expand Down
26 changes: 21 additions & 5 deletions src/sources/mqtt/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
event::BatchNotifier,
event::Event,
internal_events::{EndpointBytesReceived, StreamClosedError},
serde::OneOrMany,
shutdown::ShutdownSignal,
sources::mqtt::MqttSourceConfig,
sources::util,
};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter};
use vector_lib::config::LegacyKey;
use vector_lib::lookup::path;

Expand Down Expand Up @@ -42,10 +43,25 @@
pub async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
let (client, mut connection) = self.connector.connect();

client
.subscribe(&self.config.topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
match &self.config.topic {
OneOrMany::One(topic) => {
client
.subscribe(topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
}
OneOrMany::Many(topics) => {
client
.subscribe_many(
topics
.into_iter()

Check failure on line 57 in src/sources/mqtt/source.rs

View workflow job for this annotation

GitHub Actions / Check clippy

this `.into_iter()` call is equivalent to `.iter()` and will not consume the `Vec`
.cloned()
.map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)),
)
.await
.map_err(|_| ())?;
}
}

loop {
tokio::select! {
Expand Down
Loading