Skip to content

Commit 3ea15b9

Browse files
author
Matthew Sackman
committed
After considerable investigation with Matthias, worked out that the ExceptionHandler probably shouldn't be being used, because the DefaultEH does stuff which is probably useful, or at least we don't understand the implications of not using it. SSL Handshake seems to be done lazily - at the point at which data is first exchanged on the socket. Thus the socket opening is not enough to generate the SSL Handshake Exceptions. Now because of the race between the user thread and the MainLoop as to who first really accesses the socket (could be the user thread sending the header, or the main loop blocking on read), it was not deterministic which thread would get he ssl handshake exception.
So slightly tidied the AMQConnection class so that it doesn't do so many bad things in its constructor, and that we make sure we send the header first, before starting up the MainLoop thread. Thus if the MainLoop thread gets started then we know the SSL Handshake has been done. This means that the exception comes out in the right thread. Thus also reverted the factory so that you can't set the ExceptionHandler. All tests pass. There were some associated changes elsewhere, which eclipse found.
1 parent d20f72c commit 3ea15b9

File tree

5 files changed

+45
-100
lines changed

5 files changed

+45
-100
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
import javax.net.ssl.TrustManager;
4242

4343
import com.rabbitmq.client.impl.AMQConnection;
44-
import com.rabbitmq.client.impl.DefaultExceptionHandler;
45-
import com.rabbitmq.client.impl.ExceptionHandler;
4644
import com.rabbitmq.client.impl.FrameHandler;
4745
import com.rabbitmq.client.impl.SocketFrameHandler;
4846

