Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void flush(BufferNextMessage<S> bundler) {

// Old senders in other artifacts may be using this less precise way of indicating they've been closed
// out-of-band.
if (t instanceof IllegalStateException && t.getMessage().equals("closed")) {
if (t instanceof IllegalStateException && "closed".equals(t.getMessage())) {
throw (IllegalStateException) t;
}
}
Expand Down
12 changes: 8 additions & 4 deletions core/src/test/java/zipkin2/reporter/FakeSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,21 @@ public FakeSender messageMaxBytes(int messageMaxBytes) {
return messageMaxBytes;
}

/** close is typically called from a different thread */
volatile boolean closeCalled;
// allow us to simulate an exception
volatile RuntimeException exceptionToThrow;

@Override public void send(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
if (exceptionToThrow != null) throw exceptionToThrow;
List<Span> decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList());
onSpans.accept(decoded);
}

@Override public void close() {
closeCalled = true;
exceptionToThrow = new ClosedSenderException();
}

public void throwException(RuntimeException e) {
exceptionToThrow = e;
}

@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -114,7 +115,7 @@ void queuedMaxSpans_dropsWhenOverqueuing(int queuedMaxBytes) {
reporter.report(span); // dropped the one that queued more than allowed count
reporter.flush();
reporter.close();

assertThat(sentSpans.get()).isEqualTo(1);
}

Expand All @@ -131,7 +132,7 @@ void report_incrementsMetrics(int queuedMaxBytes) {
reporter.report(span);
reporter.flush();
reporter.close();

assertThat(metrics.spans()).isEqualTo(2);
assertThat(metrics.spanBytes()).isEqualTo(SpanBytesEncoder.JSON_V2.encode(span).length * 2);
}
Expand All @@ -154,8 +155,7 @@ void report_incrementsSpansDropped(int queuedMaxBytes) {
assertThat(metrics.spans()).isEqualTo(2);
assertThat(metrics.spansDropped()).isEqualTo(1);
}



@ParameterizedTest(name = "queuedMaxBytes={0}")
@ValueSource(ints = { 0, 1000000 })
void report_incrementsSpansDroppedOversizing(int queuedMaxBytes) {
Expand Down Expand Up @@ -321,7 +321,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException {
// check name is pretty
assertThat(threadName.take())
.isEqualTo("AsyncReporter{FakeSender}");

reporter.close();
}

Expand All @@ -342,7 +342,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException {
BoundedAsyncReporter<Span> impl = (BoundedAsyncReporter<Span>) reporter;
assertThat(impl.close.await(3, TimeUnit.MILLISECONDS))
.isTrue();

reporter.close();
}

Expand Down Expand Up @@ -396,7 +396,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException {
assertThat(metrics.messagesDropped()).isEqualTo(1);
assertThat(metrics.messagesDroppedByCause().keySet().iterator().next())
.isEqualTo(ClosedSenderException.class);

reporter.close();
}

Expand Down Expand Up @@ -511,6 +511,41 @@ void quitsBlockingWhenOverTimeout(int queuedMaxBytes) throws InterruptedExceptio
}
}

@Test void flush_incrementsMetricsAndThrowsWhenIllegalStateExceptionWithMessage() {
AsyncReporter<Span> reporter = AsyncReporter.newBuilder(sleepingSender)
.metrics(metrics)
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);

reporter.report(span);

sleepingSender.throwException(new IllegalStateException("closed"));
try {
reporter.flush();
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(metrics.spansDropped()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isEqualTo(1);
} finally {
reporter.close();
}
}

@Test void flush_incrementsMetricsAndDontThrowsWhenCancellationException() {
AsyncReporter<Span> reporter = AsyncReporter.newBuilder(sleepingSender)
.metrics(metrics)
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);

reporter.report(span);

sleepingSender.throwException(new CancellationException());
reporter.flush();
assertThat(metrics.spansDropped()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isEqualTo(1);
reporter.close();
}

@Test void build_threadFactory() {
Thread thread = new Thread();
AsyncReporter<Span> reporter = AsyncReporter.newBuilder(FakeSender.create())
Expand Down