Skip to content

Commit 993c6ed

Browse files
garyrussellartembilan
authored andcommitted
GH-2471: Close client on outbound adapter
Resolves #2471
1 parent c111e1b commit 993c6ed

File tree

2 files changed

+60
-9
lines changed

2 files changed

+60
-9
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,17 @@ protected void doStop() {
159159

160160
private synchronized IMqttAsyncClient checkConnection() throws MqttException {
161161
if (this.client != null && !this.client.isConnected()) {
162+
this.client.setCallback(null);
162163
this.client.close();
163164
this.client = null;
164165
}
165166
if (this.client == null) {
167+
IMqttAsyncClient client = null;
166168
try {
167169
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
168170
Assert.state(this.getUrl() != null || connectionOptions.getServerURIs() != null,
169171
"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
170-
IMqttAsyncClient client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());
172+
client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());
171173
incrementClientInstance();
172174
client.setCallback(this);
173175
client.connect(connectionOptions).waitForCompletion(this.completionTimeout);
@@ -177,6 +179,10 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException {
177179
}
178180
}
179181
catch (MqttException e) {
182+
if (client != null) {
183+
client.close();
184+
client = null;
185+
}
180186
throw new MessagingException("Failed to connect", e);
181187
}
182188
}
@@ -209,6 +215,13 @@ private void sendDeliveryComplete(IMqttDeliveryToken token) {
209215
@Override
210216
public synchronized void connectionLost(Throwable cause) {
211217
logger.error("Lost connection; will attempt reconnect on next request");
218+
try {
219+
this.client.setCallback(null);
220+
this.client.close();
221+
}
222+
catch (MqttException e) {
223+
// NOSONAR
224+
}
212225
this.client = null;
213226
}
214227

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
import org.aopalliance.intercept.MethodInterceptor;
5757
import org.apache.commons.logging.Log;
58+
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
5859
import org.eclipse.paho.client.mqttv3.IMqttClient;
5960
import org.eclipse.paho.client.mqttv3.IMqttToken;
6061
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
@@ -89,6 +90,7 @@
8990
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
9091
import org.springframework.integration.test.util.TestUtils;
9192
import org.springframework.messaging.Message;
93+
import org.springframework.messaging.MessageHandlingException;
9294
import org.springframework.messaging.support.GenericMessage;
9395
import org.springframework.scheduling.TaskScheduler;
9496
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -113,15 +115,32 @@ public class MqttAdapterTests {
113115
}
114116

115117
@Test
116-
public void testCloseOnBadConnect() throws Exception {
118+
public void testCloseOnBadConnectIn() throws Exception {
117119
final IMqttClient client = mock(IMqttClient.class);
118120
willThrow(new MqttException(0)).given(client).connect(any());
119-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
121+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
120122
adapter.start();
121123
verify(client).close();
122124
adapter.stop();
123125
}
124126

127+
@Test
128+
public void testCloseOnBadConnectOut() throws Exception {
129+
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
130+
willThrow(new MqttException(0)).given(client).connect(any());
131+
MqttPahoMessageHandler adapter = buildAdapterOut(client);
132+
adapter.start();
133+
try {
134+
adapter.handleMessage(new GenericMessage<>("foo"));
135+
fail("exception expected");
136+
}
137+
catch (MessageHandlingException e) {
138+
// NOSONAR
139+
}
140+
verify(client).close();
141+
adapter.stop();
142+
}
143+
125144
@Test
126145
public void testOutboundOptionsApplied() throws Exception {
127146
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
@@ -303,7 +322,7 @@ public void testInboundOptionsApplied() throws Exception {
303322
@Test
304323
public void testStopActionDefault() throws Exception {
305324
final IMqttClient client = mock(IMqttClient.class);
306-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, null);
325+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, null);
307326

308327
adapter.start();
309328
adapter.stop();
@@ -313,7 +332,7 @@ public void testStopActionDefault() throws Exception {
313332
@Test
314333
public void testStopActionDefaultNotClean() throws Exception {
315334
final IMqttClient client = mock(IMqttClient.class);
316-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, false, null);
335+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, null);
317336

318337
adapter.start();
319338
adapter.stop();
@@ -323,7 +342,7 @@ public void testStopActionDefaultNotClean() throws Exception {
323342
@Test
324343
public void testStopActionAlways() throws Exception {
325344
final IMqttClient client = mock(IMqttClient.class);
326-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, false,
345+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false,
327346
ConsumerStopAction.UNSUBSCRIBE_ALWAYS);
328347

329348
adapter.start();
@@ -341,7 +360,7 @@ public void testStopActionAlways() throws Exception {
341360
@Test
342361
public void testStopActionNever() throws Exception {
343362
final IMqttClient client = mock(IMqttClient.class);
344-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
363+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
345364

346365
adapter.start();
347366
adapter.stop();
@@ -376,7 +395,7 @@ public void testCustomExpressions() {
376395
@Test
377396
public void testReconnect() throws Exception {
378397
final IMqttClient client = mock(IMqttClient.class);
379-
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapter(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
398+
MqttPahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, null, ConsumerStopAction.UNSUBSCRIBE_NEVER);
380399
adapter.setRecoveryInterval(10);
381400
Log logger = spy(TestUtils.getPropertyValue(adapter, "logger", Log.class));
382401
new DirectFieldAccessor(adapter).setPropertyValue("logger", logger);
@@ -500,7 +519,7 @@ public void testDifferentQos() throws Exception {
500519
verify(client).setTimeToWait(30_000L);
501520
}
502521

503-
private MqttPahoMessageDrivenChannelAdapter buildAdapter(final IMqttClient client, Boolean cleanSession,
522+
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttClient client, Boolean cleanSession,
504523
ConsumerStopAction action) throws MqttException {
505524
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
506525

@@ -528,6 +547,25 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
528547
return adapter;
529548
}
530549

550+
private MqttPahoMessageHandler buildAdapterOut(final IMqttAsyncClient client) throws MqttException {
551+
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
552+
553+
@Override
554+
public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) throws MqttException {
555+
return client;
556+
}
557+
558+
};
559+
MqttConnectOptions connectOptions = new MqttConnectOptions();
560+
connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
561+
factory.setConnectionOptions(connectOptions);
562+
MqttPahoMessageHandler adapter = new MqttPahoMessageHandler("client", factory);
563+
adapter.setDefaultTopic("foo");
564+
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));
565+
adapter.afterPropertiesSet();
566+
return adapter;
567+
}
568+
531569
private void verifyUnsubscribe(IMqttClient client) throws Exception {
532570
verify(client).connect(any(MqttConnectOptions.class));
533571
verify(client).subscribe(any(String[].class), any(int[].class));

0 commit comments

Comments
 (0)