Skip to content

Commit 956ff5c

Browse files
author
Alexandru Scvortov
committed
merge default into bug20337
2 parents e9dd0ba + c746853 commit 956ff5c

File tree

8 files changed

+106
-35
lines changed

8 files changed

+106
-35
lines changed

README-EXAMPLES

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ HelloServer - Acts as an RPC server over AMQP.
6363
HelloJsonClient - Performs a simple JSON-RPC call over AMQP.
6464
HelloJsonServer - Acts as a JSON-RPC server over AMQP.
6565
LogTail - Tails the server logs.
66-
SendString - Sends a user supplied message over AMQP.
66+
SendString - Sends a user-supplied message over AMQP.
6767

6868

6969
More Complex Examples

src/com/rabbitmq/client/ShutdownListener.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@
1919

2020
import java.util.EventListener;
2121

22+
/**
23+
* A ShutdownListener receives information about the shutdown of connections and
24+
* channels. Note that when a connection is shut down, its associated channels are also
25+
* considered shut down and their ShutdownListeners will be notified (with the same cause).
26+
*
27+
* @see ShutdownNotifier
28+
* @see ShutdownSignalException
29+
*/
2230
public interface ShutdownListener extends EventListener {
23-
2431
public void shutdownCompleted(ShutdownSignalException cause);
25-
2632
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.net.InetAddress;
2222
import java.net.SocketException;
23+
import java.net.SocketTimeoutException;
2324
import java.util.Collections;
2425
import java.util.HashMap;
2526
import java.util.Map;
@@ -108,6 +109,9 @@ public static final Map<String, Object> defaultClientProperties() {
108109
/** Flag indicating whether the client received Connection.Close message from the broker */
109110
private volatile boolean _brokerInitiatedShutdown;
110111

112+
/** Flag indicating we are still negotiating the connection in start */
113+
private volatile boolean _inConnectionNegotiation;
114+
111115
/** Manages heart-beat sending for this connection */
112116
private final HeartbeatSender _heartbeatSender;
113117

@@ -175,7 +179,7 @@ public Map<String, Object> getServerProperties() {
175179
* @param password for <code><b>username</b></code>
176180
* @param frameHandler for sending and receiving frames on this connection
177181
* @param executor thread pool service for consumer threads for channels on this connection
178-
* @param virtualHost
182+
* @param virtualHost virtual host of this connection
179183
* @param clientProperties client info used in negotiating with the server
180184
* @param requestedFrameMax max size of frame offered
181185
* @param requestedChannelMax max number of channels offered
@@ -211,7 +215,7 @@ public AMQConnection(String username,
211215
* @param password for <code><b>username</b></code>
212216
* @param frameHandler for sending and receiving frames on this connection
213217
* @param executor thread pool service for consumer threads for channels on this connection
214-
* @param virtualHost
218+
* @param virtualHost virtual host of this connection
215219
* @param clientProperties client info used in negotiating with the server
216220
* @param requestedFrameMax max size of frame offered
217221
* @param requestedChannelMax max number of channels offered
@@ -248,6 +252,8 @@ public AMQConnection(String username,
248252

249253
this._heartbeatSender = new HeartbeatSender(frameHandler);
250254
this._brokerInitiatedShutdown = false;
255+
256+
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
251257
}
252258

253259
/**
@@ -385,6 +391,9 @@ public void start()
385391
throw AMQChannel.wrap(sse);
386392
}
387393

394+
// We can now respond to errors having finished tailoring the connection
395+
this._inConnectionNegotiation = false;
396+
388397
return;
389398
}
390399

@@ -538,7 +547,11 @@ private class MainLoop extends Thread {
538547
* Called when a frame-read operation times out
539548
* @throws MissedHeartbeatException if heart-beats have been missed
540549
*/
541-
private void handleSocketTimeout() throws MissedHeartbeatException {
550+
private void handleSocketTimeout() throws SocketTimeoutException {
551+
if (_inConnectionNegotiation) {
552+
throw new SocketTimeoutException("Timeout during Connection negotiation");
553+
}
554+
542555
if (_heartbeat == 0) { // No heart-beating
543556
return;
544557
}
@@ -623,10 +636,14 @@ public SocketCloseWait(ShutdownSignalException sse) {
623636
}
624637

625638
/**
626-
* Protected API - causes all attached channels to terminate with
627-
* a ShutdownSignal built from the argument, and stops this
628-
* connection from accepting further work from the application.
629-
*
639+
* Protected API - causes all attached channels to terminate (shutdown) with a ShutdownSignal
640+
* built from the argument, and stops this connection from accepting further work from the
641+
* application. {@link com.rabbitmq.client.ShutdownListener ShutdownListener}s for the
642+
* connection are notified when the main loop terminates.
643+
* @param reason object being shutdown
644+
* @param initiatedByApplication true if caused by a client command
645+
* @param cause trigger exception which caused shutdown
646+
* @param notifyRpc true if outstanding rpc should be informed of shutdown
630647
* @return a shutdown signal built using the given arguments
631648
*/
632649
public ShutdownSignalException shutdown(Object reason,
@@ -722,6 +739,7 @@ public void close(int closeCode,
722739
close(closeCode, closeMessage, initiatedByApplication, cause, -1, false);
723740
}
724741

742+
// TODO: Make this private
725743
/**
726744
* Protected API - Close this connection with the given code, message, source
727745
* and timeout value for all the close operations to complete.

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
/**
3131
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
3232
*/
33-
3433
public final class ChannelManager {
3534
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;
3635

@@ -77,6 +76,10 @@ public ChannelN getChannel(int channelNumber) {
7776
}
7877
}
7978

79+
/**
80+
* Handle shutdown. All the managed {@link com.rabbitmq.client.Channel Channel}s are shutdown.
81+
* @param signal reason for shutdown
82+
*/
8083
public void handleSignal(ShutdownSignalException signal) {
8184
Set<ChannelN> channels;
8285
synchronized(this.monitor) {
@@ -86,6 +89,7 @@ public void handleSignal(ShutdownSignalException signal) {
8689
releaseChannelNumber(channel);
8790
channel.processShutdownSignal(signal, true, true);
8891
shutdownSet.add(channel.getShutdownLatch());
92+
channel.notifyListeners();
8993
}
9094
scheduleShutdownProcessing();
9195
}

src/com/rabbitmq/client/impl/ShutdownNotifierComponent.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
import com.rabbitmq.client.ShutdownNotifier;
2525
import com.rabbitmq.client.ShutdownSignalException;
2626

27+
/**
28+
* A class that manages {@link ShutdownListener}s and remembers the reason for a shutdown. Both
29+
* {@link com.rabbitmq.client.Channel Channel}s and {@link com.rabbitmq.client.Connection
30+
* Connection}s have shutdown listeners.
31+
*/
2732
public class ShutdownNotifierComponent implements ShutdownNotifier {
2833

2934
/** Monitor for shutdown listeners and shutdownCause */
@@ -87,6 +92,11 @@ public boolean isOpen() {
8792
}
8893
}
8994

95+
/**
96+
* Internal: this is the means of registering shutdown.
97+
* @param sse the reason for the shutdown
98+
* @return <code>true</code> if the component is open; <code>false</code> otherwise.
99+
*/
90100
public boolean setShutdownCauseIfOpen(ShutdownSignalException sse) {
91101
synchronized (this.monitor) {
92102
if (isOpen()) {

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
public class AMQConnectionTest extends TestCase {
4545
// private static final String CLOSE_MESSAGE = "terminated by test";
4646

47-
/**
48-
* Build a suite of tests
47+
/**
48+
* Build a suite of tests
4949
* @return the test suite for this class
5050
*/
5151
public static TestSuite suite() {
@@ -79,7 +79,7 @@ public static TestSuite suite() {
7979
}
8080

8181
/** Check the AMQConnection does send exactly 1 initial header, and deal correctly with
82-
* the frame handler throwing an exception when we try to read data
82+
* the frame handler throwing an exception when we try to read data
8383
*/
8484
public void testConnectionSendsSingleHeaderAndTimesOut() {
8585
IOException exception = new SocketTimeoutException();
@@ -100,14 +100,14 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
100100
handler).start();
101101
fail("Connection should have thrown exception");
102102
} catch(IOException signal) {
103-
// As expected
103+
// As expected
104104
}
105105
assertEquals(1, _mockFrameHandler.countHeadersSent());
106106
// _connection.close(0, CLOSE_MESSAGE);
107107
List<Throwable> exceptionList = handler.getHandledExceptions();
108108
assertEquals(Collections.<Throwable>singletonList(exception), exceptionList);
109109
}
110-
110+
111111
/** Check we can open a connection once, but not twice.
112112
* @throws IOException */
113113
// public void testCanOpenConnectionOnceOnly() throws IOException {
@@ -121,18 +121,49 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
121121
// }
122122
// }
123123

124-
// add test that we time out if no initial Start command is received,
125-
// setting a timeout and having the FrameHandler return null
126-
124+
/**
125+
* Test that we catch timeout between connect and negotiation of the connection being finished.
126+
*/
127+
public void testConnectionHangInNegotiation() {
128+
this._mockFrameHandler.setTimeoutCount(10); // to limit hang
129+
MyExceptionHandler handler = new MyExceptionHandler();
130+
assertEquals(0, this._mockFrameHandler.countHeadersSent());
131+
try {
132+
new AMQConnection(factory.getUsername(),
133+
factory.getPassword(),
134+
this._mockFrameHandler,
135+
Executors.newFixedThreadPool(1),
136+
factory.getVirtualHost(),
137+
factory.getClientProperties(),
138+
factory.getRequestedFrameMax(),
139+
factory.getRequestedChannelMax(),
140+
factory.getRequestedHeartbeat(),
141+
factory.getSaslConfig(),
142+
handler).start();
143+
fail("Connection should have thrown exception");
144+
} catch(IOException signal) {
145+
// As expected
146+
}
147+
assertEquals(1, this._mockFrameHandler.countHeadersSent());
148+
// _connection.close(0, CLOSE_MESSAGE);
149+
List<Throwable> exceptionList = handler.getHandledExceptions();
150+
assertEquals("Only one exception expected", 1, exceptionList.size());
151+
assertEquals("Wrong type of exception returned.", SocketTimeoutException.class, exceptionList.get(0).getClass());
152+
}
153+
127154
/** Mock frame handler to facilitate testing. */
128155
private static class MockFrameHandler implements FrameHandler {
129156
/** How many times has sendHeader() been called? */
130157
private int _numHeadersSent;
131-
158+
159+
private int timeout;
160+
132161
/** An optional exception for us to throw on reading frames */
133162
private IOException _exceptionOnReadingFrames;
134163

135-
/** count how many headers we've sent
164+
private int timeoutCount = 0;
165+
166+
/** count how many headers we've sent
136167
* @return the number of sent headers
137168
*/
138169
public int countHeadersSent() {
@@ -143,20 +174,27 @@ public void setExceptionOnReadingFrames(IOException exception) {
143174
_exceptionOnReadingFrames = exception;
144175
}
145176

177+
public void setTimeoutCount(int timeoutCount) {
178+
this.timeoutCount = timeoutCount;
179+
}
180+
146181
public Frame readFrame() throws IOException {
147182
if (_exceptionOnReadingFrames != null) {
148183
throw _exceptionOnReadingFrames;
149184
}
150-
return null;
151-
// throw new SocketTimeoutException(); // simulate a socket timeout
185+
if (this.timeoutCount > 0) {
186+
if (--this.timeoutCount == 0)
187+
throw new IOException("Mock Framehandler: too many timeouts.");
188+
}
189+
return null; // simulate a socket timeout
152190
}
153191

154192
public void sendHeader() throws IOException {
155-
_numHeadersSent++;
193+
_numHeadersSent++;
156194
}
157195

158196
public void setTimeout(int timeoutMs) throws SocketException {
159-
// no need to implement this: don't bother changing the timeout
197+
this.timeout = timeoutMs;
160198
}
161199

162200
public void writeFrame(Frame frame) throws IOException {
@@ -168,7 +206,7 @@ public void close() {
168206
}
169207

170208
public int getTimeout() throws SocketException {
171-
return 0;
209+
return this.timeout;
172210
}
173211

174212
public InetAddress getAddress() {
@@ -180,7 +218,7 @@ public int getPort() {
180218
}
181219
}
182220

183-
/** Mock frame handler to facilitate testing. */
221+
/** Exception handler to facilitate testing. */
184222
private class MyExceptionHandler implements ExceptionHandler {
185223
private List<Throwable> _handledExceptions = new ArrayList<Throwable>();
186224

@@ -208,7 +246,7 @@ public void handleConsumerException(Channel ch,
208246
{
209247
fail("handleConsumerException " + consumerTag + " " + methodName + ": " + ex);
210248
}
211-
249+
212250
public List<Throwable> getHandledExceptions() {
213251
return _handledExceptions;
214252
}

test/src/com/rabbitmq/client/test/CloseInMainLoop.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class CloseInMainLoop extends BrokerTestCase{
3232

3333
class SpecialConnection extends AMQConnection{
3434
private AtomicBoolean validShutdown = new AtomicBoolean(false);
35-
35+
3636
public boolean hadValidShutdown(){
3737
if(isOpen()) throw new IllegalStateException("hadValidShutdown called while connection is still open");
3838
return validShutdown.get();
@@ -61,6 +61,7 @@ public void handleConsumerException(Channel channel,
6161
String consumerTag,
6262
String methodName) {
6363
try {
64+
// TODO: change this to call 4-parameter close and make 6-parm one private
6465
((AMQConnection) channel.getConnection())
6566
.close(AMQP.INTERNAL_ERROR,
6667
"Internal error in Consumer " + consumerTag,

test/src/com/rabbitmq/client/test/functional/UnexpectedFrames.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,6 @@ private void expectUnexpectedFrameError(Confuser confuser)
145145
//NB: the frame confuser relies on the encoding of the
146146
//method field to be at least 8 bytes long
147147
channel.basicPublish("", "routing key", null, "Hello".getBytes());
148-
// TODO remove when bug 24086 is fixed.
149-
try {
150-
Thread.sleep(100);
151-
} catch (InterruptedException e) {
152-
e.printStackTrace();
153-
}
154148
expectError(AMQP.UNEXPECTED_FRAME);
155149
}
156150

0 commit comments

Comments
 (0)