88import java .util .Collections ;
99import java .util .List ;
1010import java .util .concurrent .BlockingQueue ;
11+ import java .util .concurrent .CancellationException ;
1112import java .util .concurrent .CountDownLatch ;
1213import java .util .concurrent .LinkedBlockingQueue ;
1314import java .util .concurrent .TimeUnit ;
@@ -114,7 +115,7 @@ void queuedMaxSpans_dropsWhenOverqueuing(int queuedMaxBytes) {
114115 reporter .report (span ); // dropped the one that queued more than allowed count
115116 reporter .flush ();
116117 reporter .close ();
117-
118+
118119 assertThat (sentSpans .get ()).isEqualTo (1 );
119120 }
120121
@@ -131,7 +132,7 @@ void report_incrementsMetrics(int queuedMaxBytes) {
131132 reporter .report (span );
132133 reporter .flush ();
133134 reporter .close ();
134-
135+
135136 assertThat (metrics .spans ()).isEqualTo (2 );
136137 assertThat (metrics .spanBytes ()).isEqualTo (SpanBytesEncoder .JSON_V2 .encode (span ).length * 2 );
137138 }
@@ -154,8 +155,7 @@ void report_incrementsSpansDropped(int queuedMaxBytes) {
154155 assertThat (metrics .spans ()).isEqualTo (2 );
155156 assertThat (metrics .spansDropped ()).isEqualTo (1 );
156157 }
157-
158-
158+
159159 @ ParameterizedTest (name = "queuedMaxBytes={0}" )
160160 @ ValueSource (ints = { 0 , 1000000 })
161161 void report_incrementsSpansDroppedOversizing (int queuedMaxBytes ) {
@@ -321,7 +321,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException {
321321 // check name is pretty
322322 assertThat (threadName .take ())
323323 .isEqualTo ("AsyncReporter{FakeSender}" );
324-
324+
325325 reporter .close ();
326326 }
327327
@@ -342,7 +342,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException {
342342 BoundedAsyncReporter <Span > impl = (BoundedAsyncReporter <Span >) reporter ;
343343 assertThat (impl .close .await (3 , TimeUnit .MILLISECONDS ))
344344 .isTrue ();
345-
345+
346346 reporter .close ();
347347 }
348348
@@ -396,7 +396,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException {
396396 assertThat (metrics .messagesDropped ()).isEqualTo (1 );
397397 assertThat (metrics .messagesDroppedByCause ().keySet ().iterator ().next ())
398398 .isEqualTo (ClosedSenderException .class );
399-
399+
400400 reporter .close ();
401401 }
402402
@@ -511,6 +511,41 @@ void quitsBlockingWhenOverTimeout(int queuedMaxBytes) throws InterruptedExceptio
511511 }
512512 }
513513
514+ @ Test void flush_incrementsMetricsAndThrowsWhenIllegalStateExceptionWithMessage () {
515+ AsyncReporter <Span > reporter = AsyncReporter .newBuilder (sleepingSender )
516+ .metrics (metrics )
517+ .messageTimeout (0 , TimeUnit .MILLISECONDS )
518+ .build (SpanBytesEncoder .JSON_V2 );
519+
520+ reporter .report (span );
521+
522+ sleepingSender .throwException (new IllegalStateException ("closed" ));
523+ try {
524+ reporter .flush ();
525+ failBecauseExceptionWasNotThrown (IllegalStateException .class );
526+ } catch (IllegalStateException e ) {
527+ assertThat (metrics .spansDropped ()).isEqualTo (1 );
528+ assertThat (metrics .messagesDropped ()).isEqualTo (1 );
529+ } finally {
530+ reporter .close ();
531+ }
532+ }
533+
534+ @ Test void flush_incrementsMetricsAndDontThrowsWhenCancellationException () {
535+ AsyncReporter <Span > reporter = AsyncReporter .newBuilder (sleepingSender )
536+ .metrics (metrics )
537+ .messageTimeout (0 , TimeUnit .MILLISECONDS )
538+ .build (SpanBytesEncoder .JSON_V2 );
539+
540+ reporter .report (span );
541+
542+ sleepingSender .throwException (new CancellationException ());
543+ reporter .flush ();
544+ assertThat (metrics .spansDropped ()).isEqualTo (1 );
545+ assertThat (metrics .messagesDropped ()).isEqualTo (1 );
546+ reporter .close ();
547+ }
548+
514549 @ Test void build_threadFactory () {
515550 Thread thread = new Thread ();
516551 AsyncReporter <Span > reporter = AsyncReporter .newBuilder (FakeSender .create ())
0 commit comments