Skip to content

Commit 8512805

Browse files
authored
spring-projectsGH-10095: Expose quiescentTimeout for MQTT components
* Add ability for a user to set mqtt's `quiescentTimeout` for forceable shutdowns Fixes: spring-projects#10095 * Move QUIESCENT_TIMEOUT constant to ClientManager Update tests to set `setDisconnectCompletionTimeout` and `setQuiescentTimeout` to 1L to reduce test runtime * Mqttv5PahoMessageDrivenChannelAdapter.disconnectForcibly needs to use getQuiescentTimeout for quiescent timeout BackToBackAdapterTests initializeInboundAdapter needs to be a static method * Apply quiescentTimeout attribute to AbstractMqttClientManager. Add the quiescentTimeout in the disconnectForcibly in the Mqttv3ClientManager and Mqttv5ClientManager classes Add documentation to what's new doc describing the addition of quiescence timeout * Update the whats new MQTT announcement * Change what's new notes on mqtt changes to be brief
1 parent fa78072 commit 8512805

File tree

12 files changed

+98
-29
lines changed

12 files changed

+98
-29
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T
6262

6363
private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
6464

65+
private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT;
66+
6567
private boolean manualAcks;
6668

6769
private ApplicationEventPublisher applicationEventPublisher;
@@ -139,6 +141,20 @@ protected long getDisconnectCompletionTimeout() {
139141
return this.disconnectCompletionTimeout;
140142
}
141143

