Skip to content

Commit 36fc6b5

Browse files
author
Rob Harrop
committed
Merge heads
2 parents d543cd7 + 88514f5 commit 36fc6b5

33 files changed

+297
-174
lines changed

codegen.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ def printHeader():
161161
print
162162
print "import com.rabbitmq.client.impl.ContentHeaderPropertyWriter;"
163163
print "import com.rabbitmq.client.impl.ContentHeaderPropertyReader;"
164-
print "import com.rabbitmq.client.impl.LongString;"
165164
print "import com.rabbitmq.client.impl.LongStringHelper;"
166165

167166
def printProtocolClass():
@@ -422,6 +421,7 @@ def printHeader():
422421
print "import java.util.Map;"
423422
print
424423
print "import com.rabbitmq.client.AMQP;"
424+
print "import com.rabbitmq.client.LongString;"
425425
print "import com.rabbitmq.client.UnknownClassOrMethodId;"
426426
print "import com.rabbitmq.client.UnexpectedMethodError;"
427427

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,21 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
697697
*/
698698
long getNextPublishSeqNo();
699699

700+
/**
701+
* Wait until all messages published since the last call have been
702+
* either ack'd or nack'd by the broker. Note, when called on a
703+
* non-Confirm channel, waitForConfirms returns true immediately.
704+
* @return whether all the messages were ack'd (and none were nack'd)
705+
*/
706+
boolean waitForConfirms() throws InterruptedException;
707+
708+
/** Wait until all messages published since the last call have
709+
* been either ack'd or nack'd by the broker. If any of the
710+
* messages were nack'd, waitForConfirmsOrDie will throw an
711+
* IOException. When called on a non-Confirm channel, it will
712+
* return immediately. */
713+
void waitForConfirmsOrDie() throws IOException, InterruptedException;
714+
700715
/**
701716
* Asynchronously send a method over this channel.
702717
* @param method method to transmit over this channel.
@@ -707,8 +722,8 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
707722
/**
708723
* Synchronously send a method over this channel.
709724
* @param method method to transmit over this channel.
710-
* @return response to method. Caller should cast as appropriate.
725+
* @return command response to method. Caller should cast as appropriate.
711726
* @throws IOException Problem transmitting method.
712727
*/
713-
Method rpc(Method method) throws IOException;
728+
Command rpc(Method method) throws IOException;
714729
}

src/com/rabbitmq/client/Command.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package com.rabbitmq.client;
1919

20-
import com.rabbitmq.client.impl.Method;
21-
22-
2320
/**
2421
* Interface to a container for an AMQP method-and-arguments, with optional content header and body.
2522
*/

src/com/rabbitmq/client/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
116116
* @throws IOException if an I/O problem is encountered
117117
*/
118118
void close() throws IOException;
119-
119+
120120
/**
121121
* Close this connection and all its channels.
122122
*

src/com/rabbitmq/client/JDKSaslConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.rabbitmq.client;
1818

19-
import com.rabbitmq.client.impl.LongString;
2019
import com.rabbitmq.client.impl.LongStringHelper;
2120

2221
import javax.security.auth.callback.Callback;

src/com/rabbitmq/client/impl/LongString.java renamed to src/com/rabbitmq/client/LongString.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//
1616

1717

18-
package com.rabbitmq.client.impl;
18+
package com.rabbitmq.client;
1919

2020
import java.io.DataInputStream;
2121
import java.io.IOException;
@@ -31,26 +31,26 @@ public interface LongString
3131
public static final long MAX_LENGTH = 0xffffffffL;
3232

3333
/**
34-
* Get the length of the content of the long string in bytes
35-
* @return the length in bytes >= 0 <= MAX_LENGTH
34+
* @return the length of the {@link LongString} in bytes >= 0 <= MAX_LENGTH
3635
*/
3736
public long length();
3837

3938
/**
4039
* Get the content stream.
4140
* Repeated calls to this function return the same stream,
4241
* which may not support rewind.
43-
* @return An input stream the reads the content
42+
* @return An input stream that reads the content of the {@link LongString}
4443
* @throws IOException if an error is encountered
4544
*/
4645
public DataInputStream getStream() throws IOException;
4746

4847
/**
49-
* Get the content as a byte array.
50-
* Repeated calls to this function return the same array.
51-
* This function will fail if getContentLength() > Integer.MAX_VALUE
52-
* throwing an IllegalStateException.
53-
* @return the content as an array
48+
* Get the content as a byte array. This need not be a copy. Updates to the
49+
* returned array may change the value of the {@link LongString}.
50+
* Repeated calls to this function may return the same array.
51+
* This function will fail if <code><b>this</b>.length() > Integer.MAX_VALUE</code>,
52+
* throwing an {@link IllegalStateException}.
53+
* @return the array of bytes containing the content of the {@link LongString}
5454
*/
5555
public byte [] getBytes();
5656
}

