|
18 | 18 | */ |
19 | 19 | package org.apache.pulsar.broker.intercept; |
20 | 20 |
|
| 21 | +import static org.testng.Assert.fail; |
| 22 | + |
21 | 23 | import io.netty.buffer.ByteBuf; |
22 | 24 | import io.netty.buffer.Unpooled; |
| 25 | +import java.util.ArrayList; |
| 26 | +import java.util.Collections; |
23 | 27 | import java.util.concurrent.CompletableFuture; |
24 | 28 | import java.util.concurrent.CountDownLatch; |
25 | 29 | import java.util.function.Predicate; |
@@ -436,4 +440,49 @@ public boolean test(@Nullable Entry entry) { |
436 | 440 | } |
437 | 441 | } |
438 | 442 |
|
| 443 | + @Test(timeOut = 3000) |
| 444 | + public void testManagedLedgerPayloadInputProcessorFailure() throws Exception { |
| 445 | + var config = new ManagedLedgerConfig(); |
| 446 | + final String failureMsg = "failed to process input payload"; |
| 447 | + config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl( |
| 448 | + Collections.emptySet(), Set.of(new ManagedLedgerPayloadProcessor() { |
| 449 | + @Override |
| 450 | + public Processor inputProcessor() { |
| 451 | + return new Processor() { |
| 452 | + @Override |
| 453 | + public ByteBuf process(Object contextObj, ByteBuf inputPayload) { |
| 454 | + throw new RuntimeException(failureMsg); |
| 455 | + } |
| 456 | + |
| 457 | + @Override |
| 458 | + public void release(ByteBuf processedPayload) { |
| 459 | + // no-op |
| 460 | + fail("the release method can't be reached"); |
| 461 | + } |
| 462 | + }; |
| 463 | + } |
| 464 | + }))); |
| 465 | + |
| 466 | + var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config); |
| 467 | + var countDownLatch = new CountDownLatch(1); |
| 468 | + var expectedException = new ArrayList<Exception>(); |
| 469 | + ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() { |
| 470 | + @Override |
| 471 | + public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| 472 | + entryData.release(); |
| 473 | + countDownLatch.countDown(); |
| 474 | + } |
| 475 | + |
| 476 | + @Override |
| 477 | + public void addFailed(ManagedLedgerException exception, Object ctx) { |
| 478 | + // expected |
| 479 | + expectedException.add(exception); |
| 480 | + countDownLatch.countDown(); |
| 481 | + } |
| 482 | + }, null); |
| 483 | + countDownLatch.await(); |
| 484 | + assertEquals(expectedException.size(), 1); |
| 485 | + assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg); |
| 486 | + } |
| 487 | + |
439 | 488 | } |
0 commit comments