144+
/**
145+
* Set the quiescentTimeout timeout when disconnecting.
146+
* Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds.
147+
* @param quiescentTimeout The timeout.
148+
* @since 7.0.0
149+
*/
150+
public void setQuiescentTimeout(long quiescentTimeout) {
151+
this.quiescentTimeout = quiescentTimeout;
152+
}
153+
154+
protected long getQuiescentTimeout() {
155+
return this.quiescentTimeout;
156+
}
157+
142158
@Override
143159
public boolean isManualAcks() {
144160
return this.manualAcks;

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
3939
*/
4040
long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
4141

42+
Long QUIESCENT_TIMEOUT = 30_000L;
43+
4244
/**
4345
* The default disconnect completion timeout in milliseconds.
4446
*/

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void stop() {
151151
return;
152152
}
153153
try {
154-
client.disconnectForcibly(getDisconnectCompletionTimeout());
154+
client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout());
155155
if (getConnectionInfo().isAutomaticReconnect()) {
156156
MqttUtils.stopClientReconnectCycle(client);
157157
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.eclipse.paho.mqttv5.common.MqttException;
2727
import org.eclipse.paho.mqttv5.common.MqttMessage;
2828
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
29+
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
2930

3031
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
3132
import org.springframework.integration.mqtt.support.MqttUtils;
@@ -153,7 +154,8 @@ public void stop() {
153154
}
154155

155156
try {
156-
client.disconnectForcibly(getDisconnectCompletionTimeout());
157+
client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(),
158+
MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties());
157159
if (getConnectionInfo().isAutomaticReconnect()) {
158160
MqttUtils.stopClientReconnectCycle(client);
159161
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Mikhail Polivakha
5151
* @author Artem Vozhdayenko
5252
* @author Jiri Soucek
53+
* @author Glenn Renfro
5354
*
5455
* @since 4.0
5556
*
@@ -73,6 +74,8 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess
7374

7475
private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
7576

77+
private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT;
78+
7679
private boolean manualAcks;
7780

7881
private ApplicationEventPublisher applicationEventPublisher;
@@ -199,6 +202,20 @@ protected long getDisconnectCompletionTimeout() {
199202
return this.disconnectCompletionTimeout;
200203
}
201204

205+
/**
206+
* Set the quiescentTimeout timeout when disconnecting.
207+
* Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds.
208+
* @param quiescentTimeout The timeout.
209+
* @since 7.0.0
210+
*/
211+
public void setQuiescentTimeout(long quiescentTimeout) {
212+
this.quiescentTimeout = quiescentTimeout;
213+
}
214+
215+
protected long getQuiescentTimeout() {
216+
return this.quiescentTimeout;
217+
}
218+
202219
@Override
203220
protected void onInit() {
204221
super.onInit();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
* @author Gary Russell
5959
* @author Artem Bilan
6060
* @author Artem Vozhdayenko
61+
* @author Glenn Renfro
6162
*
6263
* @since 4.0
6364
*
@@ -227,7 +228,7 @@ protected void doStop() {
227228
}
228229

229230
try {
230-
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
231+
this.client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout());
231232
if (getConnectionInfo().isAutomaticReconnect()) {
232233
MqttUtils.stopClientReconnectCycle(this.client);
233234
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.eclipse.paho.mqttv5.common.MqttMessage;
3838
import org.eclipse.paho.mqttv5.common.MqttSubscription;
3939
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
40+
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
4041

4142
import org.springframework.beans.factory.BeanCreationException;
4243
import org.springframework.context.ApplicationEventPublisher;
@@ -81,6 +82,7 @@
8182
* @author Lucas Bowler
8283
* @author Artem Vozhdayenko
8384
* @author Matthias Thoma
85+
* @author Glenn Renfro
8486
*
8587
* @since 5.5.5
8688
*
@@ -296,7 +298,8 @@ protected void doStop() {
296298

297299
}
298300
if (getClientManager() == null) {
299-
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
301+
this.mqttClient.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(),
302+
MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties());
300303
if (getConnectionInfo().isAutomaticReconnect()) {
301304
MqttUtils.stopClientReconnectCycle(this.mqttClient);
302305
}

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
/**
6565
* @author Gary Russell
6666
* @author Artem Bilan
67+
* @author Glenn Renfro
6768
*
6869
* @since 4.0
6970
*
@@ -73,6 +74,10 @@
7374
@DirtiesContext
7475
public class BackToBackAdapterTests implements MosquittoContainerTest {
7576

77+
private static final long QUIESCENT_TIMEOUT = 1;
78+
79+
private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L;
80+
7681
@TempDir
7782
static File folder;
7883

@@ -108,9 +113,7 @@ public void testSingleTopic() {
108113
MqttPahoMessageDrivenChannelAdapter inbound =
109114
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
110115
QueueChannel outputChannel = new QueueChannel();
111-
inbound.setOutputChannel(outputChannel);
112-
inbound.setTaskScheduler(taskScheduler);
113-
inbound.setBeanFactory(mock(BeanFactory.class));
116+
initializeInboundAdapter(inbound, outputChannel);
114117
inbound.afterPropertiesSet();
115118
inbound.start();
116119
adapter.handleMessage(new GenericMessage<>("foo"));
@@ -147,9 +150,7 @@ private void testJsonCommon(String... trusted) {
147150
MqttPahoMessageDrivenChannelAdapter inbound =
148151
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
149152
QueueChannel outputChannel = new QueueChannel();
150-
inbound.setOutputChannel(outputChannel);
151-
inbound.setTaskScheduler(taskScheduler);
152-
inbound.setBeanFactory(mock(BeanFactory.class));
153+
initializeInboundAdapter(inbound, outputChannel);
153154
inbound.setConverter(converter);
154155
inbound.afterPropertiesSet();
155156
inbound.start();
@@ -178,9 +179,7 @@ public void testAddRemoveTopic() {
178179
MqttPahoMessageDrivenChannelAdapter inbound =
179180
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in");
180181
QueueChannel outputChannel = new QueueChannel();
181-
inbound.setOutputChannel(outputChannel);
182-
inbound.setTaskScheduler(taskScheduler);
183-
inbound.setBeanFactory(mock(BeanFactory.class));
182+
initializeInboundAdapter(inbound, outputChannel);
184183
inbound.afterPropertiesSet();
185184
inbound.start();
186185
inbound.addTopic("mqtt-foo");
@@ -226,9 +225,7 @@ public void testTwoTopics() {
226225
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
227226
"si-test-in", "mqtt-foo", "mqtt-bar");
228227
QueueChannel outputChannel = new QueueChannel();
229-
inbound.setOutputChannel(outputChannel);
230-
inbound.setTaskScheduler(taskScheduler);
231-
inbound.setBeanFactory(mock(BeanFactory.class));
228+
initializeInboundAdapter(inbound, outputChannel);
232229
inbound.afterPropertiesSet();
233230
inbound.start();
234231
adapter.handleMessage(new GenericMessage<>("foo"));
@@ -261,9 +258,7 @@ public void testAsync() throws Exception {
261258
MqttPahoMessageDrivenChannelAdapter inbound =
262259
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
263260
QueueChannel outputChannel = new QueueChannel();
264-
inbound.setOutputChannel(outputChannel);
265-
inbound.setTaskScheduler(taskScheduler);
266-
inbound.setBeanFactory(mock(BeanFactory.class));
261+
initializeInboundAdapter(inbound, outputChannel);
267262
inbound.afterPropertiesSet();
268263
inbound.start();
269264
GenericMessage<String> message = new GenericMessage<>("foo");
@@ -299,9 +294,7 @@ public void testAsyncPersisted() throws Exception {
299294
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
300295
"si-test-in", "mqtt-foo", "mqtt-bar");
301296
QueueChannel outputChannel = new QueueChannel();
302-
inbound.setOutputChannel(outputChannel);
303-
inbound.setTaskScheduler(taskScheduler);
304-
inbound.setBeanFactory(mock(BeanFactory.class));
297+
initializeInboundAdapter(inbound, outputChannel);
305298
inbound.afterPropertiesSet();
306299
inbound.start();
307300
Message<String> message1 = new GenericMessage<>("foo");
@@ -396,6 +389,14 @@ public void onApplicationEvent(MqttSubscribedEvent event) {
396389

397390
}
398391

392+
private static void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) {
393+
inbound.setOutputChannel(outputChannel);
394+
inbound.setTaskScheduler(taskScheduler);
395+
inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT);
396+
inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
397+
inbound.setBeanFactory(mock(BeanFactory.class));
398+
}
399+
399400
private class EventPublisher implements ApplicationEventPublisher {
400401

401402
private volatile MqttMessageDeliveredEvent delivered;

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@
6363
*/
6464
class ClientManagerBackToBackTests implements MosquittoContainerTest {
6565

66+
private static final long QUIESCENT_TIMEOUT = 1L;
67+
68+
private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L;
69+
6670
@Test
6771
void testSameV3ClientIdWorksForPubAndSub() throws Exception {
6872
testSubscribeAndPublish(Mqttv3Config.class, Mqttv3Config.TOPIC_NAME, Mqttv3Config.subscribedLatch);
@@ -191,7 +195,10 @@ public Mqttv3ClientManager mqttv3ClientManager() {
191195
MqttConnectOptions connectionOptions = new MqttConnectOptions();
192196
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
193197
connectionOptions.setAutomaticReconnect(true);
194-
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
198+
Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
199+
result.setQuiescentTimeout(QUIESCENT_TIMEOUT);
200+
result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
201+
return result;
195202
}
196203

197204
@Bean
@@ -234,7 +241,10 @@ public Mqttv3ClientManager mqttv3ClientManager() {
234241
MqttConnectOptions connectionOptions = new MqttConnectOptions();
235242
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
236243
connectionOptions.setAutomaticReconnect(true);
237-
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3-reconnect");
244+
Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3-reconnect");
245+
result.setQuiescentTimeout(QUIESCENT_TIMEOUT);
246+
result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
247+
return result;
238248
}
239249

240250
@Bean
@@ -269,7 +279,10 @@ public Mqttv3ClientManager mqttv3ClientManager() {
269279
MqttConnectOptions connectionOptions = new MqttConnectOptions();
270280
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
271281
connectionOptions.setAutomaticReconnect(true);
272-
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
282+
Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
283+
result.setQuiescentTimeout(QUIESCENT_TIMEOUT);
284+
result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
285+
return result;
273286
}
274287

275288
@Bean

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
* @author Gary Russell
101101
* @author Artem Bilan
102102
* @author Artem Vozhdayenko
103+
* @author Glenn Renfro
103104
*
104105
* @since 4.0
105106
*
@@ -519,7 +520,7 @@ public void testDifferentQos() throws Exception {
519520

520521
new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE);
521522
adapter.stop();
522-
verify(client).disconnectForcibly(5_000L);
523+
verify(client).disconnectForcibly(30_000L, 5_000L);
523524
}
524525

525526
@Test
@@ -589,14 +590,14 @@ private void verifyUnsubscribe(IMqttAsyncClient client) throws Exception {
589590
verify(client).connect(any(MqttConnectOptions.class));
590591
verify(client).subscribe(any(String[].class), any(int[].class), any());
591592
verify(client).unsubscribe(any(String[].class));
592-
verify(client).disconnectForcibly(anyLong());
593+
verify(client).disconnectForcibly(anyLong(), anyLong());
593594
}
594595

595596
private void verifyNotUnsubscribe(IMqttAsyncClient client) throws Exception {
596597
verify(client).connect(any(MqttConnectOptions.class));
597598
verify(client).subscribe(any(String[].class), any(int[].class), any());
598599
verify(client, never()).unsubscribe(any(String[].class));
599-
verify(client).disconnectForcibly(anyLong());
600+
verify(client).disconnectForcibly(anyLong(), anyLong());
600601
}
601602

602603
@Configuration

0 commit comments

Comments
 (0)