src/com/rabbitmq/client/RpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public String stringCall(String message)
234234
* Perform an AMQP wire-protocol-table based RPC roundtrip <br><br>
235235
*
236236
* There are some restrictions on the values appearing in the table: <br>
237-
* they must be of type {@link String}, {@link com.rabbitmq.client.impl.LongString}, {@link Integer}, {@link java.math.BigDecimal}, {@link Date},
237+
* they must be of type {@link String}, {@link LongString}, {@link Integer}, {@link java.math.BigDecimal}, {@link Date},
238238
* or (recursively) a {@link Map} of the enclosing type.
239239
*
240240
* @param message the table to send

src/com/rabbitmq/client/SaslMechanism.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.rabbitmq.client;
1818

19-
import com.rabbitmq.client.impl.LongString;
2019

2120
import java.io.IOException;
2221

src/com/rabbitmq/client/UnexpectedMethodError.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
package com.rabbitmq.client;
1919

20-
import com.rabbitmq.client.impl.Method;
21-
2220
/**
2321
* Indicates that a {@link Method} object was supplied that was not
2422
* expected. For instance, {@link Channel#basicGet} throws this if it
2523
* receives anything other than {@link AMQP.Basic.GetOk} or
26-
* {@link AMQP.Basic.GetEmpty}, and {@link com.rabbitmq.client.impl.AMQImpl.DefaultMethodVisitor}
27-
* throws this as the default action within each visitor case.
24+
* {@link AMQP.Basic.GetEmpty}, and the
25+
* {@link com.rabbitmq.client.impl.AMQImpl.DefaultMethodVisitor}
26+
* throws this as the action within each visitor case.
2827
*/
2928
public class UnexpectedMethodError extends Error {
3029
private static final long serialVersionUID = 1L;

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222

2323
import com.rabbitmq.client.AlreadyClosedException;
2424
import com.rabbitmq.client.Command;
25+
import com.rabbitmq.client.Method;
2526
import com.rabbitmq.client.Connection;
2627
import com.rabbitmq.client.ShutdownSignalException;
2728
import com.rabbitmq.utility.BlockingValueOrException;
2829

2930
/**
30-
* Base class modelling an AMQ channel. Subclasses implement close()
31-
* and processAsync(), and may choose to override
32-
* processShutdownSignal().
31+
* Base class modelling an AMQ channel. Subclasses implement
32+
* {@link com.rabbitmq.client.Channel#close} and
33+
* {@link #processAsync processAsync()}, and may choose to override
34+
* {@link #processShutdownSignal processShutdownSignal()} and
35+
* {@link #rpc rpc()}.
3336
*
3437
* @see ChannelN
3538
* @see Connection
@@ -118,11 +121,11 @@ public static IOException wrap(ShutdownSignalException ex, String message) {
118121
/**
119122
* Placeholder until we address bug 15786 (implementing a proper exception hierarchy).
120123
*/
121-
public AMQCommand exnWrappingRpc(com.rabbitmq.client.Method m)
124+
public AMQCommand exnWrappingRpc(Method m)
122125
throws IOException
123126
{
124127
try {
125-
return rpc((com.rabbitmq.client.impl.Method)m);
128+
return privateRpc(m);
126129
} catch (AlreadyClosedException ace) {
127130
// Do not wrap it since it means that connection/channel
128131
// was closed in some action in the past
@@ -183,12 +186,18 @@ public void ensureIsOpen()
183186
}
184187

185188
/**
186-
* Protected API - sends a Command to the broker and waits for the
187-
* next inbound Command from the broker: only for use from
189+
* Protected API - sends a {@link Method} to the broker and waits for the
190+
* next in-bound Command from the broker: only for use from
188191
* non-connection-MainLoop threads!
189192
*/
190193
public AMQCommand rpc(Method m)
191194
throws IOException, ShutdownSignalException
195+
{
196+
return privateRpc(m);
197+
}
198+
199+
private AMQCommand privateRpc(Method m)
200+
throws IOException, ShutdownSignalException
192201
{
193202
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
194203
rpc(m, k);
@@ -246,7 +255,7 @@ public void processShutdownSignal(ShutdownSignalException signal,
246255
try {
247256
synchronized (_channelMutex) {
248257
if (!setShutdownCauseIfOpen(signal)) {
249-
if (!ignoreClosed)
258+
if (!ignoreClosed)
250259
throw new AlreadyClosedException("Attempt to use closed channel", this);
251260
}
252261

0 commit comments

Comments
 (0)