Skip to content

Commit 41c7e29

Browse files
gaoran10lhotari
authored andcommitted
[fix][broker] Remove failed OpAddEntry from pendingAddEntries (#23817)
(cherry picked from commit 420f62e)
1 parent 470e28f commit 41c7e29

File tree

3 files changed

+34
-14
lines changed

3 files changed

+34
-14
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public void initiate() {
142142
payloadProcessorHandle = ml.getManagedLedgerInterceptor()
143143
.processPayloadBeforeLedgerWrite(this, duplicateBuffer);
144144
} catch (Exception e) {
145+
ml.pendingAddEntries.remove(this);
145146
ReferenceCountUtil.safeRelease(duplicateBuffer);
146147
log.error("[{}] Error processing payload before ledger write", ml.getName(), e);
147148
this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e));

pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImpl2Test.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.testng.Assert.assertEquals;
22-
import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor;
22+
import static org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.TestPayloadProcessor;
2323
import java.util.HashSet;
2424
import java.util.Set;
2525
import lombok.extern.slf4j.Slf4j;
@@ -28,13 +28,13 @@
2828
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
2929
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
3030
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
31-
import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest;
31+
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest;
3232
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
3333
import org.awaitility.Awaitility;
3434
import org.testng.annotations.Test;
3535

3636
/***
37-
* Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
37+
* Differ to {@link ManagedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
3838
* by "default".
3939
*/
4040
@Slf4j
@@ -73,7 +73,7 @@ public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception {
7373
switchLedgerManually(ledger);
7474

7575
// verify.
76-
assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
76+
assertEquals(currentLedgerSize, ManagedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
7777

7878
// cleanup.
7979
cursor.close();

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java renamed to pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
*/
1919
package org.apache.pulsar.broker.intercept;
2020

21-
import static org.testng.Assert.fail;
2221

2322
import io.netty.buffer.ByteBuf;
2423
import io.netty.buffer.Unpooled;
2524
import java.util.ArrayList;
2625
import java.util.Collections;
2726
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicInteger;
2929
import java.util.function.Predicate;
3030
import lombok.Cleanup;
3131
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -63,8 +63,8 @@
6363
import static org.testng.Assert.assertNotNull;
6464

6565
@Test(groups = "broker")
66-
public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
67-
private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
66+
public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
67+
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImplTest.class);
6868

6969
public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
7070
@Override
@@ -451,26 +451,32 @@ public Processor inputProcessor() {
451451
return new Processor() {
452452
@Override
453453
public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
454-
throw new RuntimeException(failureMsg);
454+
if (inputPayload.readBoolean()) {
455+
throw new RuntimeException(failureMsg);
456+
}
457+
return inputPayload;
455458
}
456459

457460
@Override
458461
public void release(ByteBuf processedPayload) {
459462
// no-op
460-
fail("the release method can't be reached");
461463
}
462464
};
463465
}
464466
})));
465467

466468
var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config);
467-
var countDownLatch = new CountDownLatch(1);
469+
int count = 10;
470+
var countDownLatch = new CountDownLatch(count);
471+
var successCount = new AtomicInteger(0);
468472
var expectedException = new ArrayList<Exception>();
469-
ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() {
473+
474+
var addEntryCallback = new AsyncCallbacks.AddEntryCallback() {
470475
@Override
471476
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
472477
entryData.release();
473478
countDownLatch.countDown();
479+
successCount.incrementAndGet();
474480
}
475481

476482
@Override
@@ -479,10 +485,23 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
479485
expectedException.add(exception);
480486
countDownLatch.countDown();
481487
}
482-
}, null);
488+
};
489+
490+
for (int i = 0; i < count; i++) {
491+
if (i % 2 == 0) {
492+
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(true), addEntryCallback, null);
493+
} else {
494+
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(false), addEntryCallback, null);
495+
}
496+
}
497+
483498
countDownLatch.await();
484-
assertEquals(expectedException.size(), 1);
485-
assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg);
499+
assertEquals(expectedException.size(), count / 2);
500+
assertEquals(successCount.get(), count / 2);
501+
for (Exception e : expectedException) {
502+
assertEquals(e.getCause().getMessage(), failureMsg);
503+
}
504+
ledger.close();
486505
}
487506

488507
}

0 commit comments

Comments
 (0)