@@ -57,11 +55,6 @@ public class ConnectionFactory {
5755
* Holds the SocketFactory used to manufacture outbound sockets.
5856
*/
5957
private SocketFactory _factory = SocketFactory.getDefault();
60-
61-
/**
62-
* Holds the ExceptionHandler used in new AMQConnections
63-
*/
64-
private ExceptionHandler _exceptionHandler = new DefaultExceptionHandler();
6558

6659
/**
6760
* Instantiate a ConnectionFactory with a default set of parameters.
@@ -104,23 +97,6 @@ public void setSocketFactory(SocketFactory factory) {
10497
_factory = factory;
10598
}
10699

107-
/**
108-
* Retrieve the ExceptionHandler used in new AMQConnections
109-
* @return
110-
*/
111-
public ExceptionHandler getExceptionHandler() {
112-
return _exceptionHandler;
113-
}
114-
115-
/**
116-
* Set the ExceptionHandler used in new AMQConnections
117-
*
118-
* @param handler The ExceptionHandler to use
119-
*/
120-
public void setExceptionHandler(ExceptionHandler handler) {
121-
_exceptionHandler = handler;
122-
}
123-
124100
/**
125101
* Convenience method for setting up a SSL socket factory, using
126102
* the DEFAULT_SSL_PROTOCOL and a trusting TrustManager.
@@ -197,7 +173,10 @@ private Connection newConnection(Address[] addrs,
197173
redirectCount = 0;
198174
boolean allowRedirects = redirectCount < maxRedirects;
199175
try {
200-
return new AMQConnection(_params, !allowRedirects, frameHandler, _exceptionHandler);
176+
AMQConnection conn = new AMQConnection(_params,
177+
frameHandler);
178+
conn.startConnection(!allowRedirects);
179+
return conn;
201180
} catch (RedirectException e) {
202181
if (!allowRedirects) {
203182
//this should never happen with a well-behaved server

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.rabbitmq.client.MissedHeartbeatException;
4848
import com.rabbitmq.client.RedirectException;
4949
import com.rabbitmq.client.ShutdownSignalException;
50+
import com.rabbitmq.client.impl.AMQChannel.SimpleBlockingRpcContinuation;
5051
import com.rabbitmq.utility.BlockingCell;
5152
import com.rabbitmq.utility.Utility;
5253

@@ -167,31 +168,22 @@ public Address[] getKnownHosts() {
167168
/**
168169
* Construct a new connection to a broker.
169170
* @param params the initialization parameters for a connection
170-
* @param insist true if broker redirects are disallowed
171171
* @param frameHandler interface to an object that will handle the frame I/O for this connection
172-
* @throws RedirectException if the server is redirecting us to a different host/port
173-
* @throws java.io.IOException if an error is encountered
174172
*/
175173
public AMQConnection(ConnectionParameters params,
176-
boolean insist,
177-
FrameHandler frameHandler) throws RedirectException, IOException {
178-
this(params, insist, frameHandler, new DefaultExceptionHandler());
174+
FrameHandler frameHandler) {
175+
this(params, frameHandler, new DefaultExceptionHandler());
179176
}
180177

181178
/**
182179
* Construct a new connection to a broker.
183180
* @param params the initialization parameters for a connection
184-
* @param insist true if broker redirects are disallowed
185181
* @param frameHandler interface to an object that will handle the frame I/O for this connection
186182
* @param exceptionHandler interface to an object that will handle any special exceptions encountered while using this connection
187-
* @throws RedirectException if the server is redirecting us to a different host/port
188-
* @throws java.io.IOException if an error is encountered
189183
*/
190184
public AMQConnection(ConnectionParameters params,
191-
boolean insist,
192185
FrameHandler frameHandler,
193186
ExceptionHandler exceptionHandler)
194-
throws RedirectException, IOException
195187
{
196188
checkPreconditions();
197189
_params = params;
@@ -202,10 +194,36 @@ public AMQConnection(ConnectionParameters params,
202194
_heartbeat = 0;
203195
_exceptionHandler = exceptionHandler;
204196
_brokerInitiatedShutdown = false;
197+
}
205198

206-
new MainLoop(); // start the main loop going
207-
208-
_knownHosts = open(_params, insist);
199+
/**
200+
* Start up the connection, including the MainLoop thread
201+
* @param insist true if broker redirects are disallowed
202+
* @throws RedirectException if the server is redirecting us to a different host/port
203+
* @throws java.io.IOException if an error is encountered
204+
*/
205+
public void startConnection(boolean insist)
206+
throws IOException, RedirectException
207+
{
208+
// Make sure that the first thing we do is to send the header,
209+
// which should cause any socket errors to show up for us, rather
210+
// than risking them pop out in the MainLoop
211+
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
212+
new AMQChannel.SimpleBlockingRpcContinuation();
213+
// We enqueue an RPC continuation here without sending an RPC
214+
// request, since the protocol specifies that after sending
215+
// the version negotiation header, the client (connection
216+
// initiator) is to wait for a connection.start method to
217+
// arrive.
218+
_channel0.enqueueRpc(connStartBlocker);
219+
// The following two lines are akin to AMQChannel's
220+
// transmit() method for this pseudo-RPC.
221+
_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
222+
_frameHandler.sendHeader();
223+
224+
new MainLoop().start(); // start the main loop going
225+
226+
_knownHosts = open(_params, insist, connStartBlocker);
209227
}
210228

211229
/**
@@ -316,28 +334,16 @@ public Map<String, Object> buildClientPropertiesTable() {
316334
* calls Connection.Open and waits for the OpenOk. Sets heartbeat
317335
* and frame max values after tuning has taken place.
318336
* @param params the construction parameters for a Connection
337+
* @param connStartBlocker the blocker we're waiting on for the start-ok
319338
* @return the known hosts that came back in the connection.open-ok
320339
* @throws RedirectException if the server asks us to redirect to
321340
* a different host/port.
322341
* @throws java.io.IOException if any other I/O error occurs
323342
*/
324-
public Address[] open(final ConnectionParameters params, boolean insist)
343+
public Address[] open(final ConnectionParameters params, boolean insist, SimpleBlockingRpcContinuation connStartBlocker)
325344
throws RedirectException, IOException
326345
{
327346
try {
328-
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
329-
new AMQChannel.SimpleBlockingRpcContinuation();
330-
// We enqueue an RPC continuation here without sending an RPC
331-
// request, since the protocol specifies that after sending
332-
// the version negotiation header, the client (connection
333-
// initiator) is to wait for a connection.start method to
334-
// arrive.
335-
_channel0.enqueueRpc(connStartBlocker);
336-
// The following two lines are akin to AMQChannel's
337-
// transmit() method for this pseudo-RPC.
338-
_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
339-
_frameHandler.sendHeader();
340-
341347
// See bug 17389. The MainLoop could have shut down already in
342348
// which case we don't want to wait forever for a reply.
343349

@@ -414,11 +420,6 @@ private static int negotiatedMaxValue(int clientValue, int serverValue) {
414420

415421
private class MainLoop extends Thread {
416422

417-
/** Start the main loop going. */
418-
public MainLoop() {
419-
start();
420-
}
421-
422423
/**
423424
* Channel reader thread main loop. Reads a frame, and if it is
424425
* not a heartbeat frame, dispatches it to the channel it refers to.

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
103103
MyExceptionHandler handler = new MyExceptionHandler();
104104
assertEquals(0, _mockFrameHandler.countHeadersSent());
105105
try {
106-
new AMQConnection(_params, false, _mockFrameHandler, handler);
106+
new AMQConnection(_params, _mockFrameHandler, handler).startConnection(false);
107107
fail("Connection should have thrown exception");
108108
} catch(IOException signal) {
109109
// As expected

test/src/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void testNoMethod() throws Exception {
7878
myFrameHandler.setFrames(frames.iterator());
7979

8080
try {
81-
new AMQConnection(params, false, myFrameHandler);
81+
new AMQConnection(params, myFrameHandler).startConnection(false);
8282
} catch (IOException e) {
8383
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
8484
assertNotNull(unexpectedFrameError);
@@ -104,7 +104,7 @@ public void testMethodThenBody() throws Exception {
104104
myFrameHandler.setFrames(frames.iterator());
105105

106106
try {
107-
new AMQConnection(params, false, myFrameHandler);
107+
new AMQConnection(params, myFrameHandler).startConnection(false);
108108
} catch (IOException e) {
109109
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
110110
assertNotNull(unexpectedFrameError);

test/src/com/rabbitmq/client/test/ssl/BadVerifiedConnection.java

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -41,46 +41,20 @@
4141

4242
import javax.net.ssl.KeyManagerFactory;
4343
import javax.net.ssl.SSLContext;
44+
import javax.net.ssl.SSLHandshakeException;
4445
import javax.net.ssl.TrustManagerFactory;
4546

46-
import com.rabbitmq.client.Channel;
47-
import com.rabbitmq.client.Connection;
4847
import com.rabbitmq.client.ConnectionFactory;
49-
import com.rabbitmq.client.Consumer;
50-
import com.rabbitmq.client.impl.ExceptionHandler;
5148

5249
/**
5350
* Test for bug 19356 - SSL Support in rabbitmq
5451
*
5552
*/
5653
public class BadVerifiedConnection extends UnverifiedConnection {
57-
private boolean exceptionCaught = false;
58-
5954
public void openConnection()
6055
throws IOException
6156
{
62-
final Object lock = new Object();
63-
6457
connectionFactory = new ConnectionFactory();
65-
connectionFactory.setExceptionHandler(new ExceptionHandler() {
66-
67-
public void handleConsumerException(Channel channel,
68-
Throwable exception, Consumer consumer, String consumerTag,
69-
String methodName) {
70-
}
71-
72-
public void handleReturnListenerException(Channel channel,
73-
Throwable exception) {
74-
}
75-
76-
public void handleUnexpectedConnectionDriverException(
77-
Connection conn, Throwable exception) {
78-
synchronized (lock) {
79-
exceptionCaught = true;
80-
lock.notifyAll();
81-
}
82-
}});
83-
8458
try {
8559
String keystorePath = System.getProperty("keystore.empty.path");
8660
assertNotNull(keystorePath);
@@ -117,18 +91,9 @@ public void handleUnexpectedConnectionDriverException(
11791
try {
11892
connection = connectionFactory.newConnection("localhost", 5671);
11993
fail();
120-
} catch (Throwable e) {
121-
synchronized (lock) {
122-
while (! exceptionCaught) {
123-
try {
124-
lock.wait(10000);
125-
if (! exceptionCaught) {
126-
fail();
127-
}
128-
} catch (InterruptedException e1) {
129-
}
130-
}
131-
}
94+
} catch (SSLHandshakeException e) {
95+
} catch (IOException e) {
96+
fail();
13297
}
13398
}
13499
}

0 commit comments

Comments
 (0)