Skip to content

Commit dae53d3

Browse files
author
Simon MacMullen
committed
Merge bug24579
2 parents c582202 + f346291 commit dae53d3

File tree

3 files changed

+23
-14
lines changed

3 files changed

+23
-14
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.security.NoSuchAlgorithmException;
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2524

2625
import java.net.InetSocketAddress;
2726
import java.net.Socket;
@@ -43,8 +42,9 @@
4342
*/
4443

4544
public class ConnectionFactory implements Cloneable {
46-
45+
4746
/** Default Executor threads */
47+
@Deprecated
4848
public static final int DEFAULT_NUM_CONSUMER_THREADS = 5;
4949
/** Default user name */
5050
public static final String DEFAULT_USER = "guest";
@@ -76,7 +76,6 @@ public class ConnectionFactory implements Cloneable {
7676
/** The default SSL protocol */
7777
private static final String DEFAULT_SSL_PROTOCOL = "SSLv3";
7878

79-
private int numConsumerThreads = DEFAULT_NUM_CONSUMER_THREADS;
8079
private String username = DEFAULT_USER;
8180
private String password = DEFAULT_PASS;
8281
private String virtualHost = DEFAULT_VHOST;
@@ -91,13 +90,15 @@ public class ConnectionFactory implements Cloneable {
9190
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9291

9392
/** @return number of consumer threads in default {@link ExecutorService} */
93+
@Deprecated
9494
public int getNumConsumerThreads() {
95-
return numConsumerThreads;
95+
return DEFAULT_NUM_CONSUMER_THREADS;
9696
}
9797

9898
/** @param numConsumerThreads threads in created private executor service */
99+
@Deprecated
99100
public void setNumConsumerThreads(int numConsumerThreads) {
100-
this.numConsumerThreads = numConsumerThreads;
101+
throw new IllegalArgumentException("setNumConsumerThreads not supported -- create explicit ExecutorService instead.");
101102
}
102103

103104
/** @return the default host to use for connections */
@@ -472,8 +473,7 @@ protected void configureSocket(Socket socket) throws IOException{
472473
* @throws IOException if it encounters a problem
473474
*/
474475
public Connection newConnection(Address[] addrs) throws IOException {
475-
return newConnection(Executors.newFixedThreadPool(this.numConsumerThreads),
476-
addrs);
476+
return newConnection(null, addrs);
477477
}
478478

479479
/**
@@ -490,7 +490,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
490490
for (Address addr : addrs) {
491491
try {
492492
FrameHandler frameHandler = createFrameHandler(addr);
493-
AMQConnection conn =
493+
AMQConnection conn =
494494
new AMQConnection(username,
495495
password,
496496
frameHandler,
@@ -518,7 +518,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
518518
* @throws IOException if it encounters a problem
519519
*/
520520
public Connection newConnection() throws IOException {
521-
return newConnection(Executors.newFixedThreadPool(this.numConsumerThreads),
521+
return newConnection(null,
522522
new Address[] {new Address(getHost(), getPort())}
523523
);
524524
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
107107
* @see Connection#createChannel
108108
* @param connection The connection associated with this channel
109109
* @param channelNumber The channel number to be associated with this channel
110-
* @param workPool pool in which this channel's consumer work is stored
111-
* @param executor service which executes the work in the workPool
110+
* @param workService service for managing this channel's consumer callbacks
112111
*/
113112
public ChannelN(AMQConnection connection, int channelNumber,
114113
ConsumerWorkService workService) {
@@ -651,7 +650,7 @@ public Exchange.DeleteOk exchangeDelete(String exchange)
651650
public Exchange.BindOk exchangeBind(String destination, String source,
652651
String routingKey, Map<String, Object> arguments)
653652
throws IOException {
654-
return (Exchange.BindOk)
653+
return (Exchange.BindOk)
655654
exnWrappingRpc(new Exchange.Bind.Builder()
656655
.destination(destination)
657656
.source(source)

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,25 @@
1818
import java.util.ArrayList;
1919
import java.util.List;
2020
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
2122

2223
import com.rabbitmq.client.Channel;
2324

2425
final class ConsumerWorkService {
2526
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
27+
private static final int DEFAULT_NUM_THREADS = 5;
2628
private final ExecutorService executor;
29+
private final boolean privateExecutor;
2730
private final WorkPool<Channel, Runnable> workPool;
2831

2932
public ConsumerWorkService(ExecutorService executor) {
30-
this.executor = executor;
33+
if (executor == null) {
34+
privateExecutor = true;
35+
this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
36+
} else {
37+
privateExecutor = false;
38+
this.executor = executor;
39+
}
3140
this.workPool = new WorkPool<Channel, Runnable>();
3241
}
3342

@@ -36,7 +45,8 @@ public ConsumerWorkService(ExecutorService executor) {
3645
*/
3746
public void shutdown() {
3847
this.workPool.unregisterAllKeys();
39-
this.executor.shutdown();
48+
if (privateExecutor)
49+
this.executor.shutdown();
4050
}
4151

4252
/**

0 commit comments

Comments
 (0)