Skip to content

Commit a0582b3

Browse files
author
Matthias Radestock
committed
merge bug20004 into default
2 parents 6681c07 + d8d3f6d commit a0582b3

File tree

2 files changed

+62
-39
lines changed

2 files changed

+62
-39
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 58 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@
4949
* @see Connection
5050
*/
5151
public abstract class AMQChannel extends ShutdownNotifierComponent {
52+
/**
53+
* Private; used instead of synchronizing on the channel itself,
54+
* so that clients can themselves use the channel to synchronize
55+
* on.
56+
*/
57+
public final Object _channelMutex = new Object();
58+
5259
/** The connection this channel is associated with. */
5360
public final AMQConnection _connection;
5461

@@ -156,19 +163,23 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
156163
}
157164
}
158165

159-
public synchronized void enqueueRpc(RpcContinuation k)
166+
public void enqueueRpc(RpcContinuation k)
160167
{
161-
if (_activeRpc != null) {
162-
throw new IllegalStateException("cannot execute more than one synchronous AMQP command at a time");
168+
synchronized (_channelMutex) {
169+
if (_activeRpc != null) {
170+
throw new IllegalStateException("cannot execute more than one synchronous AMQP command at a time");
171+
}
172+
_activeRpc = k;
163173
}
164-
_activeRpc = k;
165174
}
166175

167-
public synchronized RpcContinuation nextOutstandingRpc()
176+
public RpcContinuation nextOutstandingRpc()
168177
{
169-
RpcContinuation result = _activeRpc;
170-
_activeRpc = null;
171-
return result;
178+
synchronized (_channelMutex) {
179+
RpcContinuation result = _activeRpc;
180+
_activeRpc = null;
181+
return result;
182+
}
172183
}
173184

174185
public void ensureIsOpen()
@@ -198,18 +209,22 @@ public AMQCommand rpc(Method m)
198209
return k.getReply();
199210
}
200211

201-
public synchronized void rpc(Method m, RpcContinuation k)
212+
public void rpc(Method m, RpcContinuation k)
202213
throws IOException
203214
{
204-
ensureIsOpen();
205-
quiescingRpc(m, k);
215+
synchronized (_channelMutex) {
216+
ensureIsOpen();
217+
quiescingRpc(m, k);
218+
}
206219
}
207220

208-
public synchronized void quiescingRpc(Method m, RpcContinuation k)
221+
public void quiescingRpc(Method m, RpcContinuation k)
209222
throws IOException
210223
{
211-
enqueueRpc(k);
212-
quiescingTransmit(m);
224+
synchronized (_channelMutex) {
225+
enqueueRpc(k);
226+
quiescingTransmit(m);
227+
}
213228
}
214229

215230
/**
@@ -237,13 +252,13 @@ public void processShutdownSignal(ShutdownSignalException signal,
237252
boolean ignoreClosed,
238253
boolean notifyRpc) {
239254
try {
240-
synchronized (this) {
255+
synchronized (_channelMutex) {
241256
if (!ignoreClosed)
242257
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
243258
if (isOpen())
244259
_shutdownCause = signal;
245260

246-
notifyAll();
261+
_channelMutex.notifyAll();
247262
}
248263
} finally {
249264
if (notifyRpc)
@@ -258,33 +273,41 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
258273
}
259274
}
260275

261-
public synchronized void transmit(Method m) throws IOException {
262-
transmit(new AMQCommand(m));
276+
public void transmit(Method m) throws IOException {
277+
synchronized (_channelMutex) {
278+
transmit(new AMQCommand(m));
279+
}
263280
}
264281

265-
public synchronized void transmit(AMQCommand c) throws IOException {
266-
ensureIsOpen();
267-
quiescingTransmit(c);
282+
public void transmit(AMQCommand c) throws IOException {
283+
synchronized (_channelMutex) {
284+
ensureIsOpen();
285+
quiescingTransmit(c);
286+
}
268287
}
269288

270-
public synchronized void quiescingTransmit(Method m) throws IOException {
271-
quiescingTransmit(new AMQCommand(m));
289+
public void quiescingTransmit(Method m) throws IOException {
290+
synchronized (_channelMutex) {
291+
quiescingTransmit(new AMQCommand(m));
292+
}
272293
}
273294

274-
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
275-
if (c.getMethod().hasContent()) {
276-
while (_blockContent) {
277-
try {
278-
wait();
279-
} catch (InterruptedException e) {}
280-
281-
// This is to catch a situation when the thread wakes up during
282-
// shutdown. Currently, no command that has content is allowed
283-
// to send anything in a closing state.
284-
ensureIsOpen();
295+
public void quiescingTransmit(AMQCommand c) throws IOException {
296+
synchronized (_channelMutex) {
297+
if (c.getMethod().hasContent()) {
298+
while (_blockContent) {
299+
try {
300+
_channelMutex.wait();
301+
} catch (InterruptedException e) {}
302+
303+
// This is to catch a situation when the thread wakes up during
304+
// shutdown. Currently, no command that has content is allowed
305+
// to send anything in a closing state.
306+
ensureIsOpen();
307+
}
285308
}
309+
c.transmit(this);
286310
}
287-
c.transmit(this);
288311
}
289312

290313
public AMQConnection getAMQConnection() {

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void releaseChannelNumber() {
192192
false,
193193
command,
194194
this);
195-
synchronized(this) {
195+
synchronized (_channelMutex) {
196196
processShutdownSignal(signal, true, true);
197197
quiescingTransmit(new Channel.CloseOk());
198198
}
@@ -248,10 +248,10 @@ public void releaseChannelNumber() {
248248
return true;
249249
} else if (method instanceof Channel.Flow) {
250250
Channel.Flow channelFlow = (Channel.Flow) method;
251-
synchronized(this) {
251+
synchronized (_channelMutex) {
252252
_blockContent = !channelFlow.active;
253253
transmit(new Channel.FlowOk(channelFlow.active));
254-
notifyAll();
254+
_channelMutex.notifyAll();
255255
}
256256
return true;
257257
} else {
@@ -329,7 +329,7 @@ public void close(int closeCode,
329329
try {
330330
// Synchronize the block below to avoid race conditions in case
331331
// connnection wants to send Connection-CloseOK
332-
synchronized(this) {
332+
synchronized (_channelMutex) {
333333
processShutdownSignal(signal, !initiatedByApplication, true);
334334
quiescingRpc(reason, k);
335335
}

0 commit comments

Comments
 (0)