Skip to content

Commit 420f62e

Browse files
authored
[fix][broker] Remove failed OpAddEntry from pendingAddEntries (#23817)
1 parent 2bd9784 commit 420f62e

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public void initiate() {
143143
payloadProcessorHandle = ml.getManagedLedgerInterceptor()
144144
.processPayloadBeforeLedgerWrite(this.getCtx(), duplicateBuffer);
145145
} catch (Exception e) {
146+
ml.pendingAddEntries.remove(this);
146147
ReferenceCountUtil.safeRelease(duplicateBuffer);
147148
log.error("[{}] Error processing payload before ledger write", ml.getName(), e);
148149
this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e));

pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImpl2Test.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.testng.Assert.assertEquals;
22-
import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor;
22+
import static org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.TestPayloadProcessor;
2323
import java.util.HashSet;
2424
import java.util.Set;
2525
import lombok.extern.slf4j.Slf4j;
@@ -28,13 +28,13 @@
2828
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
2929
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
3030
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
31-
import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest;
31+
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest;
3232
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
3333
import org.awaitility.Awaitility;
3434
import org.testng.annotations.Test;
3535

3636
/***
37-
* Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
37+
* Differ to {@link ManagedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
3838
* by "default".
3939
*/
4040
@Slf4j
@@ -73,7 +73,7 @@ public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception {
7373
switchLedgerManually(ledger);
7474

7575
// verify.
76-
assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
76+
assertEquals(currentLedgerSize, ManagedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
7777

7878
// cleanup.
7979
cursor.close();

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java renamed to pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertNotEquals;
2323
import static org.testng.Assert.assertNotNull;
24-
import static org.testng.Assert.fail;
2524

2625
import io.netty.buffer.ByteBuf;
2726
import io.netty.buffer.Unpooled;
@@ -33,6 +32,7 @@
3332
import java.util.Set;
3433
import java.util.concurrent.CompletableFuture;
3534
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.atomic.AtomicInteger;
3636
import java.util.function.Predicate;
3737
import lombok.Cleanup;
3838
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -59,8 +59,8 @@
5959
import org.testng.annotations.Test;
6060

6161
@Test(groups = "broker")
62-
public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
63-
private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
62+
public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
63+
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImplTest.class);
6464

6565
public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
6666
@Override
@@ -446,26 +446,33 @@ public Processor inputProcessor() {
446446
return new Processor() {
447447
@Override
448448
public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
449-
throw new RuntimeException(failureMsg);
449+
Commands.skipBrokerEntryMetadataIfExist(inputPayload);
450+
if (inputPayload.readBoolean()) {
451+
throw new RuntimeException(failureMsg);
452+
}
453+
return inputPayload;
450454
}
451455

452456
@Override
453457
public void release(ByteBuf processedPayload) {
454458
// no-op
455-
fail("the release method can't be reached");
456459
}
457460
};
458461
}
459462
})));
460463

461464
var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config);
462-
var countDownLatch = new CountDownLatch(1);
465+
int count = 10;
466+
var countDownLatch = new CountDownLatch(count);
467+
var successCount = new AtomicInteger(0);
463468
var expectedException = new ArrayList<Exception>();
464-
ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() {
469+
470+
var addEntryCallback = new AsyncCallbacks.AddEntryCallback() {
465471
@Override
466472
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
467473
entryData.release();
468474
countDownLatch.countDown();
475+
successCount.incrementAndGet();
469476
}
470477

471478
@Override
@@ -474,10 +481,23 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
474481
expectedException.add(exception);
475482
countDownLatch.countDown();
476483
}
477-
}, null);
484+
};
485+
486+
for (int i = 0; i < count; i++) {
487+
if (i % 2 == 0) {
488+
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(true), addEntryCallback, null);
489+
} else {
490+
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(false), addEntryCallback, null);
491+
}
492+
}
493+
478494
countDownLatch.await();
479-
assertEquals(expectedException.size(), 1);
480-
assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg);
495+
assertEquals(expectedException.size(), count / 2);
496+
assertEquals(successCount.get(), count / 2);
497+
for (Exception e : expectedException) {
498+
assertEquals(e.getCause().getMessage(), failureMsg);
499+
}
500+
ledger.close();
481501
}
482502

483503
}

0 commit comments

Comments
 (0)