Skip to content

Commit b7312c6

Browse files
author
Simon MacMullen
committed
Merging bug22895 into default
2 parents 59abe04 + e45f13d commit b7312c6

File tree

2 files changed

+39
-26
lines changed

2 files changed

+39
-26
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,18 @@ public boolean processControlCommand(Command c)
544544
// See the detailed comments in ChannelN.processAsync.
545545

546546
Method method = c.getMethod();
547-
547+
548548
if (method instanceof AMQP.Connection.Close) {
549-
handleConnectionClose(c);
549+
if (isOpen()) {
550+
handleConnectionClose(c);
551+
} else {
552+
// Already shutting down, so just send back a CloseOk.
553+
try {
554+
_channel0.quiescingTransmit(new AMQImpl.Connection.CloseOk());
555+
} catch (IOException ioe) {
556+
Utility.emptyStatement();
557+
}
558+
}
550559
return true;
551560
} else {
552561
if (isOpen()) {

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,9 @@ public void releaseChannelNumber() {
189189

190190
/**
191191
* Protected API - Filters the inbound command stream, processing
192-
* Basic.Deliver, Basic.Return, Channel.Flow and Channel.Close
193-
* specially.
192+
* Basic.Deliver, Basic.Return and Channel.Close specially. If
193+
* we're in quiescing mode, all inbound commands are ignored,
194+
* except for Channel.Close and Channel.CloseOk.
194195
*/
195196
@Override public boolean processAsync(Command command) throws IOException
196197
{
@@ -199,33 +200,30 @@ public void releaseChannelNumber() {
199200
// If we are not, however, then we are in a quiescing, or
200201
// shutting-down state as the result of an application
201202
// decision to close this channel, and we are to discard all
202-
// incoming commands except for a close-ok.
203+
// incoming commands except for a close and close-ok.
203204

204205
Method method = command.getMethod();
205206

206-
if (method instanceof Channel.Close) {
207-
// Channel should always respond to Channel.Close
208-
// from the server
209-
releaseChannelNumber();
210-
ShutdownSignalException signal = new ShutdownSignalException(false,
211-
false,
212-
command,
213-
this);
214-
synchronized (_channelMutex) {
215-
try {
216-
processShutdownSignal(signal, true, false);
217-
quiescingTransmit(new Channel.CloseOk());
218-
} finally {
219-
notifyOutstandingRpc(signal);
220-
}
221-
}
222-
notifyListeners();
223-
return true;
224-
}
225207
if (isOpen()) {
226208
// We're in normal running mode.
227209

228-
if (method instanceof Basic.Deliver) {
210+
if (method instanceof Channel.Close) {
211+
releaseChannelNumber();
212+
ShutdownSignalException signal = new ShutdownSignalException(false,
213+
false,
214+
command,
215+
this);
216+
synchronized (_channelMutex) {
217+
try {
218+
processShutdownSignal(signal, true, false);
219+
quiescingTransmit(new Channel.CloseOk());
220+
} finally {
221+
notifyOutstandingRpc(signal);
222+
}
223+
}
224+
notifyListeners();
225+
return true;
226+
} else if (method instanceof Basic.Deliver) {
229227
Basic.Deliver m = (Basic.Deliver) method;
230228

231229
Consumer callback = _consumers.get(m.consumerTag);
@@ -300,7 +298,13 @@ public void releaseChannelNumber() {
300298
} else {
301299
// We're in quiescing mode.
302300

303-
if (method instanceof Channel.CloseOk) {
301+
if (method instanceof Channel.Close) {
302+
// We're already shutting down, so just send back an ok.
303+
synchronized (_channelMutex) {
304+
quiescingTransmit(new Channel.CloseOk());
305+
}
306+
return true;
307+
} else if (method instanceof Channel.CloseOk) {
304308
// We're quiescing, and we see a channel.close-ok:
305309
// this is our signal to leave quiescing mode and
306310
// finally shut down for good. Let it be handled as an

0 commit comments

Comments
 (0)