18
18
19
19
import java .net .MalformedURLException ;
20
20
import java .net .URISyntaxException ;
21
+ import java .util .AbstractMap .SimpleImmutableEntry ;
21
22
import java .util .Arrays ;
22
23
import java .util .HashMap ;
23
24
import java .util .Map ;
25
+ import java .util .stream .Collectors ;
26
+ import java .util .stream .IntStream ;
24
27
25
28
import org .apache .commons .logging .Log ;
26
29
import org .apache .commons .logging .LogFactory ;
27
30
28
31
import org .springframework .amqp .AmqpException ;
32
+ import org .springframework .amqp .rabbit .support .RabbitExceptionTranslator ;
29
33
import org .springframework .beans .factory .DisposableBean ;
30
34
import org .springframework .core .io .Resource ;
31
35
import org .springframework .lang .Nullable ;
34
38
import com .rabbitmq .http .client .Client ;
35
39
import com .rabbitmq .http .client .domain .QueueInfo ;
36
40
37
-
38
41
/**
39
42
* A {@link RoutingConnectionFactory} that determines the node on which a queue is located and
40
43
* returns a factory that connects directly to that node.
@@ -96,19 +99,9 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi
96
99
public LocalizedQueueConnectionFactory (ConnectionFactory defaultConnectionFactory ,
97
100
Map <String , String > nodeToAddress , String [] adminUris , String vhost , String username , String password ,
98
101
boolean useSSL , Resource sslPropertiesLocation ) {
99
- Assert .notNull (defaultConnectionFactory , "'defaultConnectionFactory' cannot be null" );
100
- this .defaultConnectionFactory = defaultConnectionFactory ;
101
- this .adminUris = Arrays .copyOf (adminUris , adminUris .length );
102
- this .nodeToAddress .putAll (nodeToAddress );
103
- this .vhost = vhost ;
104
- this .username = username ;
105
- this .password = password ;
106
- this .useSSL = useSSL ;
107
- this .sslPropertiesLocation = sslPropertiesLocation ;
108
- this .keyStore = null ;
109
- this .trustStore = null ;
110
- this .keyStorePassPhrase = null ;
111
- this .trustStorePassPhrase = null ;
102
+
103
+ this (defaultConnectionFactory , adminUris , nodeToAddress , vhost , username , password , useSSL ,
104
+ sslPropertiesLocation , null , null , null , null );
112
105
}
113
106
114
107
/**
@@ -129,19 +122,9 @@ public LocalizedQueueConnectionFactory(ConnectionFactory defaultConnectionFactor
129
122
Map <String , String > nodeToAddress , String [] adminUris , String vhost , String username , String password ,
130
123
boolean useSSL , String keyStore , String trustStore ,
131
124
String keyStorePassPhrase , String trustStorePassPhrase ) {
132
- Assert .notNull (defaultConnectionFactory , "'defaultConnectionFactory' cannot be null" );
133
- this .defaultConnectionFactory = defaultConnectionFactory ;
134
- this .adminUris = Arrays .copyOf (adminUris , adminUris .length );
135
- this .nodeToAddress .putAll (nodeToAddress );
136
- this .vhost = vhost ;
137
- this .username = username ;
138
- this .password = password ;
139
- this .useSSL = useSSL ;
140
- this .sslPropertiesLocation = null ;
141
- this .keyStore = keyStore ;
142
- this .trustStore = trustStore ;
143
- this .keyStorePassPhrase = keyStorePassPhrase ;
144
- this .trustStorePassPhrase = trustStorePassPhrase ;
125
+
126
+ this (defaultConnectionFactory , adminUris , nodeToAddress , vhost , username , password , useSSL , null ,
127
+ keyStore , trustStore , keyStorePassPhrase , trustStorePassPhrase );
145
128
}
146
129
147
130
/**
@@ -159,24 +142,10 @@ public LocalizedQueueConnectionFactory(ConnectionFactory defaultConnectionFactor
159
142
*/
160
143
public LocalizedQueueConnectionFactory (ConnectionFactory defaultConnectionFactory , String [] addresses ,
161
144
String [] adminUris , String [] nodes , String vhost , String username , String password , boolean useSSL ,
162
- Resource sslPropertiesLocation ) {
163
- Assert .notNull (defaultConnectionFactory , "'defaultConnectionFactory' cannot be null" );
164
- Assert .isTrue (addresses .length == nodes .length ,
165
- "'addresses', 'adminAddresses', and 'nodes' properties must have equal length" );
166
- this .defaultConnectionFactory = defaultConnectionFactory ;
167
- this .adminUris = Arrays .copyOf (adminUris , adminUris .length );
168
- for (int i = 0 ; i < addresses .length ; i ++) {
169
- this .nodeToAddress .put (nodes [i ], addresses [i ]);
170
- }
171
- this .vhost = vhost ;
172
- this .username = username ;
173
- this .password = password ;
174
- this .useSSL = useSSL ;
175
- this .sslPropertiesLocation = sslPropertiesLocation ;
176
- this .keyStore = null ;
177
- this .trustStore = null ;
178
- this .keyStorePassPhrase = null ;
179
- this .trustStorePassPhrase = null ;
145
+ @ Nullable Resource sslPropertiesLocation ) {
146
+
147
+ this (defaultConnectionFactory , adminUris , nodesAddressesToMap (nodes , addresses ), vhost , username , password ,
148
+ useSSL , sslPropertiesLocation , null , null , null , null );
180
149
}
181
150
182
151
/**
@@ -198,25 +167,40 @@ public LocalizedQueueConnectionFactory(ConnectionFactory defaultConnectionFactor
198
167
String [] addresses , String [] adminUris , String [] nodes , String vhost ,
199
168
String username , String password , boolean useSSL , String keyStore , String trustStore ,
200
169
String keyStorePassPhrase , String trustStorePassPhrase ) {
170
+
171
+ this (defaultConnectionFactory , adminUris , nodesAddressesToMap (nodes , addresses ), vhost , username , password ,
172
+ useSSL , null , keyStore , trustStore , keyStorePassPhrase , trustStorePassPhrase );
173
+ }
174
+
175
+ private LocalizedQueueConnectionFactory (ConnectionFactory defaultConnectionFactory , String [] adminUris ,
176
+ Map <String , String > nodeToAddress , String vhost , String username , String password , boolean useSSL ,
177
+ @ Nullable Resource sslPropertiesLocation , @ Nullable String keyStore , @ Nullable String trustStore ,
178
+ @ Nullable String keyStorePassPhrase , @ Nullable String trustStorePassPhrase ) {
179
+
201
180
Assert .notNull (defaultConnectionFactory , "'defaultConnectionFactory' cannot be null" );
202
- Assert .isTrue (addresses .length == nodes .length ,
203
- "'addresses', 'adminAddresses', and 'nodes' properties must have equal length" );
204
181
this .defaultConnectionFactory = defaultConnectionFactory ;
205
182
this .adminUris = Arrays .copyOf (adminUris , adminUris .length );
206
- for (int i = 0 ; i < addresses .length ; i ++) {
207
- this .nodeToAddress .put (nodes [i ], addresses [i ]);
208
- }
183
+ this .nodeToAddress .putAll (nodeToAddress );
209
184
this .vhost = vhost ;
210
185
this .username = username ;
211
186
this .password = password ;
212
187
this .useSSL = useSSL ;
213
- this .sslPropertiesLocation = null ;
188
+ this .sslPropertiesLocation = sslPropertiesLocation ;
214
189
this .keyStore = keyStore ;
215
190
this .trustStore = trustStore ;
216
191
this .keyStorePassPhrase = keyStorePassPhrase ;
217
192
this .trustStorePassPhrase = trustStorePassPhrase ;
218
193
}
219
194
195
+ private static Map <String , String > nodesAddressesToMap (String [] nodes , String [] addresses ) {
196
+ Assert .isTrue (addresses .length == nodes .length ,
197
+ "'addresses' and 'nodes' properties must have equal length" );
198
+ return IntStream .range (0 , addresses .length )
199
+ .mapToObj (i -> new SimpleImmutableEntry <>(nodes [i ], addresses [i ]))
200
+ .collect (Collectors .toMap (SimpleImmutableEntry ::getKey , SimpleImmutableEntry ::getValue ,
201
+ (u , v ) -> v )); // TODO in 2.2 use default throwingMerger() (to catch dups)
202
+ }
203
+
220
204
@ Override
221
205
public Connection createConnection () throws AmqpException {
222
206
return this .defaultConnectionFactory .createConnection ();
@@ -320,8 +304,7 @@ protected Client createClient(String adminUri, String username, String password)
320
304
return new Client (adminUri , username , password );
321
305
}
322
306
323
- private synchronized ConnectionFactory nodeConnectionFactory (String queue , String node , String address )
324
- throws Exception {
307
+ private synchronized ConnectionFactory nodeConnectionFactory (String queue , String node , String address ) {
325
308
if (this .logger .isInfoEnabled ()) {
326
309
this .logger .info ("Queue: " + queue + " is on node: " + node + " at: " + address );
327
310
}
@@ -341,9 +324,8 @@ private synchronized ConnectionFactory nodeConnectionFactory(String queue, Strin
341
324
* @param address the address to which the factory should connect.
342
325
* @param node the node.
343
326
* @return the connection factory.
344
- * @throws Exception if errors occur during creation.
345
327
*/
346
- protected ConnectionFactory createConnectionFactory (String address , String node ) throws Exception {
328
+ protected ConnectionFactory createConnectionFactory (String address , String node ) {
347
329
RabbitConnectionFactoryBean rcfb = new RabbitConnectionFactoryBean ();
348
330
rcfb .setUseSSL (this .useSSL );
349
331
rcfb .setSslPropertiesLocation (this .sslPropertiesLocation );
@@ -352,7 +334,14 @@ protected ConnectionFactory createConnectionFactory(String address, String node)
352
334
rcfb .setKeyStorePassphrase (this .keyStorePassPhrase );
353
335
rcfb .setTrustStorePassphrase (this .trustStorePassPhrase );
354
336
rcfb .afterPropertiesSet ();
355
- CachingConnectionFactory ccf = new CachingConnectionFactory (rcfb .getObject ()); // NOSONAR never null
337
+ com .rabbitmq .client .ConnectionFactory rcf ;
338
+ try {
339
+ rcf = rcfb .getObject ();
340
+ }
341
+ catch (Exception e ) {
342
+ throw RabbitExceptionTranslator .convertRabbitAccessException (e );
343
+ }
344
+ CachingConnectionFactory ccf = new CachingConnectionFactory (rcf ); // NOSONAR never null
356
345
ccf .setAddresses (address );
357
346
ccf .setUsername (this .username );
358
347
ccf .setPassword (this .password );
@@ -362,12 +351,21 @@ protected ConnectionFactory createConnectionFactory(String address, String node)
362
351
}
363
352
364
353
@ Override
365
- public void destroy () throws Exception {
354
+ public void destroy () {
355
+ Exception lastException = null ;
366
356
for (ConnectionFactory connectionFactory : this .nodeFactories .values ()) {
367
357
if (connectionFactory instanceof DisposableBean ) {
368
- ((DisposableBean ) connectionFactory ).destroy ();
358
+ try {
359
+ ((DisposableBean ) connectionFactory ).destroy ();
360
+ }
361
+ catch (Exception e ) {
362
+ lastException = e ;
363
+ }
369
364
}
370
365
}
366
+ if (lastException != null ) {
367
+ throw RabbitExceptionTranslator .convertRabbitAccessException (lastException );
368
+ }
371
369
}
372
370
373
371
}
0 commit comments