Skip to content

Commit f3d0441

Browse files
authored
GH-8720: Check MQTT topics if not empty strings
Fixes #8720 Validate MQTT topics for empty strings in the channel adapters configuration Use plural names for varargs params **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent 7d4e7e9 commit f3d0441

File tree

3 files changed

+62
-32
lines changed

3 files changed

+62
-32
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,22 @@ public AbstractMqttMessageDrivenChannelAdapter(ClientManager<T, C> clientManager
9494
this.clientId = null;
9595
}
9696

97-
private static Map<String, Integer> initTopics(String[] topic) {
98-
Assert.notNull(topic, "'topics' cannot be null");
99-
Assert.noNullElements(topic, "'topics' cannot have null elements");
97+
private static Map<String, Integer> initTopics(String[] topics) {
98+
validateTopics(topics);
10099

101-
return Arrays.stream(topic)
100+
return Arrays.stream(topics)
102101
.collect(Collectors.toMap(Function.identity(), (key) -> 1, (x, y) -> y, LinkedHashMap::new));
103102
}
104103

104+
private static void validateTopics(String[] topics) {
105+
Assert.notNull(topics, "'topics' cannot be null");
106+
Assert.noNullElements(topics, "'topics' cannot have null elements");
107+
108+
for (String topic : topics) {
109+
Assert.hasText(topic, "The topic to subscribe cannot be empty string");
110+
}
111+
}
112+
105113
public void setConverter(MqttMessageConverter converter) {
106114
Assert.notNull(converter, "'converter' cannot be null");
107115
this.converter = converter;
@@ -178,7 +186,7 @@ public String[] getTopic() {
178186

179187
/**
180188
* Set the completion timeout when disconnecting.
181-
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
189+
* Default {@value ClientManager#DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
182190
* @param completionTimeout The timeout.
183191
* @since 5.1.10
184192
*/
@@ -256,6 +264,7 @@ protected long getCompletionTimeout() {
256264
*/
257265
@ManagedOperation
258266
public void addTopic(String topic, int qos) {
267+
validateTopics(new String[] {topic});
259268
this.topicLock.lock();
260269
try {
261270
if (this.topics.containsKey(topic)) {
@@ -271,16 +280,16 @@ public void addTopic(String topic, int qos) {
271280

272281
/**
273282
* Add a topic (or topics) to the subscribed list (qos=1).
274-
* @param topic The topics.
275-
* @throws MessagingException if the topic is already in the list.
283+
* @param topics The topics.
284+
* @throws MessagingException if the topics is already in the list.
276285
* @since 4.1
277286
*/
278287
@ManagedOperation
279-
public void addTopic(String... topic) {
280-
Assert.notNull(topic, "'topic' cannot be null");
288+
public void addTopic(String... topics) {
289+
validateTopics(topics);
281290
this.topicLock.lock();
282291
try {
283-
for (String t : topic) {
292+
for (String t : topics) {
284293
addTopic(t, 1);
285294
}
286295
}
@@ -291,25 +300,24 @@ public void addTopic(String... topic) {
291300

292301
/**
293302
* Add topics to the subscribed list.
294-
* @param topic The topics.
303+
* @param topics The topics.
295304
* @param qos The qos for each topic.
296-
* @throws MessagingException if a topic is already in the list.
305+
* @throws MessagingException if a topics is already in the list.
297306
* @since 4.1
298307
*/
299308
@ManagedOperation
300-
public void addTopics(String[] topic, int[] qos) {
301-
Assert.notNull(topic, "'topic' cannot be null.");
302-
Assert.noNullElements(topic, "'topic' cannot contain any null elements.");
303-
Assert.isTrue(topic.length == qos.length, "topic and qos arrays must the be the same length.");
309+
public void addTopics(String[] topics, int[] qos) {
310+
validateTopics(topics);
311+
Assert.isTrue(topics.length == qos.length, "topics and qos arrays must the be the same length.");
304312
this.topicLock.lock();
305313
try {
306-
for (String newTopic : topic) {
314+
for (String newTopic : topics) {
307315
if (this.topics.containsKey(newTopic)) {
308316
throw new MessagingException("Topic '" + newTopic + "' is already subscribed.");
309317
}
310318
}
311-
for (int i = 0; i < topic.length; i++) {
312-
addTopic(topic[i], qos[i]);
319+
for (int i = 0; i < topics.length; i++) {
320+
addTopic(topics[i], qos[i]);
313321
}
314322
}
315323
finally {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.locks.Lock;
2121
import java.util.concurrent.locks.ReentrantLock;
2222

23+
import org.springframework.beans.factory.BeanFactory;
2324
import org.springframework.beans.factory.BeanFactoryAware;
2425
import org.springframework.context.ApplicationEventPublisher;
2526
import org.springframework.context.ApplicationEventPublisherAware;
@@ -126,6 +127,7 @@ protected ApplicationEventPublisher getApplicationEventPublisher() {
126127
* @param defaultTopic the default topic.
127128
*/
128129
public void setDefaultTopic(String defaultTopic) {
130+
Assert.hasText(defaultTopic, "'defaultTopic' must not be empty");
129131
this.defaultTopic = defaultTopic;
130132
}
131133

@@ -320,14 +322,17 @@ protected ClientManager<T, C> getClientManager() {
320322
@Override
321323
protected void onInit() {
322324
super.onInit();
323-
if (this.topicProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
324-
((BeanFactoryAware) this.topicProcessor).setBeanFactory(getBeanFactory());
325-
}
326-
if (this.qosProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
327-
((BeanFactoryAware) this.qosProcessor).setBeanFactory(getBeanFactory());
328-
}
329-
if (this.retainedProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
330-
((BeanFactoryAware) this.retainedProcessor).setBeanFactory(getBeanFactory());
325+
BeanFactory beanFactory = getBeanFactory();
326+
if (beanFactory != null) {
327+
if (this.topicProcessor instanceof BeanFactoryAware beanFactoryAware) {
328+
beanFactoryAware.setBeanFactory(beanFactory);
329+
}
330+
if (this.qosProcessor instanceof BeanFactoryAware beanFactoryAware) {
331+
beanFactoryAware.setBeanFactory(beanFactory);
332+
}
333+
if (this.retainedProcessor instanceof BeanFactoryAware beanFactoryAware) {
334+
beanFactoryAware.setBeanFactory(beanFactory);
335+
}
331336
}
332337
}
333338

@@ -358,11 +363,13 @@ public boolean isRunning() {
358363
protected void handleMessageInternal(Message<?> message) {
359364
Object mqttMessage = this.converter.fromMessage(message, Object.class);
360365
String topic = this.topicProcessor.processMessage(message);
361-
if (topic == null && this.defaultTopic == null) {
362-
throw new IllegalStateException(
363-
"No topic could be determined from the message and no default topic defined");
366+
if (topic == null) {
367+
topic = this.defaultTopic;
364368
}
365-
publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
369+
370+
Assert.state(topic != null, "No topic could be determined from the message and no default topic defined");
371+
372+
publish(topic, mqttMessage, message);
366373
}
367374

368375
protected abstract void publish(String topic, Object mqttMessage, Message<?> message);

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.integration.channel.QueueChannel;
5555
import org.springframework.integration.handler.MessageProcessor;
5656
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
57+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
5758
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
5859
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
5960
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
@@ -73,6 +74,7 @@
7374
import org.springframework.util.ReflectionUtils;
7475

7576
import static org.assertj.core.api.Assertions.assertThat;
77+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
7678
import static org.assertj.core.api.Assertions.fail;
7779
import static org.mockito.ArgumentMatchers.any;
7880
import static org.mockito.ArgumentMatchers.anyLong;
@@ -515,6 +517,19 @@ public void testDifferentQos() throws Exception {
515517
verify(client).disconnectForcibly(5_000L);
516518
}
517519

520+
@Test
521+
public void emptyTopicNotAllowed() {
522+
assertThatIllegalArgumentException()
523+
.isThrownBy(() ->
524+
new MqttPahoMessageDrivenChannelAdapter("client_id", mock(MqttPahoClientFactory.class), ""))
525+
.withMessage("The topic to subscribe cannot be empty string");
526+
527+
var adapter = new MqttPahoMessageDrivenChannelAdapter("client_id", mock(MqttPahoClientFactory.class), "topic1");
528+
assertThatIllegalArgumentException()
529+
.isThrownBy(() -> adapter.addTopic(""))
530+
.withMessage("The topic to subscribe cannot be empty string");
531+
}
532+
518533
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, Boolean cleanSession)
519534
throws MqttException {
520535

0 commit comments

Comments
 (0)