Skip to content

Commit c10f651

Browse files
committed
basic returns are logged and reported for each individual producer
1 parent 0254d20 commit c10f651

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public static void main(String[] args) {
9090

9191
//setup
9292
String id = UUID.randomUUID().toString();
93-
final Stats stats = new Stats(1000L * samplingInterval);
93+
Stats stats = new Stats(1000L * samplingInterval);
9494
ConnectionFactory factory = new ConnectionFactory();
9595
factory.setHost(hostName);
9696
factory.setPort(portNumber);
@@ -127,6 +127,10 @@ public static void main(String[] args) {
127127
Channel channel = conn.createChannel();
128128
if (producerTxSize > 0) channel.txSelect();
129129
channel.exchangeDeclare(exchangeName, exchangeType);
130+
final Producer p = new Producer(channel, exchangeName, id,
131+
flags, producerTxSize,
132+
1000L * samplingInterval,
133+
rateLimit, minMsgSize, timeLimit);
130134
channel.setReturnListener(new ReturnListener() {
131135
public void handleBasicReturn(int replyCode,
132136
String replyText,
@@ -135,14 +139,10 @@ public void handleBasicReturn(int replyCode,
135139
AMQP.BasicProperties properties,
136140
byte[] body)
137141
throws IOException {
138-
stats.logBasicReturn();
142+
p.logBasicReturn();
139143
}
140144
});
141-
Thread t =
142-
new Thread(new Producer(channel, exchangeName, id,
143-
flags, producerTxSize,
144-
1000L * samplingInterval,
145-
rateLimit, minMsgSize, timeLimit));
145+
Thread t = new Thread(p);
146146
producerThreads[i] = t;
147147
t.start();
148148
}
@@ -227,6 +227,7 @@ public static class Producer implements Runnable {
227227
private long startTime;
228228
private long lastStatsTime;
229229
private int msgCount;
230+
private int basicReturnCount;
230231

231232
public Producer(Channel channel, String exchangeName, String id,
232233
List flags, int txSize,
@@ -246,6 +247,14 @@ public Producer(Channel channel, String exchangeName, String id,
246247
this.message = new byte[minMsgSize];
247248
}
248249

250+
public synchronized void logBasicReturn() {
251+
basicReturnCount++;
252+
}
253+
254+
public synchronized void resetBasicReturns() {
255+
basicReturnCount = 0;
256+
}
257+
249258
public void run() {
250259

251260
long now;
@@ -304,7 +313,11 @@ private void delay(long now)
304313
if (elapsed > interval) {
305314
System.out.println("sending rate: " +
306315
(msgCount * 1000L / elapsed) +
307-
" msg/s");
316+
" msg/s" +
317+
", basic returns: " +
318+
(basicReturnCount * 1000L / elapsed) +
319+
" ret/s");
320+
resetBasicReturns();
308321
msgCount = 0;
309322
lastStatsTime = now;
310323
}
@@ -421,7 +434,6 @@ public static class Stats {
421434
private long minLatency;
422435
private long maxLatency;
423436
private long cumulativeLatency;
424-
private long numBasicReturns;
425437

426438
public Stats(long interval) {
427439
this.interval = interval;
@@ -435,11 +447,6 @@ private void reset(long t) {
435447
minLatency = Long.MAX_VALUE;
436448
maxLatency = Long.MIN_VALUE;
437449
cumulativeLatency = 0L;
438-
numBasicReturns = 0L;
439-
}
440-
441-
public synchronized void logBasicReturn() {
442-
++numBasicReturns;
443450
}
444451

445452
public synchronized void collectStats(long now, long latency) {
@@ -462,8 +469,7 @@ public synchronized void collectStats(long now, long latency) {
462469
minLatency/1000L + "/" +
463470
cumulativeLatency / (1000L * latencyCount) + "/" +
464471
maxLatency/1000L + " microseconds" :
465-
"") +
466-
", basic returns: " + numBasicReturns);
472+
""));
467473
reset(now);
468474
}
469475

0 commit comments

Comments
 (0)