Skip to content

Commit 71d4175

Browse files
committed
GH-3454: From MQTT conversion error - to error ch (#3456)
* GH-3454: From MQTT conversion error - to error ch Fixes #3454 The message converter may return null when we try to covert from the MQTT message. The thrown exception may also reset the client connect. * Fix `MqttPahoMessageDrivenChannelAdapter` to catch any conversion errors (including `null` result) and try to send an `ErrorMessage` with that info into the provided `errorChannel`. Otherwise re-throw it as as **Cherry-pick to `5.4.x` & `5.3.x`** * * Apply review language-specific changes # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java # src/reference/asciidoc/mqtt.adoc
1 parent b5ef2f6 commit 71d4175

File tree

3 files changed

+96
-27
lines changed

3 files changed

+96
-27
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,8 @@
3737
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
3838
import org.springframework.messaging.Message;
3939
import org.springframework.messaging.MessagingException;
40+
import org.springframework.messaging.converter.MessageConversionException;
41+
import org.springframework.messaging.support.GenericMessage;
4042
import org.springframework.util.Assert;
4143

4244
/**
@@ -169,7 +171,7 @@ protected void doStart() {
169171
}
170172
catch (Exception e) {
171173
logger.error("Exception while connecting and subscribing, retrying", e);
172-
this.scheduleReconnect();
174+
scheduleReconnect();
173175
}
174176
}
175177

@@ -181,7 +183,7 @@ protected synchronized void doStop() {
181183
try {
182184
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
183185
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
184-
&& this.cleanSession)) {
186+
&& this.cleanSession)) {
185187

186188
this.client.unsubscribe(getTopic());
187189
}
@@ -237,7 +239,7 @@ public void removeTopic(String... topic) {
237239
super.removeTopic(topic);
238240
}
239241
catch (MqttException e) {
240-
throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(topic), e);
242+
throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), e);
241243
}
242244
finally {
243245
this.topicLock.unlock();
@@ -365,14 +367,47 @@ public synchronized void connectionLost(Throwable cause) {
365367

366368
@Override
367369
public void messageArrived(String topic, MqttMessage mqttMessage) {
368-
Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
370+
Message<?> message = toMessage(topic, mqttMessage);
371+
if (message != null) {
372+
try {
373+
sendMessage(message);
374+
}
375+
catch (RuntimeException e) {
376+
logger.error("Unhandled exception for " + message.toString(), e);
377+
throw e;
378+
}
379+
}
380+
}
381+
382+
private Message<?> toMessage(String topic, MqttMessage mqttMessage) {
383+
Message<?> message = null;
384+
RuntimeException conversionError = null;
369385
try {
370-
sendMessage(message);
386+
message = getConverter().toMessage(topic, mqttMessage);
371387
}
372-
catch (RuntimeException e) {
373-
logger.error("Unhandled exception for " + message.toString(), e);
374-
throw e;
388+
catch (RuntimeException ex) {
389+
conversionError = ex;
390+
}
391+
392+
if (message == null && conversionError == null) {
393+
conversionError = new IllegalStateException("'MqttMessageConverter' returned 'null'");
394+
}
395+
396+
if (conversionError != null) {
397+
GenericMessage<MqttMessage> failedMessage = new GenericMessage<>(mqttMessage);
398+
if (!sendErrorMessageIfNecessary(failedMessage, conversionError)) {
399+
MessageConversionException conversionException;
400+
if (conversionError instanceof MessageConversionException) {
401+
conversionException = (MessageConversionException) conversionError;
402+
}
403+
else {
404+
conversionException = new MessageConversionException(failedMessage,
405+
"Failed to convert from MQTT Message", conversionError);
406+
}
407+
throw conversionException;
408+
}
375409
}
410+
return message;
376411
}
377412

378413
@Override

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -85,9 +85,12 @@
8585
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
8686
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
8787
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
88+
import org.springframework.integration.mqtt.support.MqttMessageConverter;
8889
import org.springframework.integration.test.util.TestUtils;
8990
import org.springframework.messaging.Message;
9091
import org.springframework.messaging.MessageHandlingException;
92+
import org.springframework.messaging.MessageHeaders;
93+
import org.springframework.messaging.support.ErrorMessage;
9194
import org.springframework.messaging.support.GenericMessage;
9295
import org.springframework.scheduling.TaskScheduler;
9396
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -102,7 +105,7 @@
102105
*/
103106
public class MqttAdapterTests {
104107

105-
private IMqttToken alwaysComplete;
108+
private final IMqttToken alwaysComplete;
106109

107110
{
108111
ProxyFactoryBean pfb = new ProxyFactoryBean();
@@ -194,12 +197,12 @@ public void testOutboundOptionsApplied() throws Exception {
194197
return deliveryToken;
195198
}).given(client).publish(anyString(), any(MqttMessage.class));
196199

197-
handler.handleMessage(new GenericMessage<String>("Hello, world!"));
200+
handler.handleMessage(new GenericMessage<>("Hello, world!"));
198201

199202
verify(client, times(1)).connect(any(MqttConnectOptions.class));
200203
assertThat(connectCalled.get()).isTrue();
201204
AtomicReference<Object> failed = new AtomicReference<>();
202-
handler.setApplicationEventPublisher(event -> failed.set(event));
205+
handler.setApplicationEventPublisher(failed::set);
203206
handler.connectionLost(new IllegalStateException());
204207
assertThat(failed.get()).isInstanceOf(MqttConnectionFailedEvent.class);
205208
handler.stop();
@@ -254,7 +257,7 @@ public void testInboundOptionsApplied() throws Exception {
254257
return null;
255258
}).given(client).connect(any(MqttConnectOptions.class));
256259

257-
final AtomicReference<MqttCallback> callback = new AtomicReference<MqttCallback>();
260+
final AtomicReference<MqttCallback> callback = new AtomicReference<>();
258261
willAnswer(invocation -> {
259262
callback.set(invocation.getArgument(0));
260263
return null;
@@ -266,12 +269,14 @@ public void testInboundOptionsApplied() throws Exception {
266269
"baz", "fix");
267270
QueueChannel outputChannel = new QueueChannel();
268271
adapter.setOutputChannel(outputChannel);
272+
QueueChannel errorChannel = new QueueChannel();
273+
adapter.setErrorChannel(errorChannel);
269274
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
270275
taskScheduler.initialize();
271276
adapter.setTaskScheduler(taskScheduler);
272277
adapter.setBeanFactory(mock(BeanFactory.class));
273278
ApplicationEventPublisher applicationEventPublisher = mock(ApplicationEventPublisher.class);
274-
final BlockingQueue<MqttIntegrationEvent> events = new LinkedBlockingQueue<MqttIntegrationEvent>();
279+
final BlockingQueue<MqttIntegrationEvent> events = new LinkedBlockingQueue<>();
275280
willAnswer(invocation -> {
276281
events.add(invocation.getArgument(0));
277282
return null;
@@ -294,6 +299,33 @@ public void testInboundOptionsApplied() throws Exception {
294299
assertThat(event).isInstanceOf(MqttSubscribedEvent.class);
295300
assertThat(((MqttSubscribedEvent) event).getMessage()).isEqualTo("Connected and subscribed to [baz, fix]");
296301

302+
adapter.setConverter(new MqttMessageConverter() {
303+
304+
@Override public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
305+
return null;
306+
}
307+
308+
@Override public Object fromMessage(Message<?> message, Class<?> targetClass) {
309+
return null;
310+
}
311+
312+
@Override public Message<?> toMessage(Object payload, MessageHeaders headers) {
313+
return null;
314+
}
315+
316+
317+
});
318+
319+
callback.get().messageArrived("baz", message);
320+
321+
ErrorMessage errorMessage = (ErrorMessage) errorChannel.receive(0);
322+
assertThat(errorMessage).isNotNull()
323+
.extracting(ErrorMessage::getPayload)
324+
.isInstanceOf(IllegalStateException.class);
325+
IllegalStateException exception = (IllegalStateException) errorMessage.getPayload();
326+
assertThat(exception).hasMessage("'MqttMessageConverter' returned 'null'");
327+
assertThat(errorMessage.getOriginalMessage().getPayload()).isSameAs(message);
328+
297329
// lose connection and make first reconnect fail
298330
failConnection.set(true);
299331
RuntimeException e = new RuntimeException("foo");
@@ -416,7 +448,7 @@ public void testReconnect() throws Exception {
416448
// the following assertion should be equalTo, but leq to protect against a slow CI server
417449
assertThat(attemptingReconnectCount.get()).isLessThanOrEqualTo(2);
418450
AtomicReference<Object> failed = new AtomicReference<>();
419-
adapter.setApplicationEventPublisher(event -> failed.set(event));
451+
adapter.setApplicationEventPublisher(failed::set);
420452
adapter.connectionLost(new IllegalStateException());
421453
assertThat(failed.get()).isInstanceOf(MqttConnectionFailedEvent.class);
422454
adapter.stop();
@@ -448,12 +480,11 @@ public void testSubscribeFailure() throws Exception {
448480
new DirectFieldAccessor(client).setPropertyValue("aClient", aClient);
449481
willAnswer(new CallsRealMethods()).given(client).connect(any(MqttConnectOptions.class));
450482
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class));
451-
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class),
452-
(IMqttMessageListener[]) isNull());
483+
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class), isNull());
453484
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
454485

455486
IMqttToken token = mock(IMqttToken.class);
456-
given(token.getGrantedQos()).willReturn(new int[] { 0x80 });
487+
given(token.getGrantedQos()).willReturn(new int[]{ 0x80 });
457488
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());
458489

