Skip to content

Commit 2526c8f

Browse files
gaoran10nikhil-ctds
authored andcommitted
[fix][broker] Remove failed OpAddEntry from pendingAddEntries (apache#23817)
(cherry picked from commit 420f62e) (cherry picked from commit 0a0b3ca)
1 parent 0214ecd commit 2526c8f

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public void initiate() {
142142
payloadProcessorHandle = ml.getManagedLedgerInterceptor()
143143
.processPayloadBeforeLedgerWrite(this, duplicateBuffer);
144144
} catch (Exception e) {
145+
ml.pendingAddEntries.remove(this);
145146
ReferenceCountUtil.safeRelease(duplicateBuffer);
146147
log.error("[{}] Error processing payload before ledger write", ml.getName(), e);
147148
this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e));

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
*/
1919
package org.apache.pulsar.broker.intercept;
2020

21-
import static org.testng.Assert.fail;
2221

2322
import io.netty.buffer.ByteBuf;
2423
import io.netty.buffer.Unpooled;
2524
import java.util.ArrayList;
2625
import java.util.Collections;
2726
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicInteger;
2929
import java.util.function.Predicate;
3030
import lombok.Cleanup;
3131
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -451,26 +451,32 @@ public Processor inputProcessor() {
451451
return new Processor() {
452452
@Override
453453
public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
454-
throw new RuntimeException(failureMsg);
454+
if (inputPayload.readBoolean()) {
455+
throw new RuntimeException(failureMsg);
456+
}
457+
return inputPayload;
455458
}
456459

457460
@Override
458461
public void release(ByteBuf processedPayload) {
459462
// no-op
460-
fail("the release method can't be reached");
461463
}
462464
};
463465
}
464466
})));
465467

466468
var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config);
467-
var countDownLatch = new CountDownLatch(1);
469+
int count = 10;
470+
var countDownLatch = new CountDownLatch(count);
471+
var successCount = new AtomicInteger(0);
468472
var expectedException = new ArrayList<Exception>();
469-
ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() {
473+
474+
var addEntryCallback = new AsyncCallbacks.AddEntryCallback() {
470475
@Override
471476
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
472477
entryData.release();
473478
countDownLatch.countDown();
479+
successCount.incrementAndGet();
474480
}
475481

476482
@Override
@@ -479,10 +485,23 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
479485
expectedException.add(exception);
480486
countDownLatch.countDown();
481487
}
482-
}, null);
488+
};
489+
490+
for (int i = 0; i < count; i++) {
491+
if (i % 2 == 0) {
492+
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(true), addEntryCallback, null);
493+
} else {
494+
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(false), addEntryCallback, null);
495+
}
496+
}
497+
483498
countDownLatch.await();
484-
assertEquals(expectedException.size(), 1);
485-
assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg);
499+
assertEquals(expectedException.size(), count / 2);
500+
assertEquals(successCount.get(), count / 2);
501+
for (Exception e : expectedException) {
502+
assertEquals(e.getCause().getMessage(), failureMsg);
503+
}
504+
ledger.close();
486505
}
487506

488507
}

0 commit comments

Comments
 (0)