Skip to content

Commit 90cf7d0

Browse files
feat(mqtt source): support multiple mqtt source topics (#23670)
* enhancement(file source): support multiple mqtt source topics * chore: fix changelog.d file format * Remove mut + mem::take * Add mqtt source to semantic.yml * Improve changelog and rename file * Fix clippy * Fix type in int test * make generate-component-docs * add integration test for many mqtt source topics * Fix imports from merge --------- Co-authored-by: Thomas <[email protected]>
1 parent 2af657d commit 90cf7d0

File tree

6 files changed

+102
-15
lines changed

6 files changed

+102
-15
lines changed

.github/workflows/semantic.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ jobs:
159159
kubernetes_logs source
160160
logstash source
161161
mongodb_metrics source
162+
mqtt source
162163
nats source
163164
new source
164165
nginx_metrics source
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `mqtt` source config field `topic` can now be a list of mqtt topic strings instead of just a string. If a list is provided, the `mqtt` source client will subscribe to all the topics.
2+
3+
authors: december1981

src/sources/mqtt/config.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
TlsSnafu,
2121
},
2222
config::{SourceConfig, SourceContext, SourceOutput},
23-
serde::{default_decoding, default_framing_message_based},
23+
serde::{OneOrMany, default_decoding, default_framing_message_based},
2424
};
2525