459490
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
@@ -465,11 +496,11 @@ public void testSubscribeFailure() throws Exception {
465496
}, m -> m.getName().equals("connectAndSubscribe"));
466497
assertThat(method.get()).isNotNull();
467498
Condition<InvocationTargetException> subscribeFailed = new Condition<>(ex ->
468-
((MqttException) ex.getCause()).getReasonCode() == MqttException.REASON_CODE_SUBSCRIBE_FAILED,
469-
"expected the reason code to be REASON_CODE_SUBSCRIBE_FAILED");
499+
((MqttException) ex.getCause()).getReasonCode() == MqttException.REASON_CODE_SUBSCRIBE_FAILED,
500+
"expected the reason code to be REASON_CODE_SUBSCRIBE_FAILED");
470501
assertThatExceptionOfType(InvocationTargetException.class).isThrownBy(() -> method.get().invoke(adapter))
471-
.withCauseInstanceOf(MqttException.class)
472-
.is(subscribeFailed);
502+
.withCauseInstanceOf(MqttException.class)
503+
.is(subscribeFailed);
473504
}
474505

475506
@Test
@@ -502,7 +533,7 @@ public void testDifferentQos() throws Exception {
502533
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
503534

504535
IMqttToken token = mock(IMqttToken.class);
505-
given(token.getGrantedQos()).willReturn(new int[] { 2, 0 });
536+
given(token.getGrantedQos()).willReturn(new int[]{ 2, 0 });
506537
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());
507538

