Skip to content

Commit 7cd22de

Browse files
committed
Merge branch '4.4.x-stable' into 4.x.x-stable
2 parents c2551b0 + 691ba4e commit 7cd22de

File tree

8 files changed

+195
-61
lines changed

8 files changed

+195
-61
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,44 @@
1515

1616
package com.rabbitmq.client;
1717

18-
import com.rabbitmq.client.impl.*;
18+
import static java.util.concurrent.TimeUnit.*;
19+
20+
import com.rabbitmq.client.impl.AMQConnection;
21+
import com.rabbitmq.client.impl.ConnectionParams;
22+
import com.rabbitmq.client.impl.CredentialsProvider;
23+
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
24+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
25+
import com.rabbitmq.client.impl.FrameHandler;
26+
import com.rabbitmq.client.impl.FrameHandlerFactory;
27+
import com.rabbitmq.client.impl.SocketFrameHandlerFactory;
1928
import com.rabbitmq.client.impl.nio.NioParams;
2029
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
2130
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
22-
23-
import javax.net.SocketFactory;
24-
import javax.net.ssl.SSLContext;
25-
import javax.net.ssl.SSLSocketFactory;
26-
import javax.net.ssl.TrustManager;
2731
import java.io.IOException;
2832
import java.net.URI;
2933
import java.net.URISyntaxException;
3034
import java.net.URLDecoder;
3135
import java.security.KeyManagementException;
3236
import java.security.NoSuchAlgorithmException;
33-
import java.util.*;
34-
import java.util.concurrent.*;
35-
36-
import static java.util.concurrent.TimeUnit.*;
37+
import java.util.Arrays;
38+
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Properties;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
45+
import java.util.concurrent.ScheduledExecutorService;
46+
import java.util.concurrent.ThreadFactory;
47+
import java.util.concurrent.TimeoutException;
48+
import javax.net.SocketFactory;
49+
import javax.net.ssl.SSLContext;
50+
import javax.net.ssl.SSLSocketFactory;
51+
import javax.net.ssl.TrustManager;
3752

