Skip to content

Commit 16376d4

Browse files
committed
merge bug24592
2 parents 5e0914e + c33eb4f commit 16376d4

16 files changed

+226
-216
lines changed

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ public void transmit(AMQChannel channel) throws IOException {
121121
}
122122
}
123123
}
124+
125+
connection.flush();
124126
}
125127

126128
@Override public String toString() {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@
4545
import com.rabbitmq.utility.BlockingCell;
4646
import com.rabbitmq.utility.Utility;
4747

48+
final class Copyright {
49+
final static String COPYRIGHT="Copyright (C) 2007-2012 VMware, Inc.";
50+
final static String LICENSE="Licensed under the MPL. See http://www.rabbitmq.com/";
51+
}
52+
4853
/**
4954
* Concrete class representing and managing an AMQP connection to a broker.
5055
* <p>
@@ -64,21 +69,22 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6469
* @see Connection#getClientProperties
6570
*/
6671
public static final Map<String, Object> defaultClientProperties() {
72+
Map<String,Object> props = new HashMap<String, Object>();
73+
props.put("product", LongStringHelper.asLongString("RabbitMQ"));
74+
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
75+
props.put("platform", LongStringHelper.asLongString("Java"));
76+
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
77+
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));
78+
6779
Map<String, Object> capabilities = new HashMap<String, Object>();
6880
capabilities.put("publisher_confirms", true);
6981
capabilities.put("exchange_exchange_bindings", true);
7082
capabilities.put("basic.nack", true);
7183
capabilities.put("consumer_cancel_notify", true);
72-
return buildTable(new Object[] {
73-
"product", LongStringHelper.asLongString("RabbitMQ"),
74-
"version", LongStringHelper.asLongString(ClientVersion.VERSION),
75-
"platform", LongStringHelper.asLongString("Java"),
76-
"copyright", LongStringHelper.asLongString(
77-
"Copyright (C) 2007-2012 VMware, Inc."),
78-
"information", LongStringHelper.asLongString(
79-
"Licensed under the MPL. See http://www.rabbitmq.com/"),
80-
"capabilities", capabilities
81-
});
84+
85+
props.put("capabilities", capabilities);
86+
87+
return props;
8288
}
8389

8490
private static final Version clientVersion =
@@ -475,6 +481,13 @@ public void writeFrame(Frame f) throws IOException {
475481
_heartbeatSender.signalActivity();
476482
}
477483

