Skip to content

Commit 8d55401

Browse files
author
Matthew Sackman
committed
refactor: a) inline call to open, b) rename startConnection to start
1 parent 3ea15b9 commit 8d55401

File tree

4 files changed

+80
-92
lines changed

4 files changed

+80
-92
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ private Connection newConnection(Address[] addrs,
175175
try {
176176
AMQConnection conn = new AMQConnection(_params,
177177
frameHandler);
178-
conn.startConnection(!allowRedirects);
178+
conn.start(!allowRedirects);
179179
return conn;
180180
} catch (RedirectException e) {
181181
if (!allowRedirects) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 76 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,17 @@ public AMQConnection(ConnectionParameters params,
197197
}
198198

199199
/**
200-
* Start up the connection, including the MainLoop thread
200+
* Start up the connection, including the MainLoop thread.
201+
* Sends the protocol
202+
* version negotiation header, and runs through
203+
* Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
204+
* calls Connection.Open and waits for the OpenOk. Sets heartbeat
205+
* and frame max values after tuning has taken place.
201206
* @param insist true if broker redirects are disallowed
202207
* @throws RedirectException if the server is redirecting us to a different host/port
203208
* @throws java.io.IOException if an error is encountered
204209
*/
205-
public void startConnection(boolean insist)
210+
public void start(boolean insist)
206211
throws IOException, RedirectException
207212
{
208213
// Make sure that the first thing we do is to send the header,
@@ -223,7 +228,75 @@ public void startConnection(boolean insist)
223228

224229
new MainLoop().start(); // start the main loop going
225230

226-
_knownHosts = open(_params, insist, connStartBlocker);
231+
try {
232+
// See bug 17389. The MainLoop could have shut down already in
233+
// which case we don't want to wait forever for a reply.
234+
235+
// There is no race if the MainLoop shuts down after enqueuing
236+
// the RPC because if that happens the channel will correctly
237+
// pass the exception into RPC, waking it up.
238+
ensureIsOpen();
239+
240+
AMQP.Connection.Start connStart =
241+
(AMQP.Connection.Start) connStartBlocker.getReply().getMethod();
242+
243+
Version serverVersion =
244+
new Version(connStart.getVersionMajor(),
245+
connStart.getVersionMinor());
246+
247+
if (!Version.checkVersion(clientVersion, serverVersion)) {
248+
_frameHandler.close(); //this will cause mainLoop to terminate
249+
//TODO: throw a more specific exception
250+
throw new IOException("protocol version mismatch: expected " +
251+
clientVersion + ", got " + serverVersion);
252+
}
253+
} catch (ShutdownSignalException sse) {
254+
throw AMQChannel.wrap(sse);
255+
}
256+
257+
LongString saslResponse = LongStringHelper.asLongString("\0" + _params.getUserName() +
258+
"\0" + _params.getPassword());
259+
AMQImpl.Connection.StartOk startOk =
260+
new AMQImpl.Connection.StartOk(buildClientPropertiesTable(),
261+
"PLAIN",
262+
saslResponse,
263+
"en_US");
264+
265+
AMQP.Connection.Tune connTune =
266+
(AMQP.Connection.Tune) _channel0.exnWrappingRpc(startOk).getMethod();
267+
268+
int channelMax =
269+
negotiatedMaxValue(getParameters().getRequestedChannelMax(),
270+
connTune.getChannelMax());
271+
setChannelMax(channelMax);
272+
273+
int frameMax =
274+
negotiatedMaxValue(getParameters().getRequestedFrameMax(),
275+
connTune.getFrameMax());
276+
setFrameMax(frameMax);
277+
278+
int heartbeat =
279+
negotiatedMaxValue(getParameters().getRequestedHeartbeat(),
280+
connTune.getHeartbeat());
281+
setHeartbeat(heartbeat);
282+
283+
_channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax,
284+
frameMax,
285+
heartbeat));
286+
287+
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_params.getVirtualHost(),
288+
"",
289+
insist)).getMethod();
290+
if (res instanceof AMQP.Connection.Redirect) {
291+
AMQP.Connection.Redirect redirect = (AMQP.Connection.Redirect) res;
292+
throw new RedirectException(Address.parseAddress(redirect.getHost()),
293+
Address.parseAddresses(redirect.getKnownHosts()));
294+
} else {
295+
AMQP.Connection.OpenOk openOk = (AMQP.Connection.OpenOk) res;
296+
_knownHosts = Address.parseAddresses(openOk.getKnownHosts());
297+
}
298+
299+
return;
227300
}
228301

