Skip to content
Merged
1 change: 1 addition & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ jobs:
kubernetes_logs source
logstash source
mongodb_metrics source
mqtt source
nats source
new source
nginx_metrics source
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `mqtt` source config field `topic` can now be a list of mqtt topic strings instead of just a string. If a list is provided, the `mqtt` source client will subscribe to all the topics.

authors: december1981
10 changes: 5 additions & 5 deletions src/sources/mqtt/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
TlsSnafu,
},
config::{SourceConfig, SourceContext, SourceOutput},
serde::{default_decoding, default_framing_message_based},
serde::{OneOrMany, default_decoding, default_framing_message_based},
};

use super::source::MqttSource;
Expand All @@ -34,11 +34,11 @@ pub struct MqttSourceConfig {
#[serde(flatten)]
pub common: MqttCommonConfig,

/// MQTT topic from which messages are to be read.
/// MQTT topic or topics from which messages are to be read.
#[configurable(derived)]
#[serde(default = "default_topic")]
#[derivative(Default(value = "default_topic()"))]
pub topic: String,
pub topic: OneOrMany<String>,

#[configurable(derived)]
#[serde(default = "default_framing_message_based")]
Expand All @@ -65,8 +65,8 @@ pub struct MqttSourceConfig {
pub topic_key: OptionalValuePath,
}

fn default_topic() -> String {
"vector".to_owned()
fn default_topic() -> OneOrMany<String> {
OneOrMany::One("vector".into())
}

fn default_topic_key() -> OptionalValuePath {
Expand Down
3 changes: 2 additions & 1 deletion src/sources/mqtt/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![cfg(test)]

use crate::common::mqtt::MqttCommonConfig;
use crate::serde::OneOrMany;
use crate::test_util::trace_init;
use crate::test_util::{components::SOURCE_TAGS, random_lines_with_stream, random_string};
use rumqttc::{AsyncClient, MqttOptions, QoS};
Expand Down Expand Up @@ -76,7 +77,7 @@ async fn mqtt_happy() {

let config = MqttSourceConfig {
common,
topic: topic.to_owned(),
topic: OneOrMany::One(topic.to_owned()),
..MqttSourceConfig::default()
};

Expand Down
26 changes: 21 additions & 5 deletions src/sources/mqtt/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use crate::{
event::BatchNotifier,
event::Event,
internal_events::{EndpointBytesReceived, StreamClosedError},
serde::OneOrMany,
shutdown::ShutdownSignal,
sources::mqtt::MqttSourceConfig,
sources::util,
};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter};
use vector_lib::config::LegacyKey;
use vector_lib::lookup::path;

Expand Down Expand Up @@ -42,10 +43,25 @@ impl MqttSource {
pub async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
let (client, mut connection) = self.connector.connect();

client
.subscribe(&self.config.topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
match &self.config.topic {
OneOrMany::One(topic) => {
client
.subscribe(topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
}
OneOrMany::Many(topics) => {
client
.subscribe_many(
topics
.iter()
.cloned()
.map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)),
)
.await
.map_err(|_| ())?;
}
}

loop {
tokio::select! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ generated: components: sources: mqtt: configuration: {
}
}
topic: {
description: "MQTT topic from which messages are to be read."
description: "MQTT topic or topics from which messages are to be read."
required: false
type: string: default: "vector"
}
Expand Down
Loading