1818 */
1919package org .apache .pulsar .broker .intercept ;
2020
21- import static org .testng .Assert .fail ;
2221
2322import io .netty .buffer .ByteBuf ;
2423import io .netty .buffer .Unpooled ;
2524import java .util .ArrayList ;
2625import java .util .Collections ;
2726import java .util .concurrent .CompletableFuture ;
2827import java .util .concurrent .CountDownLatch ;
28+ import java .util .concurrent .atomic .AtomicInteger ;
2929import java .util .function .Predicate ;
3030import lombok .Cleanup ;
3131import org .apache .bookkeeper .mledger .AsyncCallbacks ;
6363import static org .testng .Assert .assertNotNull ;
6464
6565@ Test (groups = "broker" )
66- public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
67- private static final Logger log = LoggerFactory .getLogger (MangedLedgerInterceptorImplTest .class );
66+ public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
67+ private static final Logger log = LoggerFactory .getLogger (ManagedLedgerInterceptorImplTest .class );
6868
6969 public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
7070 @ Override
@@ -451,26 +451,32 @@ public Processor inputProcessor() {
451451 return new Processor () {
452452 @ Override
453453 public ByteBuf process (Object contextObj , ByteBuf inputPayload ) {
454- throw new RuntimeException (failureMsg );
454+ if (inputPayload .readBoolean ()) {
455+ throw new RuntimeException (failureMsg );
456+ }
457+ return inputPayload ;
455458 }
456459
457460 @ Override
458461 public void release (ByteBuf processedPayload ) {
459462 // no-op
460- fail ("the release method can't be reached" );
461463 }
462464 };
463465 }
464466 })));
465467
466468 var ledger = factory .open ("testManagedLedgerPayloadProcessorFailure" , config );
467- var countDownLatch = new CountDownLatch (1 );
469+ int count = 10 ;
470+ var countDownLatch = new CountDownLatch (count );
471+ var successCount = new AtomicInteger (0 );
468472 var expectedException = new ArrayList <Exception >();
469- ledger .asyncAddEntry ("test" .getBytes (), 1 , 1 , new AsyncCallbacks .AddEntryCallback () {
473+
474+ var addEntryCallback = new AsyncCallbacks .AddEntryCallback () {
470475 @ Override
471476 public void addComplete (Position position , ByteBuf entryData , Object ctx ) {
472477 entryData .release ();
473478 countDownLatch .countDown ();
479+ successCount .incrementAndGet ();
474480 }
475481
476482 @ Override
@@ -479,10 +485,23 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
479485 expectedException .add (exception );
480486 countDownLatch .countDown ();
481487 }
482- }, null );
488+ };
489+
490+ for (int i = 0 ; i < count ; i ++) {
491+ if (i % 2 == 0 ) {
492+ ledger .asyncAddEntry (Unpooled .buffer ().writeBoolean (true ), addEntryCallback , null );
493+ } else {
494+ ledger .asyncAddEntry (Unpooled .buffer ().writeBoolean (false ), addEntryCallback , null );
495+ }
496+ }
497+
483498 countDownLatch .await ();
484- assertEquals (expectedException .size (), 1 );
485- assertEquals (expectedException .get (0 ).getCause ().getMessage (), failureMsg );
499+ assertEquals (expectedException .size (), count / 2 );
500+ assertEquals (successCount .get (), count / 2 );
501+ for (Exception e : expectedException ) {
502+ assertEquals (e .getCause ().getMessage (), failureMsg );
503+ }
504+ ledger .close ();
486505 }
487506
488507}
0 commit comments