229302
/**
@@ -327,91 +400,6 @@ public Map<String, Object> buildClientPropertiesTable() {
327400
});
328401
}
329402

330-
/**
331-
* Called by the connection's constructor. Sends the protocol
332-
* version negotiation header, and runs through
333-
* Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
334-
* calls Connection.Open and waits for the OpenOk. Sets heartbeat
335-
* and frame max values after tuning has taken place.
336-
* @param params the construction parameters for a Connection
337-
* @param connStartBlocker the blocker we're waiting on for the start-ok
338-
* @return the known hosts that came back in the connection.open-ok
339-
* @throws RedirectException if the server asks us to redirect to
340-
* a different host/port.
341-
* @throws java.io.IOException if any other I/O error occurs
342-
*/
343-
public Address[] open(final ConnectionParameters params, boolean insist, SimpleBlockingRpcContinuation connStartBlocker)
344-
throws RedirectException, IOException
345-
{
346-
try {
347-
// See bug 17389. The MainLoop could have shut down already in
348-
// which case we don't want to wait forever for a reply.
349-
350-
// There is no race if the MainLoop shuts down after enqueuing
351-
// the RPC because if that happens the channel will correctly
352-
// pass the exception into RPC, waking it up.
353-
ensureIsOpen();
354-
355-
AMQP.Connection.Start connStart =
356-
(AMQP.Connection.Start) connStartBlocker.getReply().getMethod();
357-
358-
Version serverVersion =
359-
new Version(connStart.getVersionMajor(),
360-
connStart.getVersionMinor());
361-
362-
if (!Version.checkVersion(clientVersion, serverVersion)) {
363-
_frameHandler.close(); //this will cause mainLoop to terminate
364-
//TODO: throw a more specific exception
365-
throw new IOException("protocol version mismatch: expected " +
366-
clientVersion + ", got " + serverVersion);
367-
}
368-
} catch (ShutdownSignalException sse) {
369-
throw AMQChannel.wrap(sse);
370-
}
371-
372-
LongString saslResponse = LongStringHelper.asLongString("\0" + params.getUserName() +
373-
"\0" + params.getPassword());
374-
AMQImpl.Connection.StartOk startOk =
375-
new AMQImpl.Connection.StartOk(buildClientPropertiesTable(),
376-
"PLAIN",
377-
saslResponse,
378-
"en_US");
379-
380-
AMQP.Connection.Tune connTune =
381-
(AMQP.Connection.Tune) _channel0.exnWrappingRpc(startOk).getMethod();
382-
383-
int channelMax =
384-
negotiatedMaxValue(getParameters().getRequestedChannelMax(),
385-
connTune.getChannelMax());
386-
setChannelMax(channelMax);
387-
388-
int frameMax =
389-
negotiatedMaxValue(getParameters().getRequestedFrameMax(),
390-
connTune.getFrameMax());
391-
setFrameMax(frameMax);
392-
393-
int heartbeat =
394-
negotiatedMaxValue(getParameters().getRequestedHeartbeat(),
395-
connTune.getHeartbeat());
396-
setHeartbeat(heartbeat);
397-
398-
_channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax,
399-
frameMax,
400-
heartbeat));
401-
402-
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(params.getVirtualHost(),
403-
"",
404-
insist)).getMethod();
405-
if (res instanceof AMQP.Connection.Redirect) {
406-
AMQP.Connection.Redirect redirect = (AMQP.Connection.Redirect) res;
407-
throw new RedirectException(Address.parseAddress(redirect.getHost()),
408-
Address.parseAddresses(redirect.getKnownHosts()));
409-
} else {
410-
AMQP.Connection.OpenOk openOk = (AMQP.Connection.OpenOk) res;
411-
return Address.parseAddresses(openOk.getKnownHosts());
412-
}
413-
}
414-
415403
private static int negotiatedMaxValue(int clientValue, int serverValue) {
416404
return (clientValue == 0 || serverValue == 0) ?
417405
Math.max(clientValue, serverValue) :

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void testConnectionSendsSingleHeaderAndTimesOut() {
103103
MyExceptionHandler handler = new MyExceptionHandler();
104104
assertEquals(0, _mockFrameHandler.countHeadersSent());
105105
try {
106-
new AMQConnection(_params, _mockFrameHandler, handler).startConnection(false);
106+
new AMQConnection(_params, _mockFrameHandler, handler).start(false);
107107
fail("Connection should have thrown exception");
108108
} catch(IOException signal) {
109109
// As expected

test/src/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void testNoMethod() throws Exception {
7878
myFrameHandler.setFrames(frames.iterator());
7979

8080
try {
81-
new AMQConnection(params, myFrameHandler).startConnection(false);
81+
new AMQConnection(params, myFrameHandler).start(false);
8282
} catch (IOException e) {
8383
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
8484
assertNotNull(unexpectedFrameError);
@@ -104,7 +104,7 @@ public void testMethodThenBody() throws Exception {
104104
myFrameHandler.setFrames(frames.iterator());
105105

106106
try {
107-
new AMQConnection(params, myFrameHandler).startConnection(false);
107+
new AMQConnection(params, myFrameHandler).start(false);
108108
} catch (IOException e) {
109109
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
110110
assertNotNull(unexpectedFrameError);

0 commit comments

Comments
 (0)