484+
/**
485+
* Public API - flush the output buffers
486+
*/
487+
public void flush() throws IOException {
488+
_frameHandler.flush();
489+
}
490+
478491
private static final int negotiatedMaxValue(int clientValue, int serverValue) {
479492
return (clientValue == 0 || serverValue == 0) ?
480493
Math.max(clientValue, serverValue) :
@@ -810,20 +823,4 @@ public AMQCommand transformReply(AMQCommand command) {
810823
private String getHostAddress() {
811824
return getAddress() == null ? null : getAddress().getHostAddress();
812825
}
813-
814-
/**
815-
* Utility for constructing a java.util.Map instance from an
816-
* even-length array containing alternating String keys (on the
817-
* even elements, starting at zero) and values (on the odd
818-
* elements, starting at one).
819-
*/
820-
private static final Map<String, Object> buildTable(Object[] keysValues) {
821-
Map<String, Object> result = new HashMap<String, Object>();
822-
for (int index = 0; index < keysValues.length; index += 2) {
823-
String key = (String) keysValues[index];
824-
Object value = keysValues[index + 1];
825-
result.put(key, value);
826-
}
827-
return result;
828-
}
829826
}

src/com/rabbitmq/client/impl/FrameHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public interface FrameHandler {
7171
*/
7272
void writeFrame(Frame frame) throws IOException;
7373

74+
/**
75+
* Flush the underlying data connection.
76+
* @throws IOException if there is a problem accessing the connection
77+
*/
78+
void flush() throws IOException;
79+
7480
/** Close the underlying data connection (complaint not permitted). */
7581
void close();
7682
}

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public void run() {
131131

132132
if (now > (lastActivityTime + this.heartbeatNanos)) {
133133
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
134+
frameHandler.flush();
134135
}
135136
} catch (IOException e) {
136137
// ignore

src/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,16 @@ public Frame readFrame() throws IOException {
135135
public void writeFrame(Frame frame) throws IOException {
136136
synchronized (_outputStream) {
137137
frame.writeTo(_outputStream);
138-
_outputStream.flush();
139138
}
140139
}
141140

141+
public void flush() throws IOException {
142+
_outputStream.flush();
143+
}
144+
142145
public void close() {
143146
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _) {}
147+
try { flush(); } catch (Exception _) {}
144148
try { _socket.close(); } catch (Exception _) {}
145149
}
146150
}

src/com/rabbitmq/client/impl/ValueReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
*/
3737
public class ValueReader
3838
{
39-
private static final long INT_MASK = 0xffffffff;
39+
private static final long INT_MASK = 0xffffffffL;
4040

4141
/**
4242
* Protected API - Cast an int to a long without extending the

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ public InetAddress getAddress() {
216216
public int getPort() {
217217
return -1;
218218
}
219+
220+
public void flush() throws IOException {
221+
// no need to implement this: don't bother writing the frame
222+
}
219223
}
220224

221225
/** Exception handler to facilitate testing. */

test/src/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,23 +84,23 @@ public void testNoMethod() throws Exception {
8484
assertEquals(AMQP.FRAME_METHOD, unexpectedFrameError.getExpectedFrameType());
8585
return;
8686
}
87-
87+
8888
fail("No UnexpectedFrameError thrown");
8989
}
90-
90+
9191
public void testMethodThenBody() throws Exception {
9292
List<Frame> frames = new ArrayList<Frame>();
93-
93+
9494
byte[] contentBody = new byte[10];
9595
int channelNumber = 0;
96-
96+
9797
Publish method = new Publish(1, "test", "test", false, false);
9898

9999
frames.add(method.toFrame(0));
100100
frames.add(Frame.fromBodyFragment(channelNumber, contentBody, 0, contentBody.length));
101-
101+
102102
myFrameHandler.setFrames(frames.iterator());
103-
103+
104104
try {
105105
new AMQConnection(factory.getUsername(),
106106
factory.getPassword(),
@@ -120,7 +120,7 @@ public void testMethodThenBody() throws Exception {
120120
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getExpectedFrameType());
121121
return;
122122
}
123-
123+
124124
fail("No UnexpectedFrameError thrown");
125125
}
126126

@@ -132,7 +132,7 @@ private UnexpectedFrameError findUnexpectedFrameError(Exception e) {
132132
return (UnexpectedFrameError) t;
133133
}
134134
}
135-
135+
136136
return null;
137137
}
138138

@@ -173,6 +173,10 @@ public InetAddress getAddress() {
173173
public int getPort() {
174174
return -1;
175175
}
176+
177+
public void flush() throws IOException {
178+
// no need to implement this: don't bother writing the frame
179+
}
176180
}
177181

178182
}

test/src/com/rabbitmq/client/test/CloseInMainLoop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void handleDelivery(String consumerTag,
125125

126126
channel.basicPublish("x", "k", null, new byte[10]);
127127

128-
assertTrue(closeLatch.await(200, TimeUnit.MILLISECONDS));
128+
assertTrue(closeLatch.await(1000, TimeUnit.MILLISECONDS));
129129
assertTrue(connection.hadValidShutdown());
130130
}
131131

test/src/com/rabbitmq/client/test/functional/DeadLetterExchange.java

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.rabbitmq.client.AMQP;
44
import com.rabbitmq.client.GetResponse;
5+
import com.rabbitmq.client.QueueingConsumer;
6+
import com.rabbitmq.client.QueueingConsumer.Delivery;
57
import com.rabbitmq.client.test.BrokerTestCase;
68

