Skip to content
Merged
1 change: 1 addition & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ jobs:
kubernetes_logs source
logstash source
mongodb_metrics source
mqtt source
nats source
new source
nginx_metrics source
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `mqtt` source config field `topic` can now be a list of mqtt topic strings instead of just a string. If a list is provided, the `mqtt` source client will subscribe to all the topics.

authors: december1981
10 changes: 5 additions & 5 deletions src/sources/mqtt/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
TlsSnafu,
},
config::{SourceConfig, SourceContext, SourceOutput},
serde::{default_decoding, default_framing_message_based},
serde::{OneOrMany, default_decoding, default_framing_message_based},
};

/// Configuration for the `mqtt` source.
Expand All @@ -32,11 +32,11 @@ pub struct MqttSourceConfig {
#[serde(flatten)]
pub common: MqttCommonConfig,

/// MQTT topic from which messages are to be read.
/// MQTT topic or topics from which messages are to be read.
#[configurable(derived)]
#[serde(default = "default_topic")]
#[derivative(Default(value = "default_topic()"))]
pub topic: String,
pub topic: OneOrMany<String>,

#[configurable(derived)]
#[serde(default = "default_framing_message_based")]
Expand All @@ -63,8 +63,8 @@ pub struct MqttSourceConfig {
pub topic_key: OptionalValuePath,
}

fn default_topic() -> String {
"vector".to_owned()
fn default_topic() -> OneOrMany<String> {
OneOrMany::One("vector".into())
}

fn default_topic_key() -> OptionalValuePath {
Expand Down
75 changes: 71 additions & 4 deletions src/sources/mqtt/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use futures::StreamExt;
use rumqttc::{AsyncClient, MqttOptions, QoS};
use tokio::time::timeout;

use super::MqttSourceConfig;
use crate::{
SourceSender,
common::mqtt::MqttCommonConfig,
config::{SourceConfig, SourceContext, log_schema},
event::Event,
serde::OneOrMany,
sources::mqtt::MqttSourceConfig,
test_util::{
components::{SOURCE_TAGS, assert_source_compliance},
random_lines_with_stream, random_string, trace_init,
Expand Down Expand Up @@ -59,13 +60,13 @@ async fn get_mqtt_client() -> AsyncClient {
}

#[tokio::test]
async fn mqtt_happy() {
async fn mqtt_one_topic_happy() {
trace_init();
let topic = "source-test";
// We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs.
let client_id = format!("sourceTest{}", random_string(6));
let num_events = 10;
let (input, _events) = random_lines_with_stream(100, num_events, None);
let (input, ..) = random_lines_with_stream(100, num_events, None);

assert_source_compliance(&SOURCE_TAGS, async {
let common = MqttCommonConfig {
Expand All @@ -77,7 +78,7 @@ async fn mqtt_happy() {

let config = MqttSourceConfig {
common,
topic: topic.to_owned(),
topic: OneOrMany::One(topic.to_owned()),
..MqttSourceConfig::default()
};

Expand Down Expand Up @@ -116,3 +117,69 @@ async fn mqtt_happy() {
})
.await;
}

#[tokio::test]
async fn mqtt_many_topics_happy() {
trace_init();
let topic_prefix_1 = "source-prefix-1";
let topic_prefix_2 = "source-prefix-2";
// We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs.
let client_id = format!("sourceTest{}", random_string(6));
let num_events = 10;
let (input_1, ..) = random_lines_with_stream(100, num_events, None);
let (input_2, ..) = random_lines_with_stream(100, num_events, None);

assert_source_compliance(&SOURCE_TAGS, async {
let common = MqttCommonConfig {
host: mqtt_broker_address(),
port: mqtt_broker_port(),
client_id: Some(client_id),
..Default::default()
};

let config = MqttSourceConfig {
common,
topic: OneOrMany::Many(vec![
format!("{topic_prefix_1}/#"),
format!("{topic_prefix_2}/#"),
]),
..MqttSourceConfig::default()
};

let (tx, rx) = SourceSender::new_test();
tokio::spawn(async move {
config
.build(SourceContext::new_test(tx, None))
.await
.unwrap()
.await
.unwrap()
});

tokio::time::sleep(Duration::from_millis(100)).await;

let client = get_mqtt_client().await;
send_test_events(&client, &format!("{topic_prefix_1}/test"), &input_1).await;
send_test_events(&client, &format!("{topic_prefix_2}/test"), &input_2).await;

let mut expected_messages: HashSet<_> =
input_1.into_iter().chain(input_2.into_iter()).collect();

let events: Vec<Event> = timeout(Duration::from_secs(2), rx.take(num_events * 2).collect())
.await
.unwrap();

for event in events {
let message = event
.as_log()
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy();
if !expected_messages.remove(message.as_ref()) {
panic!("Received unexpected message: {message:?}");
}
}
assert!(expected_messages.is_empty());
})
.await;
}
26 changes: 21 additions & 5 deletions src/sources/mqtt/source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use itertools::Itertools;
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter};
use vector_lib::{
config::{LegacyKey, LogNamespace},
internal_event::EventsReceived,
Expand All @@ -12,6 +12,7 @@ use crate::{
common::mqtt::MqttConnector,
event::{BatchNotifier, Event},
internal_events::{EndpointBytesReceived, StreamClosedError},
serde::OneOrMany,
shutdown::ShutdownSignal,
sources::{mqtt::MqttSourceConfig, util},
};
Expand Down Expand Up @@ -41,10 +42,25 @@ impl MqttSource {
pub async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
let (client, mut connection) = self.connector.connect();

client
.subscribe(&self.config.topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
match &self.config.topic {
OneOrMany::One(topic) => {
client
.subscribe(topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
}
OneOrMany::Many(topics) => {
client
.subscribe_many(
topics
.iter()
.cloned()
.map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)),
)
.await
.map_err(|_| ())?;
}
}

loop {
tokio::select! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ generated: components: sources: mqtt: configuration: {
}
}
topic: {
description: "MQTT topic from which messages are to be read."
description: "MQTT topic or topics from which messages are to be read."
required: false
type: string: default: "vector"
}
Expand Down
Loading