2626
/// Configuration for the `mqtt` source.
@@ -32,11 +32,11 @@ pub struct MqttSourceConfig {
3232
#[serde(flatten)]
3333
pub common: MqttCommonConfig,
3434

35-
/// MQTT topic from which messages are to be read.
35+
/// MQTT topic or topics from which messages are to be read.
3636
#[configurable(derived)]
3737
#[serde(default = "default_topic")]
3838
#[derivative(Default(value = "default_topic()"))]
39-
pub topic: String,
39+
pub topic: OneOrMany<String>,
4040

4141
#[configurable(derived)]
4242
#[serde(default = "default_framing_message_based")]
@@ -63,8 +63,8 @@ pub struct MqttSourceConfig {
6363
pub topic_key: OptionalValuePath,
6464
}
6565

66-
fn default_topic() -> String {
67-
"vector".to_owned()
66+
fn default_topic() -> OneOrMany<String> {
67+
OneOrMany::One("vector".into())
6868
}
6969

7070
fn default_topic_key() -> OptionalValuePath {

src/sources/mqtt/integration_tests.rs

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ use futures::StreamExt;
77
use rumqttc::{AsyncClient, MqttOptions, QoS};
88
use tokio::time::timeout;
99

10-
use super::MqttSourceConfig;
1110
use crate::{
1211
SourceSender,
1312
common::mqtt::MqttCommonConfig,
1413
config::{SourceConfig, SourceContext, log_schema},
1514
event::Event,
15+
serde::OneOrMany,
16+
sources::mqtt::MqttSourceConfig,
1617
test_util::{
1718
components::{SOURCE_TAGS, assert_source_compliance},
1819
random_lines_with_stream, random_string, trace_init,
@@ -59,13 +60,13 @@ async fn get_mqtt_client() -> AsyncClient {
5960
}
6061

6162
#[tokio::test]
62-
async fn mqtt_happy() {
63+
async fn mqtt_one_topic_happy() {
6364
trace_init();
6465
let topic = "source-test";
6566
// We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs.
6667
let client_id = format!("sourceTest{}", random_string(6));
6768
let num_events = 10;
68-
let (input, _events) = random_lines_with_stream(100, num_events, None);
69+
let (input, ..) = random_lines_with_stream(100, num_events, None);
6970

7071
assert_source_compliance(&SOURCE_TAGS, async {
7172
let common = MqttCommonConfig {
@@ -77,7 +78,7 @@ async fn mqtt_happy() {
7778

7879
let config = MqttSourceConfig {
7980
common,
80-
topic: topic.to_owned(),
81+
topic: OneOrMany::One(topic.to_owned()),
8182
..MqttSourceConfig::default()
8283
};
8384

@@ -116,3 +117,69 @@ async fn mqtt_happy() {
116117
})
117118
.await;
118119
}
120+
121+
#[tokio::test]
122+
async fn mqtt_many_topics_happy() {
123+
trace_init();
124+
let topic_prefix_1 = "source-prefix-1";
125+
let topic_prefix_2 = "source-prefix-2";
126+
// We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs.
127+
let client_id = format!("sourceTest{}", random_string(6));
128+
let num_events = 10;
129+
let (input_1, ..) = random_lines_with_stream(100, num_events, None);
130+
let (input_2, ..) = random_lines_with_stream(100, num_events, None);
131+
132+
assert_source_compliance(&SOURCE_TAGS, async {
133+
let common = MqttCommonConfig {
134+
host: mqtt_broker_address(),
135+
port: mqtt_broker_port(),
136+
client_id: Some(client_id),
137+
..Default::default()
138+
};
139+
140+
let config = MqttSourceConfig {
141+
common,
142+
topic: OneOrMany::Many(vec![
143+
format!("{topic_prefix_1}/#"),
144+
format!("{topic_prefix_2}/#"),
145+
]),
146+
..MqttSourceConfig::default()
147+
};
148+
149+
let (tx, rx) = SourceSender::new_test();
150+
tokio::spawn(async move {
151+
config
152+
.build(SourceContext::new_test(tx, None))
153+
.await
154+
.unwrap()
155+
.await
156+
.unwrap()
157+
});
158+
159+
tokio::time::sleep(Duration::from_millis(100)).await;
160+
161+
let client = get_mqtt_client().await;
162+
send_test_events(&client, &format!("{topic_prefix_1}/test"), &input_1).await;
163+
send_test_events(&client, &format!("{topic_prefix_2}/test"), &input_2).await;
164+
165+
let mut expected_messages: HashSet<_> =
166+
input_1.into_iter().chain(input_2.into_iter()).collect();
167+
168+
let events: Vec<Event> = timeout(Duration::from_secs(2), rx.take(num_events * 2).collect())
169+
.await
170+
.unwrap();
171+
172+
for event in events {
173+
let message = event
174+
.as_log()
175+
.get(log_schema().message_key_target_path().unwrap())
176+
.unwrap()
177+
.to_string_lossy();
178+
if !expected_messages.remove(message.as_ref()) {
179+
panic!("Received unexpected message: {message:?}");
180+
}
181+
}
182+
assert!(expected_messages.is_empty());
183+
})
184+
.await;
185+
}

src/sources/mqtt/source.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use itertools::Itertools;
2-
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS};
2+
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter};
33
use vector_lib::{
44
config::{LegacyKey, LogNamespace},
55
internal_event::EventsReceived,
@@ -12,6 +12,7 @@ use crate::{
1212
common::mqtt::MqttConnector,
1313
event::{BatchNotifier, Event},
1414
internal_events::{EndpointBytesReceived, StreamClosedError},
15+
serde::OneOrMany,
1516
shutdown::ShutdownSignal,
1617
sources::{mqtt::MqttSourceConfig, util},
1718
};
@@ -41,10 +42,25 @@ impl MqttSource {
4142
pub async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
4243
let (client, mut connection) = self.connector.connect();
4344

44-
client
45-
.subscribe(&self.config.topic, QoS::AtLeastOnce)
46-
.await
47-
.map_err(|_| ())?;
45+
match &self.config.topic {
46+
OneOrMany::One(topic) => {
47+
client
48+
.subscribe(topic, QoS::AtLeastOnce)
49+
.await
50+
.map_err(|_| ())?;
51+
}
52+
OneOrMany::Many(topics) => {
53+
client
54+
.subscribe_many(
55+
topics
56+
.iter()
57+
.cloned()
58+
.map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)),
59+
)
60+
.await
61+
.map_err(|_| ())?;
62+
}
63+
}
4864

4965
loop {
5066
tokio::select! {

website/cue/reference/components/sources/generated/mqtt.cue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ generated: components: sources: mqtt: configuration: {
639639
}
640640
}
641641
topic: {
642-
description: "MQTT topic from which messages are to be read."
642+
description: "MQTT topic or topics from which messages are to be read."
643643
required: false
644644
type: string: default: "vector"
645645
}

0 commit comments

Comments
 (0)