Skip to content

Commit 0395e43

Browse files
Merge pull request #214 from rabbitmq/rabbitmq-java-client-210
Enable automatic recovery by default
2 parents 8d630e9 + 7104a23 commit 0395e43

File tree

10 files changed

+64
-69
lines changed

10 files changed

+64
-69
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class ConnectionFactory implements Cloneable {
100100
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
101101
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
102102

103-
private boolean automaticRecovery = false;
103+
private boolean automaticRecovery = true;
104104
private boolean topologyRecovery = true;
105105

106106
// long is used to make sure the users can use both ints

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,20 @@ private AMQImpl.Basic.Deliver offsetDeliveryTag(AMQImpl.Basic.Deliver method) {
8282

8383
@Override
8484
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
85+
// FIXME no check if deliveryTag = 0 (ack all)
8586
long realTag = deliveryTag - activeDeliveryTagOffset;
86-
if (realTag > 0) {
87+
// 0 tag means ack all
88+
if (realTag >= 0) {
8789
super.basicAck(realTag, multiple);
8890
}
8991
}
9092

9193
@Override
9294
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
95+
// FIXME no check if deliveryTag = 0 (nack all)
9396
long realTag = deliveryTag - activeDeliveryTagOffset;
94-
if (realTag > 0) {
97+
// 0 tag means nack all
98+
if (realTag >= 0) {
9599
super.basicNack(realTag, multiple, requeue);
96100
}
97101
}

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,14 @@ protected void finished(Description description) {
6060

6161
protected ConnectionFactory newConnectionFactory() {
6262
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
63+
connectionFactory.setAutomaticRecoveryEnabled(isAutomaticRecoveryEnabled());
6364
return connectionFactory;
6465
}
6566

67+
protected boolean isAutomaticRecoveryEnabled() {
68+
return true;
69+
}
70+
6671
protected Connection connection;
6772
protected Channel channel;
6873

src/test/java/com/rabbitmq/client/test/SharedThreadPoolTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public class SharedThreadPoolTest extends BrokerTestCase {
3232
@Test public void willShutDownExecutor() throws IOException, TimeoutException {
3333
ConnectionFactory cf = TestUtils.connectionFactory();
34+
cf.setAutomaticRecoveryEnabled(false);
3435
ExecutorService executor = Executors.newFixedThreadPool(8);
3536
cf.setSharedExecutor(executor);
3637

src/test/java/com/rabbitmq/client/test/functional/DeadLetterExchange.java

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,19 @@
1515

1616
package com.rabbitmq.client.test.functional;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertNotNull;
20-
import static org.junit.Assert.assertNull;
21-
import static org.junit.Assert.assertTrue;
22-
import static org.junit.Assert.fail;
18+
import com.rabbitmq.client.*;
19+
import com.rabbitmq.client.AMQP.BasicProperties;
20+
import com.rabbitmq.client.QueueingConsumer.Delivery;
21+
import com.rabbitmq.client.test.BrokerTestCase;
22+
import org.junit.Test;
2323

2424
import java.io.IOException;
25-
import java.util.ArrayList;
26-
import java.util.Arrays;
27-
import java.util.Collections;
28-
import java.util.HashMap;
29-
import java.util.List;
30-
import java.util.Map;
25+
import java.util.*;
3126
import java.util.concurrent.Callable;
3227
import java.util.concurrent.CountDownLatch;
3328
import java.util.concurrent.TimeUnit;
3429

35-
import org.junit.Test;
36-
37-
import com.rabbitmq.client.AMQP;
38-
import com.rabbitmq.client.AMQP.BasicProperties;
39-
import com.rabbitmq.client.Channel;
40-
import com.rabbitmq.client.DefaultConsumer;
41-
import com.rabbitmq.client.Envelope;
42-
import com.rabbitmq.client.GetResponse;
43-
import com.rabbitmq.client.MessageProperties;
44-
import com.rabbitmq.client.QueueingConsumer;
45-
import com.rabbitmq.client.QueueingConsumer.Delivery;
46-
import com.rabbitmq.client.test.BrokerTestCase;
30+
import static org.junit.Assert.*;
4731

4832
public class DeadLetterExchange extends BrokerTestCase {
4933
public static final String DLX = "dead.letter.exchange";
@@ -149,23 +133,23 @@ protected void releaseResources() throws IOException {
149133
Map<String, Object> args = new HashMap<String, Object>();
150134
args.put(DLX_RK_ARG, "foo");
151135

152-
channel.queueDeclare("bar", false, true, false, args);
136+
channel.queueDeclare(randomQueueName(), false, true, false, args);
153137
fail("dlx must be defined if dl-rk is set");
154138
} catch (IOException ex) {
155139
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ex);
156140
}
157141
}
158142

159143
@Test public void redeclareQueueWithRoutingKeyButNoDeadLetterExchange()
160-
throws IOException
161-
{
144+
throws IOException, InterruptedException {
162145
try {
146+
String queueName = randomQueueName();
163147
Map<String, Object> args = new HashMap<String, Object>();
164-
channel.queueDeclare("bar", false, true, false, args);
148+
channel.queueDeclare(queueName, false, true, false, args);
165149

166150
args.put(DLX_RK_ARG, "foo");
167151

168-
channel.queueDeclare("bar", false, true, false, args);
152+
channel.queueDeclare(queueName, false, true, false, args);
169153
fail("x-dead-letter-exchange must be specified if " +
170154
"x-dead-letter-routing-key is set");
171155
} catch (IOException ex) {
@@ -711,4 +695,8 @@ public void process(GetResponse getResponse) {
711695

712696
public void process(GetResponse response);
713697
}
698+
699+
private static String randomQueueName() {
700+
return DeadLetterExchange.class.getSimpleName() + "-" + UUID.randomUUID().toString();
701+
}
714702
}

src/test/java/com/rabbitmq/client/test/functional/Heartbeat.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@
1616

1717
package com.rabbitmq.client.test.functional;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertTrue;
19+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
20+
import com.rabbitmq.client.test.BrokerTestCase;
21+
import org.junit.Test;
2222

2323
import java.io.IOException;
2424

25-
import org.junit.Test;
26-
27-
import com.rabbitmq.client.impl.AMQConnection;
28-
import com.rabbitmq.client.test.BrokerTestCase;
25+
import static org.junit.Assert.*;
2926

3027
public class Heartbeat extends BrokerTestCase {
3128

@@ -41,7 +38,7 @@ public Heartbeat()
4138
assertEquals(1, connection.getHeartbeat());
4239
Thread.sleep(3100);
4340
assertTrue(connection.isOpen());
44-
((AMQConnection)connection).setHeartbeat(0);
41+
((AutorecoveringConnection)connection).getDelegate().setHeartbeat(0);
4542
assertEquals(0, connection.getHeartbeat());
4643
Thread.sleep(3100);
4744
assertFalse(connection.isOpen());

src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515

1616
package com.rabbitmq.client.test.functional;
1717

18-
import java.io.IOException;
19-
import java.net.Socket;
20-
21-
import javax.net.SocketFactory;
22-
23-
import com.rabbitmq.client.impl.*;
24-
import com.rabbitmq.client.test.TestUtils;
25-
import org.junit.Test;
26-
2718
import com.rabbitmq.client.AMQP;
2819
import com.rabbitmq.client.ConnectionFactory;
2920
import com.rabbitmq.client.DefaultSocketConfigurator;
21+
import com.rabbitmq.client.impl.*;
22+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
3023
import com.rabbitmq.client.test.BrokerTestCase;
24+
import com.rabbitmq.client.test.TestUtils;
25+
import org.junit.Test;
26+
27+
import javax.net.SocketFactory;
28+
import java.io.IOException;
29+
import java.net.Socket;
3130

3231
/**
3332
* Test that the server correctly handles us when we send it bad frames
@@ -177,7 +176,7 @@ public Frame confuse(Frame frame) {
177176
}
178177

179178
private void expectError(int error, Confuser confuser) throws IOException {
180-
((ConfusedFrameHandler)((AMQConnection)connection).getFrameHandler()).
179+
((ConfusedFrameHandler)((AutorecoveringConnection)connection).getDelegate().getFrameHandler()).
181180
confuser = confuser;
182181

183182
//NB: the frame confuser relies on the encoding of the

src/test/java/com/rabbitmq/client/test/server/ChannelLimitNegotiation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import javax.net.SocketFactory;
2727

28+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2829
import com.rabbitmq.client.test.TestUtils;
2930
import org.junit.Test;
3031

@@ -103,7 +104,7 @@ protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
103104
assertNull(conn.createChannel(n + 1));
104105

105106
// Construct a channel directly
106-
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
107+
final ChannelN ch = new ChannelN(((AutorecoveringConnection) conn).getDelegate(), n + 1,
107108
new ConsumerWorkService(Executors.newSingleThreadExecutor(),
108109
Executors.defaultThreadFactory(), ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT));
109110
conn.addShutdownListener(new ShutdownListener() {

src/test/java/com/rabbitmq/client/test/server/ExclusiveQueueDurability.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@
3333
*/
3434
public class ExclusiveQueueDurability extends BrokerTestCase {
3535

36+
@Override
37+
protected boolean isAutomaticRecoveryEnabled() {
38+
// With automatic recovery enabled, queue can be re-created when launching the test suite
39+
// (because FunctionalTests are launched independently and as part of the HATests)
40+
// This then makes this test fail.
41+
return false;
42+
}
43+
3644
void verifyQueueMissing(Channel channel, String queueName)
3745
throws IOException {
3846
try {

src/test/java/com/rabbitmq/client/test/server/Permissions.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,20 @@
1616

1717
package com.rabbitmq.client.test.server;
1818

19-
import static org.junit.Assert.assertFalse;
20-
import static org.junit.Assert.assertTrue;
21-
import static org.junit.Assert.fail;
19+
import com.rabbitmq.client.*;
20+
import com.rabbitmq.client.impl.AMQChannel;
21+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
22+
import com.rabbitmq.client.test.BrokerTestCase;
23+
import com.rabbitmq.client.test.TestUtils;
24+
import com.rabbitmq.tools.Host;
25+
import org.junit.Test;
2226

2327
import java.io.IOException;
2428
import java.util.HashMap;
2529
import java.util.Map;
2630
import java.util.concurrent.TimeoutException;
2731

28-
import com.rabbitmq.client.test.TestUtils;
29-
import org.junit.Test;
30-
31-
import com.rabbitmq.client.AMQP;
32-
import com.rabbitmq.client.AlreadyClosedException;
33-
import com.rabbitmq.client.AuthenticationFailureException;
34-
import com.rabbitmq.client.Channel;
35-
import com.rabbitmq.client.Connection;
36-
import com.rabbitmq.client.ConnectionFactory;
37-
import com.rabbitmq.client.QueueingConsumer;
38-
import com.rabbitmq.client.impl.AMQChannel;
39-
import com.rabbitmq.client.test.BrokerTestCase;
40-
import com.rabbitmq.tools.Host;
32+
import static org.junit.Assert.*;
4133

4234
public class Permissions extends BrokerTestCase
4335
{
@@ -222,9 +214,9 @@ public void with(String name) throws IOException {
222214
{
223215
runTest(false, false, true, false, new WithName() {
224216
public void with(String name) throws IOException {
225-
((AMQChannel)channel)
226-
.exnWrappingRpc(new AMQP.Queue.Purge.Builder()
227-
.queue(name)
217+
AMQChannel channelDelegate = (AMQChannel) ((AutorecoveringChannel)channel).getDelegate();
218+
channelDelegate.exnWrappingRpc(new AMQP.Queue.Purge.Builder()
219+
.queue(name)
228220
.build());
229221
}});
230222
}

0 commit comments

Comments
 (0)