Skip to content

Commit 48e8b14

Browse files
garyrussellartembilan
authored andcommitted
INT-4463: Full access to MqttConnectOptions
JIRA: https://jira.spring.io/browse/INT-4463 Certain options, such as `maxInFlight` were not exposed. Deprecate the setters on the factory and allow the user to inject a pre-configured `MqttConnectOptions`, thus making all (and any new) properties available to be configured. # Conflicts: # spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java
1 parent 8a7cf6c commit 48e8b14

File tree

4 files changed

+133
-133
lines changed

4 files changed

+133
-133
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java

Lines changed: 74 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,65 +41,97 @@
4141
*/
4242
public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory {
4343

44-
private volatile Boolean cleanSession;
44+
private MqttConnectOptions options = new MqttConnectOptions();
4545

46-
private volatile Integer connectionTimeout;
46+
private MqttClientPersistence persistence;
4747

48-
private volatile Integer keepAliveInterval;
49-
50-
private volatile String password;
51-
52-
private volatile SocketFactory socketFactory;
53-
54-
private volatile Properties sslProperties;
55-
56-
private volatile String userName;
57-
58-
private volatile MqttClientPersistence persistence;
59-
60-
private volatile Will will;
61-
62-
private volatile String[] serverURIs;
63-
64-
private volatile ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
48+
private ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
6549

50+
/**
51+
* Set the cleanSession.
52+
* @param cleanSession the cleanSession to set.
53+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
54+
*/
55+
@Deprecated
6656
public void setCleanSession(Boolean cleanSession) {
67-
this.cleanSession = cleanSession;
57+
this.options.setCleanSession(cleanSession);
6858
}
6959

60+
/**
61+
* Set the connectionTimeout.
62+
* @param connectionTimeout the connectionTimeout to set.
63+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
64+
*/
65+
@Deprecated
7066
public void setConnectionTimeout(Integer connectionTimeout) {
71-
this.connectionTimeout = connectionTimeout;
67+
this.options.setConnectionTimeout(connectionTimeout);
7268
}
7369

70+
/**
71+
* Set the keepAliveInterval.
72+
* @param keepAliveInterval the keepAliveInterval to set.
73+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
74+
*/
75+
@Deprecated
7476
public void setKeepAliveInterval(Integer keepAliveInterval) {
75-
this.keepAliveInterval = keepAliveInterval;
77+
this.options.setKeepAliveInterval(keepAliveInterval);
7678
}
7779

80+
/**
81+
* Set the password.
82+
* @param password the password to set.
83+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
84+
*/
85+
@Deprecated
7886
public void setPassword(String password) {
79-
this.password = password;
87+
this.options.setPassword(password.toCharArray());
8088
}
8189

90+
/**
91+
* Set the socketFactory.
92+
* @param socketFactory the socketFactory to set.
93+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
94+
*/
95+
@Deprecated
8296
public void setSocketFactory(SocketFactory socketFactory) {
83-
this.socketFactory = socketFactory;
97+
this.options.setSocketFactory(socketFactory);
8498
}
8599

100+
/**
101+
* Set the sslProperties.
102+
* @param sslProperties the sslProperties to set.
103+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
104+
*/
105+
@Deprecated
86106
public void setSslProperties(Properties sslProperties) {
87-
this.sslProperties = sslProperties;
107+
this.options.setSSLProperties(sslProperties);
88108
}
89109

110+
/**
111+
* Set the userName.
112+
* @param userName the userName to set.
113+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
114+
*/
115+
@Deprecated
90116
public void setUserName(String userName) {
91-
this.userName = userName;
117+
this.options.setUserName(userName);
92118
}
93119

94120
/**
95121
* Will be used to set the "Last Will and Testament" (LWT) for the connection.
96122
* @param will The will.
97123
* @see MqttConnectOptions#setWill
124+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
98125
*/
126+
@Deprecated
99127
public void setWill(Will will) {
100-
this.will = will;
128+
this.options.setWill(will.getTopic(), will.getPayload(), will.getQos(), will.isRetained());
101129
}
102130

131+
/**
132+
* Set the persistence to pass into the client constructor.
133+
* @param persistence the persistence to set.
134+
*/
103135
public void setPersistence(MqttClientPersistence persistence) {
104136
this.persistence = persistence;
105137
}
@@ -109,10 +141,12 @@ public void setPersistence(MqttClientPersistence persistence) {
109141
* @param serverURIs The URIs.
110142
* @see MqttConnectOptions#setServerURIs(String[])
111143
* @since 4.1
144+
* @deprecated use {@link #setConnectionOptions(MqttConnectOptions)} instead.
112145
*/
146+
@Deprecated
113147
public void setServerURIs(String... serverURIs) {
114148
Assert.notNull(serverURIs, "'serverURIs' must not be null.");
115-
this.serverURIs = Arrays.copyOf(serverURIs, serverURIs.length);
149+
this.options.setServerURIs(Arrays.copyOf(serverURIs, serverURIs.length));
116150
}
117151

118152
/**
@@ -147,37 +181,19 @@ public IMqttAsyncClient getAsyncClientInstance(String uri, String clientId) thro
147181
return new MqttAsyncClient(uri == null ? "tcp://NO_URL_PROVIDED" : uri, clientId, this.persistence);
148182
}
149183

184+
/**
185+
* Set the preconfigured {@link MqttConnectOptions}.
186+
* @param options the options.
187+
* @since 4.3.16
188+
*/
189+
public void setConnectionOptions(MqttConnectOptions options) {
190+
Assert.notNull(options, "MqttConnectOptions cannot be null");
191+
this.options = options;
192+
}
193+
150194
@Override
151195
public MqttConnectOptions getConnectionOptions() {
152-
MqttConnectOptions options = new MqttConnectOptions();
153-
if (this.cleanSession != null) {
154-
options.setCleanSession(this.cleanSession);
155-
}
156-
if (this.connectionTimeout != null) {
157-
options.setConnectionTimeout(this.connectionTimeout);
158-
}
159-
if (this.keepAliveInterval != null) {
160-
options.setKeepAliveInterval(this.keepAliveInterval);
161-
}
162-
if (this.password != null) {
163-
options.setPassword(this.password.toCharArray());
164-
}
165-
if (this.socketFactory != null) {
166-
options.setSocketFactory(this.socketFactory);
167-
}
168-
if (this.sslProperties != null) {
169-
options.setSSLProperties(this.sslProperties);
170-
}
171-
if (this.userName != null) {
172-
options.setUserName(this.userName);
173-
}
174-
if (this.will != null) {
175-
options.setWill(this.will.getTopic(), this.will.getPayload(), this.will.getQos(), this.will.isRetained());
176-
}
177-
if (this.serverURIs != null) {
178-
options.setServerURIs(this.serverURIs);
179-
}
180-
return options;
196+
return this.options;
181197
}
182198

183199
public static class Will {

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests-context.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
</int:channel>
2121

2222
<bean id="multiUriClientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
23-
<property name="serverURIs" value="tcp://localhost:1883,tcp://localhost:1883"/>
23+
<property name="connectionOptions">
24+
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
25+
<property name="serverURIs" value="tcp://localhost:1883,tcp://localhost:1883"/>
26+
</bean>
27+
</property>
2428
</bean>
2529

2630
</beans>

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 43 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -80,7 +80,6 @@
8080
import org.springframework.integration.handler.MessageProcessor;
8181
import org.springframework.integration.mqtt.core.ConsumerStopAction;
8282
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
83-
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory.Will;
8483
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
8584
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
8685
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
@@ -112,51 +111,23 @@ public class MqttAdapterTests {
112111
this.alwaysComplete = (IMqttToken) pfb.getObject();
113112
}
114113

115-
@Test
116-
public void testPahoConnectOptions() {
117-
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
118-
factory.setCleanSession(false);
119-
factory.setConnectionTimeout(23);
120-
factory.setKeepAliveInterval(45);
121-
factory.setPassword("pass");
122-
SocketFactory socketFactory = mock(SocketFactory.class);
123-
factory.setSocketFactory(socketFactory);
124-
Properties props = new Properties();
125-
factory.setSslProperties(props);
126-
factory.setUserName("user");
127-
Will will = new Will("foo", "bar".getBytes(), 2, true);
128-
factory.setWill(will);
129-
130-
MqttConnectOptions options = factory.getConnectionOptions();
131-
132-
assertEquals(23, options.getConnectionTimeout());
133-
assertEquals(45, options.getKeepAliveInterval());
134-
assertEquals("pass", new String(options.getPassword()));
135-
assertSame(socketFactory, options.getSocketFactory());
136-
assertSame(props, options.getSSLProperties());
137-
assertEquals("user", options.getUserName());
138-
assertEquals("foo", options.getWillDestination());
139-
assertEquals("bar", new String(options.getWillMessage().getPayload()));
140-
assertEquals(2, options.getWillMessage().getQos());
141-
142-
}
143-
144114
@Test
145115
public void testOutboundOptionsApplied() throws Exception {
146116
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
147-
factory.setCleanSession(false);
148-
factory.setConnectionTimeout(23);
149-
factory.setKeepAliveInterval(45);
150-
factory.setPassword("pass");
117+
MqttConnectOptions connectOptions = new MqttConnectOptions();
118+
connectOptions.setCleanSession(false);
119+
connectOptions.setConnectionTimeout(23);
120+
connectOptions.setKeepAliveInterval(45);
121+
connectOptions.setPassword("pass".toCharArray());
151122
MemoryPersistence persistence = new MemoryPersistence();
152123
factory.setPersistence(persistence);
153124
final SocketFactory socketFactory = mock(SocketFactory.class);
154-
factory.setSocketFactory(socketFactory);
125+
connectOptions.setSocketFactory(socketFactory);
155126
final Properties props = new Properties();
156-
factory.setSslProperties(props);
157-
factory.setUserName("user");
158-
Will will = new Will("foo", "bar".getBytes(), 2, true);
159-
factory.setWill(will);
127+
connectOptions.setSSLProperties(props);
128+
connectOptions.setUserName("user");
129+
connectOptions.setWill("foo", "bar".getBytes(), 2, true);
130+
factory.setConnectionOptions(connectOptions);
160131

161132
factory = spy(factory);
162133
final MqttAsyncClient client = mock(MqttAsyncClient.class);
@@ -205,19 +176,20 @@ public void testOutboundOptionsApplied() throws Exception {
205176
@Test
206177
public void testInboundOptionsApplied() throws Exception {
207178
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
208-
factory.setCleanSession(false);
209-
factory.setConnectionTimeout(23);
210-
factory.setKeepAliveInterval(45);
211-
factory.setPassword("pass");
179+
MqttConnectOptions connectOptions = new MqttConnectOptions();
180+
connectOptions.setCleanSession(false);
181+
connectOptions.setConnectionTimeout(23);
182+
connectOptions.setKeepAliveInterval(45);
183+
connectOptions.setPassword("pass".toCharArray());
212184
MemoryPersistence persistence = new MemoryPersistence();
213185
factory.setPersistence(persistence);
214186
final SocketFactory socketFactory = mock(SocketFactory.class);
215-
factory.setSocketFactory(socketFactory);
187+
connectOptions.setSocketFactory(socketFactory);
216188
final Properties props = new Properties();
217-
factory.setSslProperties(props);
218-
factory.setUserName("user");
219-
Will will = new Will("foo", "bar".getBytes(), 2, true);
220-
factory.setWill(will);
189+
connectOptions.setSSLProperties(props);
190+
connectOptions.setUserName("user");
191+
connectOptions.setWill("foo", "bar".getBytes(), 2, true);
192+
factory.setConnectionOptions(connectOptions);
221193

222194
factory = spy(factory);
223195
final IMqttClient client = mock(IMqttClient.class);
@@ -421,19 +393,19 @@ public void testReconnect() throws Exception {
421393
@Test
422394
public void testSubscribeFailure() throws Exception {
423395
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
424-
factory.setCleanSession(false);
425-
factory.setConnectionTimeout(23);
426-
factory.setKeepAliveInterval(45);
427-
factory.setPassword("pass");
396+
MqttConnectOptions connectOptions = new MqttConnectOptions();
397+
connectOptions.setCleanSession(false);
398+
connectOptions.setConnectionTimeout(23);
399+
connectOptions.setKeepAliveInterval(45);
400+
connectOptions.setPassword("pass".toCharArray());
428401
MemoryPersistence persistence = new MemoryPersistence();
429402
factory.setPersistence(persistence);
430403
final SocketFactory socketFactory = mock(SocketFactory.class);
431-
factory.setSocketFactory(socketFactory);
404+
connectOptions.setSocketFactory(socketFactory);
432405
final Properties props = new Properties();
433-
factory.setSslProperties(props);
434-
factory.setUserName("user");
435-
Will will = new Will("foo", "bar".getBytes(), 2, true);
436-
factory.setWill(will);
406+
connectOptions.setSSLProperties(props);
407+
connectOptions.setUserName("user");
408+
connectOptions.setWill("foo", "bar".getBytes(), 2, true);
437409

438410
factory = spy(factory);
439411
MqttAsyncClient aClient = mock(MqttAsyncClient.class);
@@ -471,19 +443,19 @@ public void testSubscribeFailure() throws Exception {
471443
@Test
472444
public void testDifferentQos() throws Exception {
473445
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
474-
factory.setCleanSession(false);
475-
factory.setConnectionTimeout(23);
476-
factory.setKeepAliveInterval(45);
477-
factory.setPassword("pass");
446+
MqttConnectOptions connectOptions = new MqttConnectOptions();
447+
connectOptions.setCleanSession(false);
448+
connectOptions.setConnectionTimeout(23);
449+
connectOptions.setKeepAliveInterval(45);
450+
connectOptions.setPassword("pass".toCharArray());
478451
MemoryPersistence persistence = new MemoryPersistence();
479452
factory.setPersistence(persistence);
480453
final SocketFactory socketFactory = mock(SocketFactory.class);
481-
factory.setSocketFactory(socketFactory);
454+
connectOptions.setSocketFactory(socketFactory);
482455
final Properties props = new Properties();
483-
factory.setSslProperties(props);
484-
factory.setUserName("user");
485-
Will will = new Will("foo", "bar".getBytes(), 2, true);
486-
factory.setWill(will);
456+
connectOptions.setSSLProperties(props);
457+
connectOptions.setUserName("user");
458+
connectOptions.setWill("foo", "bar".getBytes(), 2, true);
487459

488460
factory = spy(factory);
489461
MqttAsyncClient aClient = mock(MqttAsyncClient.class);
@@ -526,13 +498,15 @@ public IMqttClient getClientInstance(String uri, String clientId) throws MqttExc
526498
}
527499

528500
};
529-
factory.setServerURIs("tcp://localhost:1883");
501+
MqttConnectOptions connectOptions = new MqttConnectOptions();
502+
connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
530503
if (cleanSession != null) {
531-
factory.setCleanSession(cleanSession);
504+
connectOptions.setCleanSession(cleanSession);
532505
}
533506
if (action != null) {
534507
factory.setConsumerStopAction(action);
535508
}
509+
factory.setConnectionOptions(connectOptions);
536510
given(client.isConnected()).willReturn(true);
537511
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("client", factory, "foo");
538512
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));

0 commit comments

Comments
 (0)