508539
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
@@ -537,7 +568,7 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
537568

538569
};
539570
MqttConnectOptions connectOptions = new MqttConnectOptions();
540-
connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
571+
connectOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
541572
if (cleanSession != null) {
542573
connectOptions.setCleanSession(cleanSession);
543574
}
@@ -554,7 +585,7 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
554585
return adapter;
555586
}
556587

557-
private MqttPahoMessageHandler buildAdapterOut(final IMqttAsyncClient client) throws MqttException {
588+
private MqttPahoMessageHandler buildAdapterOut(final IMqttAsyncClient client) {
558589
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
559590

560591
@Override
@@ -564,7 +595,7 @@ public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) thro
564595

565596
};
566597
MqttConnectOptions connectOptions = new MqttConnectOptions();
567-
connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
598+
connectOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
568599
factory.setConnectionOptions(connectOptions);
569600
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler("client", factory);
570601
adapter.setDefaultTopic("foo");

src/reference/asciidoc/mqtt.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ A new application context reverts to the configured settings.
145145

146146
Changing the topics while the adapter is stopped (or disconnected from the broker) takes effect the next time a connection is established.
147147

148+
Starting with version `5.2.11`, when the message converter throws an exception or returns `null` from the `MqttMessage` conversion, the `MqttPahoMessageDrivenChannelAdapter` sends an `ErrorMessage` into the `errorChannel`, if provided.
149+
Re-throws this conversion error otherwise into an MQTT client callback.
150+
148151
==== Configuring with Java Configuration
149152

150153
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:

0 commit comments

Comments
 (0)