79
import java.io.IOException;
@@ -22,6 +24,7 @@ public class DeadLetterExchange extends BrokerTestCase {
2224
private static final String DLQ2 = "queue.dlq2";
2325
private static final int MSG_COUNT = 10;
2426
private static final int MSG_COUNT_MANY = 1000;
27+
private static final int TTL = 1000;
2528

2629
@Override
2730
protected void createResources() throws IOException {
@@ -84,13 +87,66 @@ public void testDeclareQueueWithRoutingKeyButNoDeadLetterExchange()
8487
}
8588

8689
public void testDeadLetterQueueTTLExpiredMessages() throws Exception {
87-
ttlTest(1000);
90+
ttlTest(TTL);
8891
}
8992

9093
public void testDeadLetterQueueZeroTTLExpiredMessages() throws Exception {
9194
ttlTest(0);
9295
}
9396

97+
public void testDeadLetterQueueTTLPromptExpiry() throws Exception {
98+
Map<String, Object> args = new HashMap<String, Object>();
99+
args.put("x-message-ttl", TTL);
100+
declareQueue(TEST_QUEUE_NAME, DLX, null, args);
101+
channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test");
102+
channel.queueBind(DLQ, DLX, "test");
103+
104+
//measure round-trip latency
105+
QueueingConsumer c = new QueueingConsumer(channel);
106+
String cTag = channel.basicConsume(TEST_QUEUE_NAME, true, c);
107+
long start = System.currentTimeMillis();
108+
publish(null, "test");
109+
Delivery d = c.nextDelivery(TTL);
110+
long stop = System.currentTimeMillis();
111+
assertNotNull(d);
112+
channel.basicCancel(cTag);
113+
long latency = stop-start;
114+
115+
channel.basicConsume(DLQ, true, c);
116+
117+
// publish messages at regular intervals until currentTime +
118+
// 3/4th of TTL
119+
int count = 0;
120+
start = System.currentTimeMillis();
121+
stop = start + TTL * 3 / 4;
122+
long now = start;
123+
while (now < stop) {
124+
publish(null, Long.toString(now));
125+
count++;
126+
Thread.sleep(TTL / 100);
127+
now = System.currentTimeMillis();
128+
}
129+
130+
checkPromptArrival(c, count, latency);
131+
132+
start = System.currentTimeMillis();
133+
// publish message - which kicks off the queue's ttl timer -
134+
// and immediately fetch it in noack mode
135+
publishAt(start);
136+
basicGet(TEST_QUEUE_NAME);
137+
// publish a 2nd message and immediately fetch it in ack mode
138+
publishAt(start + TTL * 1 / 2);
139+
GetResponse r = channel.basicGet(TEST_QUEUE_NAME, false);
140+
// publish a 3rd message
141+
publishAt(start + TTL * 3 / 4);
142+
// reject 2nd message after the initial timer has fired but
143+
// before the message is due to expire
144+
waitUntil(start + TTL * 5 / 4);
145+
channel.basicReject(r.getEnvelope().getDeliveryTag(), true);
146+
147+
checkPromptArrival(c, 2, latency);
148+
}
149+
94150
public void testDeadLetterDeletedDLX() throws Exception {
95151
declareQueue(TEST_QUEUE_NAME, DLX, null, null, 1);
96152
channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test");
@@ -322,6 +378,24 @@ private void sleep(long millis) {
322378
}
323379
}
324380

381+
/* check that each message arrives within epsilon of the
382+
publication time + TTL + latency */
383+
private void checkPromptArrival(QueueingConsumer c,
384+
int count, long latency) throws Exception {
385+
long epsilon = TTL / 50;
386+
for (int i = 0; i < count; i++) {
387+
Delivery d = c.nextDelivery(TTL + TTL + latency + epsilon);
388+
assertNotNull("message #" + i + " did not expire", d);
389+
long now = System.currentTimeMillis();
390+
long publishTime = Long.valueOf(new String(d.getBody()));
391+
long targetTime = publishTime + TTL + latency;
392+
assertTrue("expiry outside bounds (+/- " + epsilon + "): " +
393+
(now - targetTime),
394+
(now >= targetTime - epsilon) &&
395+
(now <= targetTime + epsilon));
396+
}
397+
}
398+
325399
private void declareQueue(Object deadLetterExchange) throws IOException {
326400
declareQueue(TEST_QUEUE_NAME, deadLetterExchange, null, null);
327401
}
@@ -359,10 +433,23 @@ private void publishN(int n) throws IOException {
359433
private void publishN(int n, AMQP.BasicProperties props)
360434
throws IOException
361435
{
362-
for(int x = 0; x < n; x++) {
363-
channel.basicPublish("amq.direct", "test", props,
364-
"test message".getBytes());
365-
}
436+
for(int x = 0; x < n; x++) { publish(props, "test message"); }
437+
}
438+
439+
private void publish(AMQP.BasicProperties props, String body)
440+
throws IOException
441+
{
442+
channel.basicPublish("amq.direct", "test", props, body.getBytes());
443+
}
444+
445+
private void publishAt(long when) throws Exception {
446+
waitUntil(when);
447+
publish(null, Long.toString(System.currentTimeMillis()));
448+
}
449+
450+
private void waitUntil(long when) throws Exception {
451+
long delay = when - System.currentTimeMillis();
452+
Thread.sleep(delay > 0 ? delay : 0);
366453
}
367454

368455
private void consumeN(String queue, int n, WithResponse withResponse)

0 commit comments

Comments
 (0)