Skip to content

Commit e51d5cd

Browse files
author
Steve Powell
committed
Corrected start/stop logic for AsyncLogger.
2 parents afb649c + 8520066 commit e51d5cd

File tree

4 files changed

+45
-17
lines changed

4 files changed

+45
-17
lines changed

src/com/rabbitmq/client/ConsumerCancelledException.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class ConsumerCancelledException extends RuntimeException implements
2424
/** Default for non-checking. */
2525
private static final long serialVersionUID = 1L;
2626

27-
@Override
2827
public ConsumerCancelledException sensibleClone() {
2928
try {
3029
return (ConsumerCancelledException) super.clone();

src/com/rabbitmq/tools/Tracer.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.BlockingQueue;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435

3536
import com.rabbitmq.client.AMQP;
3637
import com.rabbitmq.client.impl.AMQCommand;
@@ -388,7 +389,7 @@ public static class AsyncLogger implements Logger {
388389

389390
private final Runnable loggerRunnable;
390391

391-
private final AtomicBoolean started;
392+
private final SafeCounter countStarted;
392393
private volatile Thread loggerThread = null;
393394

394395
/**
@@ -430,7 +431,7 @@ public AsyncLogger(OutputStream os, int flushInterval) {
430431
throw new IllegalArgumentException("Flush interval ("
431432
+ flushInterval + "ms) must be positive and at least "
432433
+ MIN_FLUSH_INTERVAL + "ms.");
433-
this.started = new AtomicBoolean(false);
434+
this.countStarted = new SafeCounter();
434435

435436
PrintStream printStream = new PrintStream(new BufferedOutputStream(
436437
os, BUFFER_SIZE), false);
@@ -448,8 +449,17 @@ public void log(String message) {
448449
}
449450
}
450451

452+
public boolean start() {
453+
if (this.countStarted.testZeroAndIncrement()) {
454+
this.loggerThread = new Thread(this.loggerRunnable);
455+
this.loggerThread.start();
456+
return true;
457+
}
458+
return false; // meaning already started
459+
}
460+
451461
public boolean stop() {
452-
if (this.started.compareAndSet(true, false)) {
462+
if (this.countStarted.decrementAndTestZero()) {
453463
if (this.loggerThread != null) {
454464
try {
455465
this.queue.put(new Pr<String, LogCmd>(null, LogCmd.STOP));
@@ -464,16 +474,7 @@ public boolean stop() {
464474
return false; // meaning already stopped
465475
}
466476

467-
public boolean start() {
468-
if (this.started.compareAndSet(false, true)) {
469-
this.loggerThread = new Thread(this.loggerRunnable);
470-
this.loggerThread.start();
471-
return true;
472-
}
473-
return false; // meaning already started
474-
}
475-
476-
private static class AsyncLoggerRunnable implements Runnable {
477+
private class AsyncLoggerRunnable implements Runnable {
477478
private final int flushInterval;
478479
private final PrintStream ps;
479480
private final BlockingQueue<Pr<String, LogCmd> > queue;
@@ -515,6 +516,7 @@ public void run() {
515516
this.ps.flush();
516517

517518
} catch (InterruptedException ie) {
519+
AsyncLogger.this.countStarted.reset();
518520
drainCurrentQueue();
519521
this.ps.println("Interrupted.");
520522
this.ps.flush();
@@ -531,4 +533,31 @@ private void drainCurrentQueue() {
531533
}
532534
}
533535
}
536+
537+
private static class SafeCounter {
538+
private final Object countMonitor = new Object();
539+
private int count;
540+
public SafeCounter() {
541+
this.count = 0;
542+
}
543+
public boolean testZeroAndIncrement() {
544+
synchronized (this.countMonitor) {
545+
int val = this.count;
546+
this.count++;
547+
return (val == 0);
548+
}
549+
}
550+
public boolean decrementAndTestZero() {
551+
synchronized (this.countMonitor) {
552+
if (this.count == 0) return false;
553+
--this.count;
554+
return (0 == this.count);
555+
}
556+
}
557+
public void reset() {
558+
synchronized (this.countMonitor) {
559+
this.count = 0;
560+
}
561+
}
562+
}
534563
}

test/src/com/rabbitmq/client/test/functional/ConsumerCancelNotificiation.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public void testConsumerCancellationInterruptsQueuingConsumerWait()
5757
final QueueingConsumer consumer = new QueueingConsumer(channel);
5858
Runnable receiver = new Runnable() {
5959

60-
@Override
6160
public void run() {
6261
try {
6362
try {

test/src/com/rabbitmq/client/test/functional/PerQueueTTL.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ public void testTTLMustBePositive() throws Exception {
108108
public void testQueueRedeclareEquivalence() throws Exception {
109109
declareQueue(TTL_QUEUE_NAME, 10);
110110
try {
111-
declareQueue(TTL_QUEUE_NAME, 20);
111+
declareQueue(TTL_QUEUE_NAME, 20);
112+
fail("Should not be able to redeclare with different TTL");
112113
} catch(IOException ex) {
113-
checkShutdownSignal(AMQP.NOT_ALLOWED, ex);
114+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ex);
114115
}
115116
}
116117

0 commit comments

Comments
 (0)