|
21 | 21 | import static org.testng.Assert.assertEquals; |
22 | 22 | import static org.testng.Assert.assertNotEquals; |
23 | 23 | import static org.testng.Assert.assertNotNull; |
| 24 | +import static org.testng.Assert.fail; |
| 25 | + |
24 | 26 | import io.netty.buffer.ByteBuf; |
25 | 27 | import io.netty.buffer.Unpooled; |
26 | 28 | import java.nio.charset.StandardCharsets; |
| 29 | +import java.util.ArrayList; |
| 30 | +import java.util.Collections; |
27 | 31 | import java.util.HashSet; |
28 | 32 | import java.util.List; |
29 | 33 | import java.util.Set; |
@@ -431,4 +435,49 @@ public boolean test(@Nullable Entry entry) { |
431 | 435 | } |
432 | 436 | } |
433 | 437 |
|
| 438 | + @Test(timeOut = 3000) |
| 439 | + public void testManagedLedgerPayloadInputProcessorFailure() throws Exception { |
| 440 | + var config = new ManagedLedgerConfig(); |
| 441 | + final String failureMsg = "failed to process input payload"; |
| 442 | + config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl( |
| 443 | + Collections.emptySet(), Set.of(new ManagedLedgerPayloadProcessor() { |
| 444 | + @Override |
| 445 | + public Processor inputProcessor() { |
| 446 | + return new Processor() { |
| 447 | + @Override |
| 448 | + public ByteBuf process(Object contextObj, ByteBuf inputPayload) { |
| 449 | + throw new RuntimeException(failureMsg); |
| 450 | + } |
| 451 | + |
| 452 | + @Override |
| 453 | + public void release(ByteBuf processedPayload) { |
| 454 | + // no-op |
| 455 | + fail("the release method can't be reached"); |
| 456 | + } |
| 457 | + }; |
| 458 | + } |
| 459 | + }))); |
| 460 | + |
| 461 | + var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config); |
| 462 | + var countDownLatch = new CountDownLatch(1); |
| 463 | + var expectedException = new ArrayList<Exception>(); |
| 464 | + ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() { |
| 465 | + @Override |
| 466 | + public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| 467 | + entryData.release(); |
| 468 | + countDownLatch.countDown(); |
| 469 | + } |
| 470 | + |
| 471 | + @Override |
| 472 | + public void addFailed(ManagedLedgerException exception, Object ctx) { |
| 473 | + // expected |
| 474 | + expectedException.add(exception); |
| 475 | + countDownLatch.countDown(); |
| 476 | + } |
| 477 | + }, null); |
| 478 | + countDownLatch.await(); |
| 479 | + assertEquals(expectedException.size(), 1); |
| 480 | + assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg); |
| 481 | + } |
| 482 | + |
434 | 483 | } |
0 commit comments