Skip to content

Commit 42356a8

Browse files
author
Steve Powell
committed
Merge bug23802 into default.
2 parents ba4d2b2 + 8f5cb90 commit 42356a8

File tree

9 files changed

+216
-117
lines changed

9 files changed

+216
-117
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -82,20 +82,20 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8282
/** The ConfirmListener collection. */
8383
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
8484

85-
/** Sequence number of next published message requiring confirmation. */
86-
private long nextPublishSeqNo = 0L;
87-
8885
/** The current default consumer, or null if there is none. */
8986
private volatile Consumer defaultConsumer = null;
9087

9188
/** Set of currently unconfirmed messages (i.e. messages that have
92-
* not been ack'd or nack'd by the server yet. */
93-
private volatile SortedSet<Long> unconfirmedSet =
89+
* not been ack'd or nack'd by the server yet.
90+
* Used as monitor and protects nextPublishSeqNo and onlyAcksReceived. */
91+
private SortedSet<Long> unconfirmedSet =
9492
Collections.synchronizedSortedSet(new TreeSet<Long>());
95-
93+
/** Sequence number of next published message requiring confirmation.
94+
* 0 means no confirmations. */
95+
private long nextPublishSeqNo = 0L;
9696
/** Whether any nacks have been received since the last
9797
* waitForConfirms(). */
98-
private volatile boolean onlyAcksReceived = true;
98+
private boolean noNacksReceived = true;
9999

100100
/**
101101
* Construct a new channel on the given connection with the given
@@ -167,8 +167,8 @@ public boolean waitForConfirms()
167167
throw Utility.fixStackTrace(getCloseReason());
168168
}
169169
if (unconfirmedSet.isEmpty()) {
170-
boolean aux = onlyAcksReceived;
171-
onlyAcksReceived = true;
170+
boolean aux = noNacksReceived;
171+
noNacksReceived = true;
172172
return aux;
173173
}
174174
unconfirmedSet.wait();
@@ -182,7 +182,7 @@ public void waitForConfirmsOrDie()
182182
{
183183
if (!waitForConfirms()) {
184184
close(AMQP.REPLY_SUCCESS, "NACKS RECEIVED", true, null, false);
185-
throw new IOException("nacks received");
185+
throw new IOException("nacks received", getCloseReason());
186186
}
187187
}
188188

@@ -321,7 +321,7 @@ public void releaseChannelNumber() {
321321
} else if (method instanceof Basic.Nack) {
322322
Basic.Nack nack = (Basic.Nack) method;
323323
callConfirmListeners(command, nack);
324-
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), false);
324+
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
325325
return true;
326326
} else if (method instanceof Basic.RecoverOk) {
327327
for (Consumer callback: _consumers.values()) {
@@ -551,9 +551,11 @@ public void basicPublish(String exchange, String routingKey,
551551
BasicProperties props, byte[] body)
552552
throws IOException
553553
{
554-
if (nextPublishSeqNo > 0) {
555-
unconfirmedSet.add(getNextPublishSeqNo());
556-
nextPublishSeqNo++;
554+
synchronized(unconfirmedSet) {
555+
if (nextPublishSeqNo > 0) {
556+
unconfirmedSet.add(nextPublishSeqNo);
557+
nextPublishSeqNo++;
558+
}
557559
}
558560
BasicProperties useProps = props;
559561
if (props == null) {
@@ -994,7 +996,9 @@ public Tx.RollbackOk txRollback()
994996
public Confirm.SelectOk confirmSelect()
995997
throws IOException
996998
{
997-
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
999+
synchronized(unconfirmedSet) {
1000+
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
1001+
}
9981002
return (Confirm.SelectOk)
9991003
exnWrappingRpc(new Confirm.Select(false)).getMethod();
10001004

@@ -1012,7 +1016,9 @@ public Channel.FlowOk getFlow() {
10121016

10131017
/** Public API - {@inheritDoc} */
10141018
public long getNextPublishSeqNo() {
1015-
return nextPublishSeqNo;
1019+
synchronized(unconfirmedSet) {
1020+
return nextPublishSeqNo;
1021+
}
10161022
}
10171023

10181024
public void asyncRpc(Method method) throws IOException {
@@ -1024,13 +1030,13 @@ public AMQCommand rpc(Method method) throws IOException {
10241030
}
10251031

10261032
protected void handleAckNack(long seqNo, boolean multiple, boolean nack) {
1027-
if (multiple) {
1028-
unconfirmedSet.headSet(seqNo + 1).clear();
1029-
} else {
1030-
unconfirmedSet.remove(seqNo);
1031-
}
10321033
synchronized (unconfirmedSet) {
1033-
onlyAcksReceived = onlyAcksReceived && !nack;
1034+
if (multiple) {
1035+
unconfirmedSet.headSet(seqNo + 1).clear();
1036+
} else {
1037+
unconfirmedSet.remove(seqNo);
1038+
}
1039+
noNacksReceived &= !nack;
10341040
if (unconfirmedSet.isEmpty())
10351041
unconfirmedSet.notifyAll();
10361042
}

test/src/com/rabbitmq/client/test/Bug20004Test.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
* tests.
2626
*/
2727
public class Bug20004Test extends BrokerTestCase {
28-
public Exception caughtException = null;
29-
public boolean completed = false;
30-
public boolean created = false;
28+
private volatile Exception caughtException = null;
29+
private volatile boolean completed = false;
30+
private volatile boolean created = false;
3131

3232
protected void releaseResources()
3333
throws IOException
@@ -37,6 +37,7 @@ protected void releaseResources()
3737
}
3838
}
3939

40+
@SuppressWarnings("deprecation")
4041
public void testBug20004()
4142
throws IOException
4243
{
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2011 VMware, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client.test;
18+
19+
import com.rabbitmq.client.ShutdownSignalException;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.FutureTask;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import junit.framework.AssertionFailedError;
29+
30+
public class ConfirmBase extends BrokerTestCase {
31+
protected void waitForConfirms() throws Exception
32+
{
33+
waitForConfirms("ConfirmBase.waitForConfirms");
34+
}
35+
36+
protected void waitForConfirms(final String testTitle) throws Exception
37+
{
38+
try {
39+
FutureTask<?> waiter = new FutureTask<Object>(new Runnable() {
40+
public void run() {
41+
try {
42+
channel.waitForConfirmsOrDie();
43+
} catch (IOException e) {
44+
throw (ShutdownSignalException)e.getCause();
45+
} catch (InterruptedException _) {
46+
fail(testTitle + ": interrupted");
47+
}
48+
}
49+
}, null);
50+
(Executors.newSingleThreadExecutor()).execute(waiter);
51+
waiter.get(10, TimeUnit.SECONDS);
52+
} catch (ExecutionException ee) {
53+
Throwable t = ee.getCause();
54+
if (t instanceof ShutdownSignalException) throw (ShutdownSignalException) t;
55+
if (t instanceof AssertionFailedError) throw (AssertionFailedError) t;
56+
throw (Exception)t;
57+
} catch (TimeoutException _) {
58+
fail(testTitle + ": timeout");
59+
}
60+
}
61+
}

test/src/com/rabbitmq/client/test/functional/BindingLifecycle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public void testQueuePurge() throws IOException {
5656
* longer purged, even if the channel they were sent down is not
5757
* (Tx-)transacted."
5858
*/
59+
@SuppressWarnings("deprecation")
5960
public void testUnackedPurge() throws IOException {
6061
Binding binding = setupExchangeBindings(false);
6162
channel.basicPublish(binding.x, binding.k, null, payload);

0 commit comments

Comments
 (0)