3853
/**
3954
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
4055
*/
41-
4256
public class ConnectionFactory implements Cloneable {
4357

4458
/** Default user name */
@@ -88,31 +102,30 @@ public class ConnectionFactory implements Cloneable {
88102

89103
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
90104

91-
private String username = DEFAULT_USER;
92-
private String password = DEFAULT_PASS;
93-
private String virtualHost = DEFAULT_VHOST;
94-
private String host = DEFAULT_HOST;
95-
private int port = USE_DEFAULT_PORT;
96-
private int requestedChannelMax = DEFAULT_CHANNEL_MAX;
97-
private int requestedFrameMax = DEFAULT_FRAME_MAX;
98-
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
99-
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
100-
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
101-
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
102-
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
103-
private SocketFactory factory = SocketFactory.getDefault();
104-
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
105+
private String virtualHost = DEFAULT_VHOST;
106+
private String host = DEFAULT_HOST;
107+
private int port = USE_DEFAULT_PORT;
108+
private int requestedChannelMax = DEFAULT_CHANNEL_MAX;
109+
private int requestedFrameMax = DEFAULT_FRAME_MAX;
110+
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
111+
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
112+
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
113+
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
114+
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
115+
private SocketFactory factory = SocketFactory.getDefault();
116+
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
105117
private ExecutorService sharedExecutor;
106-
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
118+
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
107119
// minimises the number of threads rapid closure of many
108120
// connections uses, see rabbitmq/rabbitmq-java-client#86
109121
private ExecutorService shutdownExecutor;
110122
private ScheduledExecutorService heartbeatExecutor;
111-
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
112-
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
123+
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
124+
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
125+
private CredentialsProvider credentialsProvider = new DefaultCredentialsProvider(DEFAULT_USER, DEFAULT_PASS);
113126

114-
private boolean automaticRecovery = true;
115-
private boolean topologyRecovery = true;
127+
private boolean automaticRecovery = true;
128+
private boolean topologyRecovery = true;
116129

117130
// long is used to make sure the users can use both ints
118131
// and longs safely. It is unlikely that anybody'd need
@@ -189,33 +202,50 @@ public void setPort(int port) {
189202
* @return the AMQP user name to use when connecting to the broker
190203
*/
191204
public String getUsername() {
192-
return this.username;
205+
return credentialsProvider.getUsername();
193206
}
194207

195208
/**
196209
* Set the user name.
197210
* @param username the AMQP user name to use when connecting to the broker
198211
*/
199212
public void setUsername(String username) {
200-
this.username = username;
213+
this.credentialsProvider = new DefaultCredentialsProvider(
214+
username,
215+
this.credentialsProvider.getPassword()
216+
);
201217
}
202218

203219
/**
204220
* Retrieve the password.
205221
* @return the password to use when connecting to the broker
206222
*/
207223
public String getPassword() {
208-
return this.password;
224+
return credentialsProvider.getPassword();
209225
}
210226

211227
/**
212228
* Set the password.
213229
* @param password the password to use when connecting to the broker
214230
*/
215231
public void setPassword(String password) {
216-
this.password = password;
232+
this.credentialsProvider = new DefaultCredentialsProvider(
233+
this.credentialsProvider.getUsername(),
234+
password
235+
);
217236
}
218237

238+
/**
239+
* Set a custom credentials provider.
240+
* Default implementation uses static username and password.
241+
* @param credentialsProvider The custom implementation of CredentialsProvider to use when connecting to the broker.
242+
* @see com.rabbitmq.client.impl.DefaultCredentialsProvider
243+
* @since 4.5.0
244+
*/
245+
public void setCredentialsProvider(CredentialsProvider credentialsProvider) {
246+
this.credentialsProvider = credentialsProvider;
247+
}
248+
219249
/**
220250
* Retrieve the virtual host.
221251
* @return the virtual host to use when connecting to the broker
@@ -971,8 +1001,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
9711001
public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
9721002
ConnectionParams result = new ConnectionParams();
9731003

974-
result.setUsername(username);
975-
result.setPassword(password);
1004+
result.setCredentialsProvider(credentialsProvider);
9761005
result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
9771006
result.setVirtualHost(virtualHost);
9781007
result.setClientProperties(getClientProperties());
@@ -1072,7 +1101,8 @@ protected AddressResolver createAddressResolver(List<Address> addresses) {
10721101

10731102
@Override public ConnectionFactory clone(){
10741103
try {
1075-
return (ConnectionFactory)super.clone();
1104+
ConnectionFactory clone = (ConnectionFactory)super.clone();
1105+
return clone;
10761106
} catch (CloneNotSupportedException e) {
10771107
throw new Error(e);
10781108
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,7 @@ public static Map<String, Object> defaultClientProperties() {
136136
private final int requestedFrameMax;
137137
private final int handshakeTimeout;
138138
private final int shutdownTimeout;
139-
private final String username;
140-
private final String password;
139+
private final CredentialsProvider credentialsProvider;
141140
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
142141
protected final MetricsCollector metricsCollector;
143142
private final int channelRpcTimeout;
@@ -216,8 +215,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler) {
216215
public AMQConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector)
217216
{
218217
checkPreconditions();
219-
this.username = params.getUsername();
220-
this.password = params.getPassword();
218+
this.credentialsProvider = params.getCredentialsProvider();
221219
this._frameHandler = frameHandler;
222220
this._virtualHost = params.getVirtualHost();
223221
this._exceptionHandler = params.getExceptionHandler();
@@ -337,8 +335,10 @@ public void start()
337335
"server offered [" + connStart.getMechanisms() + "]");
338336
}
339337

338+
String username = credentialsProvider.getUsername();
339+
String password = credentialsProvider.getPassword();
340340
LongString challenge = null;
341-
LongString response = sm.handleChallenge(null, this.username, this.password);
341+
LongString response = sm.handleChallenge(null, username, password);
342342

343343
do {
344344
Method method = (challenge == null)
@@ -355,7 +355,7 @@ public void start()
355355
connTune = (AMQP.Connection.Tune) serverResponse;
356356
} else {
357357
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
358-
response = sm.handleChallenge(challenge, this.username, this.password);
358+
response = sm.handleChallenge(challenge, username, password);
359359
}
360360
} catch (ShutdownSignalException e) {
361361
Method shutdownMethod = e.getReason();
@@ -1069,7 +1069,7 @@ public AMQCommand transformReply(AMQCommand command) {
10691069

10701070
@Override public String toString() {
10711071
final String virtualHost = "/".equals(_virtualHost) ? _virtualHost : "/" + _virtualHost;
1072-
return "amqp://" + this.username + "@" + getHostAddress() + ":" + getPort() + virtualHost;
1072+
return "amqp://" + this.credentialsProvider.getUsername() + "@" + getHostAddress() + ":" + getPort() + virtualHost;
10731073
}
10741074

10751075
private String getHostAddress() {

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import java.util.concurrent.ThreadFactory;
2727

2828
public class ConnectionParams {
29-
private String username;
30-
private String password;
29+
private CredentialsProvider credentialsProvider;
3130
private ExecutorService consumerWorkServiceExecutor;
3231
private ScheduledExecutorService heartbeatExecutor;
3332
private ExecutorService shutdownExecutor;
@@ -52,12 +51,8 @@ public class ConnectionParams {
5251

5352
public ConnectionParams() {}
5453

55-
public String getUsername() {
56-
return username;
57-
}
58-
59-
public String getPassword() {
60-
return password;
54+
public CredentialsProvider getCredentialsProvider() {
55+
return credentialsProvider;
6156
}
6257

6358
public ExecutorService getConsumerWorkServiceExecutor() {
@@ -132,12 +127,8 @@ public boolean channelShouldCheckRpcResponseType() {
132127
return channelShouldCheckRpcResponseType;
133128
}
134129

135-
public void setUsername(String username) {
136-
this.username = username;
137-
}
138-
139-
public void setPassword(String password) {
140-
this.password = password;
130+
public void setCredentialsProvider(CredentialsProvider credentialsProvider) {
131+
this.credentialsProvider = credentialsProvider;
141132
}
142133

143134
public void setConsumerWorkServiceExecutor(ExecutorService consumerWorkServiceExecutor) {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Provider interface for establishing credentials for connecting to the broker. Especially useful
5+
* for situations where credentials might change before a recovery takes place or where it is
6+
* convenient to plug in an outside custom implementation.
7+
*
8+
* @since 4.5.0
9+
*/
10+
public interface CredentialsProvider {
11+
12+
String getUsername();
13+
14+
String getPassword();
15+
16+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Default implementation of a CredentialsProvider which simply holds a static
5+
* username and password.
6+
*
7+
* @since 4.5.0
8+
*/
9+
public class DefaultCredentialsProvider implements CredentialsProvider {
10+
11+
private final String username;
12+
private final String password;
13+
14+
public DefaultCredentialsProvider(String username, String password) {
15+
this.username = username;
16+
this.password = password;
17+
}
18+
19+
@Override
20+
public String getUsername() {
21+
return username;
22+
}
23+
24+
@Override
25+
public String getPassword() {
26+
return password;
27+
}
28+
29+
}

src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.MetricsCollector;
2222
import com.rabbitmq.client.impl.AMQConnection;
2323
import com.rabbitmq.client.impl.ConnectionParams;
24+
import com.rabbitmq.client.impl.CredentialsProvider;
2425
import com.rabbitmq.client.impl.FrameHandler;
2526
import com.rabbitmq.client.impl.FrameHandlerFactory;
2627
import org.junit.Test;
@@ -29,8 +30,9 @@
2930
import java.util.Queue;
3031
import java.util.concurrent.ArrayBlockingQueue;
3132
import java.util.concurrent.TimeoutException;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3234

33-
import static org.junit.Assert.assertSame;
35+
import static org.junit.Assert.*;
3436
import static org.mockito.Mockito.*;
3537

3638
public class ConnectionFactoryTest {
@@ -62,5 +64,30 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
6264
);
6365
assertSame(connectionThatSucceeds, returnedConnection);
6466
}
67+
68+
// see https://github.com/rabbitmq/rabbitmq-java-client/pull/350
69+
@Test public void customizeCredentialsProvider() throws Exception {
70+
final CredentialsProvider provider = mock(CredentialsProvider.class);
71+
final AMQConnection connection = mock(AMQConnection.class);
72+
final AtomicBoolean createCalled = new AtomicBoolean(false);
73+
74+
ConnectionFactory connectionFactory = new ConnectionFactory() {
75+
@Override
76+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler,
77+
MetricsCollector metricsCollector) {
78+
assertSame(provider, params.getCredentialsProvider());
79+
createCalled.set(true);
80+
return connection;
81+
}
82+
};
83+
connectionFactory.setCredentialsProvider(provider);
84+
connectionFactory.setAutomaticRecoveryEnabled(false);
85+
86+
doNothing().when(connection).start();
87+
88+
Connection returnedConnection = connectionFactory.newConnection();
89+
assertSame(returnedConnection, connection);
90+
assertTrue(createCalled.get());
91+
}
6592

6693
}

0 commit comments

Comments
 (0)