|
22 | 22 | import static org.junit.Assert.assertThat; |
23 | 23 | import static org.junit.Assert.assertTrue; |
24 | 24 | import static org.mockito.ArgumentMatchers.argThat; |
| 25 | +import static org.mockito.ArgumentMatchers.nullable; |
| 26 | +import static org.mockito.Mockito.doAnswer; |
25 | 27 | import static org.mockito.Mockito.mock; |
26 | 28 | import static org.mockito.Mockito.verify; |
27 | 29 | import static org.mockito.Mockito.when; |
|
39 | 41 | import io.jaegertracing.spi.Reporter; |
40 | 42 | import io.jaegertracing.spi.Sender; |
41 | 43 | import java.util.ArrayList; |
| 44 | +import java.util.Collections; |
42 | 45 | import java.util.List; |
43 | 46 | import java.util.concurrent.CountDownLatch; |
44 | 47 | import java.util.concurrent.CyclicBarrier; |
45 | 48 | import java.util.concurrent.TimeUnit; |
46 | 49 | import java.util.concurrent.atomic.AtomicBoolean; |
| 50 | +import java.util.concurrent.atomic.AtomicInteger; |
| 51 | +import java.util.concurrent.atomic.AtomicReference; |
| 52 | +import java.util.function.Consumer; |
| 53 | + |
47 | 54 | import org.junit.Before; |
48 | 55 | import org.junit.Ignore; |
49 | 56 | import org.junit.Test; |
50 | 57 | import org.mockito.ArgumentMatcher; |
| 58 | +import org.mockito.ArgumentMatchers; |
51 | 59 | import org.slf4j.LoggerFactory; |
52 | 60 |
|
53 | 61 | public class RemoteReporterTest { |
@@ -215,15 +223,8 @@ public void testCloseWhenQueueFull() { |
215 | 223 | @Test |
216 | 224 | public void testCloseLogSenderException() throws SenderException { |
217 | 225 |
|
218 | | - // set up mocking |
219 | | - ch.qos.logback.classic.Logger root = |
220 | | - (ch.qos.logback.classic.Logger) LoggerFactory |
221 | | - .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); |
222 | | - |
223 | | - @SuppressWarnings("unchecked") final Appender<ILoggingEvent> mockAppender = |
224 | | - mock(Appender.class); |
225 | | - when(mockAppender.getName()).thenReturn("MOCK"); |
226 | | - root.addAppender(mockAppender); |
| 226 | + Appender<ILoggingEvent> mockAppender = mockLogger(e -> { |
| 227 | + }); |
227 | 228 |
|
228 | 229 | final Sender mockedSender = mock(Sender.class); |
229 | 230 | when(mockedSender.close()).thenThrow(SenderException.class); |
@@ -322,6 +323,74 @@ public int flush() throws SenderException { |
322 | 323 | assertEquals("mySpan", (sender.getReceived().get(0)).getOperationName()); |
323 | 324 | } |
324 | 325 |
|
| 326 | + @Test |
| 327 | + public void testFlushErrorsLoggedJustOnce() throws InterruptedException { |
| 328 | + |
| 329 | + Object logMonitor = new Object(); |
| 330 | + AtomicReference<String> logMsg = new AtomicReference<>(null); |
| 331 | + mockLogger(e -> { |
| 332 | + synchronized (logMonitor) { |
| 333 | + logMsg.set(e.getFormattedMessage()); |
| 334 | + logMonitor.notifyAll(); |
| 335 | + } |
| 336 | + }); |
| 337 | + |
| 338 | + class FailingSender extends InMemorySender { |
| 339 | + private final AtomicInteger flushCounter = new AtomicInteger(0); |
| 340 | + |
| 341 | + @Override |
| 342 | + public int flush() throws SenderException { |
| 343 | + int i = super.flush(); |
| 344 | + switch (flushCounter.getAndIncrement()) { |
| 345 | + case 1: |
| 346 | + case 2: |
| 347 | + case 3: |
| 348 | + throw new SenderException("test1", super.flush()); |
| 349 | + default: |
| 350 | + return i; |
| 351 | + } |
| 352 | + } |
| 353 | + |
| 354 | + private String awaitMessage(AtomicReference<String> ref) throws InterruptedException { |
| 355 | + synchronized (logMonitor) { |
| 356 | + while (ref.get() == null) { |
| 357 | + logMonitor.wait(); |
| 358 | + } |
| 359 | + return ref.getAndSet(null); |
| 360 | + } |
| 361 | + } |
| 362 | + } |
| 363 | + |
| 364 | + FailingSender sender = new FailingSender(); |
| 365 | + |
| 366 | + RemoteReporter remoteReporter = new Builder() |
| 367 | + .withSender(sender) |
| 368 | + .withFlushInterval(Integer.MAX_VALUE) |
| 369 | + .withMaxQueueSize(maxQueueSize) |
| 370 | + .withMetrics(metrics) |
| 371 | + .build(); |
| 372 | + tracer = new JaegerTracer.Builder("test-remote-reporter") |
| 373 | + .withReporter(remoteReporter) |
| 374 | + .withSampler(new ConstSampler(true)) |
| 375 | + .withMetrics(metrics) |
| 376 | + .build(); |
| 377 | + |
| 378 | + tracer.buildSpan("mySpan").start().finish(); |
| 379 | + remoteReporter.flush(); |
| 380 | + |
| 381 | + tracer.buildSpan("mySpan").start().finish(); |
| 382 | + remoteReporter.flush(); |
| 383 | + |
| 384 | + assertEquals("FlushCommand execution failed! Repeated errors of this command will not be logged.", |
| 385 | + sender.awaitMessage(logMsg)); |
| 386 | + |
| 387 | + remoteReporter.flush(); |
| 388 | + remoteReporter.flush(); |
| 389 | + remoteReporter.flush(); |
| 390 | + assertEquals("FlushCommand is working again!", sender.awaitMessage(logMsg)); |
| 391 | + |
| 392 | + } |
| 393 | + |
325 | 394 | @Test |
326 | 395 | public void testUpdateSuccessMetricWhenAppendFlushed() throws InterruptedException { |
327 | 396 | int totalSpans = 3; |
@@ -396,4 +465,21 @@ public int append(JaegerSpan span) throws SenderException { |
396 | 465 | private JaegerSpan newSpan() { |
397 | 466 | return tracer.buildSpan("x").start(); |
398 | 467 | } |
| 468 | + |
| 469 | + private static Appender<ILoggingEvent> mockLogger(Consumer<ILoggingEvent> append) { |
| 470 | + ch.qos.logback.classic.Logger root = |
| 471 | + (ch.qos.logback.classic.Logger) LoggerFactory |
| 472 | + .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); |
| 473 | + |
| 474 | + @SuppressWarnings("unchecked") |
| 475 | + final Appender<ILoggingEvent> mockAppender = mock(Appender.class); |
| 476 | + when(mockAppender.getName()).thenReturn("MOCK"); |
| 477 | + doAnswer(i -> { |
| 478 | + append.accept(i.getArgument(0)); |
| 479 | + return null; |
| 480 | + }).when(mockAppender).doAppend(ArgumentMatchers.any(ILoggingEvent.class)); |
| 481 | + root.addAppender(mockAppender); |
| 482 | + |
| 483 | + return mockAppender; |
| 484 | + } |
399 | 485 | } |
0 commit comments