Skip to content

Commit a82a41e

Browse files
committed
GH-8720: Check MQTT topics if not empty strings
Fixes #8720 Validate MQTT topics for empty strings in the channel adapters configuration Use plural names for varargs params **Cherry-pick to `6.1.x` & `6.0.x`** # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java
1 parent a2356ec commit a82a41e

File tree

3 files changed

+66
-34
lines changed

3 files changed

+66
-34
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,12 +78,22 @@ public AbstractMqttMessageDrivenChannelAdapter(@Nullable String url, String clie
7878
Assert.noNullElements(topic, "'topics' cannot have null elements");
7979
this.url = url;
8080
this.clientId = clientId;
81+
validateTopics(topic);
8182
this.topics = new LinkedHashSet<>();
8283
for (String t : topic) {
8384
this.topics.add(new Topic(t, 1));
8485
}
8586
}
8687

88+
private static void validateTopics(String[] topics) {
89+
Assert.notNull(topics, "'topics' cannot be null");
90+
Assert.noNullElements(topics, "'topics' cannot have null elements");
91+
92+
for (String topic : topics) {
93+
Assert.hasText(topic, "The topic to subscribe cannot be empty string");
94+
}
95+
}
96+
8797
public void setConverter(MqttMessageConverter converter) {
8898
Assert.notNull(converter, "'converter' cannot be null");
8999
this.converter = converter;
@@ -207,6 +217,7 @@ protected long getCompletionTimeout() {
207217
*/
208218
@ManagedOperation
209219
public void addTopic(String topic, int qos) {
220+
validateTopics(new String[] {topic});
210221
this.topicLock.lock();
211222
try {
212223
Topic topik = new Topic(topic, qos);
@@ -223,16 +234,16 @@ public void addTopic(String topic, int qos) {
223234

224235
/**
225236
* Add a topic (or topics) to the subscribed list (qos=1).
226-
* @param topic The topics.
227-
* @throws MessagingException if the topic is already in the list.
237+
* @param topics The topics.
238+
* @throws MessagingException if the topics is already in the list.
228239
* @since 4.1
229240
*/
230241
@ManagedOperation
231-
public void addTopic(String... topic) {
232-
Assert.notNull(topic, "'topic' cannot be null");
242+
public void addTopic(String... topics) {
243+
validateTopics(topics);
233244
this.topicLock.lock();
234245
try {
235-
for (String t : topic) {
246+
for (String t : topics) {
236247
addTopic(t, 1);
237248
}
238249
}
@@ -243,25 +254,24 @@ public void addTopic(String... topic) {
243254

244255
/**
245256
* Add topics to the subscribed list.
246-
* @param topic The topics.
257+
* @param topics The topics.
247258
* @param qos The qos for each topic.
248-
* @throws MessagingException if a topic is already in the list.
259+
* @throws MessagingException if a topics is already in the list.
249260
* @since 4.1
250261
*/
251262
@ManagedOperation
252-
public void addTopics(String[] topic, int[] qos) {
253-
Assert.notNull(topic, "'topic' cannot be null.");
254-
Assert.noNullElements(topic, "'topic' cannot contain any null elements.");
255-
Assert.isTrue(topic.length == qos.length, "topic and qos arrays must the be the same length.");
263+
public void addTopics(String[] topics, int[] qos) {
264+
validateTopics(topics);
265+
Assert.isTrue(topics.length == qos.length, "topics and qos arrays must the be the same length.");
256266
this.topicLock.lock();
257267
try {
258-
for (String topik : topic) {
259-
if (this.topics.contains(new Topic(topik, 0))) {
260-
throw new MessagingException("Topic '" + topik + "' is already subscribed.");
268+
for (String topic : topics) {
269+
if (this.topics.contains(new Topic(topic, 0))) {
270+
throw new MessagingException("Topic '" + topic + "' is already subscribed.");
261271
}
262272
}
263-
for (int i = 0; i < topic.length; i++) {
264-
addTopic(topic[i], qos[i]);
273+
for (int i = 0; i < topics.length; i++) {
274+
addTopic(topics[i], qos[i]);
265275
}
266276
}
267277
finally {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

21+
import org.springframework.beans.factory.BeanFactory;
2122
import org.springframework.beans.factory.BeanFactoryAware;
2223
import org.springframework.context.ApplicationEventPublisher;
2324
import org.springframework.context.ApplicationEventPublisherAware;
@@ -107,6 +108,7 @@ protected ApplicationEventPublisher getApplicationEventPublisher() {
107108
* @param defaultTopic the default topic.
108109
*/
109110
public void setDefaultTopic(String defaultTopic) {
111+
Assert.hasText(defaultTopic, "'defaultTopic' must not be empty");
110112
this.defaultTopic = defaultTopic;
111113
}
112114

@@ -295,14 +297,17 @@ protected long getDisconnectCompletionTimeout() {
295297
@Override
296298
protected void onInit() {
297299
super.onInit();
298-
if (this.topicProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
299-
((BeanFactoryAware) this.topicProcessor).setBeanFactory(getBeanFactory());
300-
}
301-
if (this.qosProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
302-
((BeanFactoryAware) this.qosProcessor).setBeanFactory(getBeanFactory());
303-
}
304-
if (this.retainedProcessor instanceof BeanFactoryAware && getBeanFactory() != null) {
305-
((BeanFactoryAware) this.retainedProcessor).setBeanFactory(getBeanFactory());
300+
BeanFactory beanFactory = getBeanFactory();
301+
if (beanFactory != null) {
302+
if (this.topicProcessor instanceof BeanFactoryAware) {
303+
((BeanFactoryAware) this.topicProcessor).setBeanFactory(beanFactory);
304+
}
305+
if (this.qosProcessor instanceof BeanFactoryAware) {
306+
((BeanFactoryAware) this.qosProcessor).setBeanFactory(beanFactory);
307+
}
308+
if (this.retainedProcessor instanceof BeanFactoryAware) {
309+
((BeanFactoryAware) this.retainedProcessor).setBeanFactory(beanFactory);
310+
}
306311
}
307312
}
308313

@@ -333,11 +338,13 @@ public boolean isRunning() {
333338
protected void handleMessageInternal(Message<?> message) {
334339
Object mqttMessage = this.converter.fromMessage(message, Object.class);
335340
String topic = this.topicProcessor.processMessage(message);
336-
if (topic == null && this.defaultTopic == null) {
337-
throw new IllegalStateException(
338-
"No topic could be determined from the message and no default topic defined");
341+
if (topic == null) {
342+
topic = this.defaultTopic;
339343
}
340-
publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
344+
345+
Assert.state(topic != null, "No topic could be determined from the message and no default topic defined");
346+
347+
publish(topic, mqttMessage, message);
341348
}
342349

343350
protected abstract void publish(String topic, Object mqttMessage, Message<?> message);

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.springframework.integration.channel.QueueChannel;
6363
import org.springframework.integration.handler.MessageProcessor;
6464
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
65+
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
6566
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
6667
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
6768
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
@@ -83,6 +84,7 @@
8384

8485
import static org.assertj.core.api.Assertions.assertThat;
8586
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
87+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
8688
import static org.assertj.core.api.Assertions.fail;
8789
import static org.mockito.ArgumentMatchers.any;
8890
import static org.mockito.ArgumentMatchers.anyLong;
@@ -468,7 +470,7 @@ public void testSubscribeFailure() throws Exception {
468470
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
469471

470472
IMqttToken token = mock(IMqttToken.class);
471-
given(token.getGrantedQos()).willReturn(new int[]{ 0x80 });
473+
given(token.getGrantedQos()).willReturn(new int[] {0x80});
472474
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());
473475

474476
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
@@ -516,7 +518,7 @@ public void testDifferentQos() throws Exception {
516518
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
517519

518520
IMqttToken token = mock(IMqttToken.class);
519-
given(token.getGrantedQos()).willReturn(new int[]{ 2, 0 });
521+
given(token.getGrantedQos()).willReturn(new int[] {2, 0});
520522
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());
521523

522524
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
@@ -543,6 +545,19 @@ public void testDifferentQos() throws Exception {
543545
verify(client).disconnectForcibly(5_000L);
544546
}
545547

548+
@Test
549+
public void emptyTopicNotAllowed() {
550+
assertThatIllegalArgumentException()
551+
.isThrownBy(() ->
552+
new MqttPahoMessageDrivenChannelAdapter("client_id", mock(MqttPahoClientFactory.class), ""))
553+
.withMessage("The topic to subscribe cannot be empty string");
554+
555+
var adapter = new MqttPahoMessageDrivenChannelAdapter("client_id", mock(MqttPahoClientFactory.class), "topic1");
556+
assertThatIllegalArgumentException()
557+
.isThrownBy(() -> adapter.addTopic(""))
558+
.withMessage("The topic to subscribe cannot be empty string");
559+
}
560+
546561
@Test
547562
public void testNoNPEOnReconnectAndStopRaceCondition() throws Exception {
548563
final IMqttClient client = mock(IMqttClient.class);
@@ -597,7 +612,7 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
597612

598613
};
599614
MqttConnectOptions connectOptions = new MqttConnectOptions();
600-
connectOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
615+
connectOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
601616
if (cleanSession != null) {
602617
connectOptions.setCleanSession(cleanSession);
603618
}
@@ -621,7 +636,7 @@ public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) {
621636

622637
};
623638
MqttConnectOptions connectOptions = new MqttConnectOptions();
624-
connectOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
639+
connectOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
625640
factory.setConnectionOptions(connectOptions);
626641
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler("client", factory);
627642
adapter.setDefaultTopic("foo");

0 commit comments

Comments
 (0)