diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7426059e576f6..607b6d09cc239 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -802,41 +802,33 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback buffer.retain(); // Jump to specific thread to avoid contention from writers writing from different threads - final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, - currentLedgerTimeoutTriggered); - var added = false; - try { - // Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first - // element in `pendingAddEntries`. - synchronized (this) { - if (managedLedgerInterceptor != null) { - managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); - } - final var state = STATE_UPDATER.get(this); - beforeAddEntryToQueue(state); - pendingAddEntries.add(addOperation); - added = true; - afterAddEntryToQueue(state, addOperation); - } - } catch (Throwable throwable) { - if (!added) { - addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable)); - } // else: all elements of `pendingAddEntries` will fail in another thread - } + executor.execute(() -> { + OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, + currentLedgerTimeoutTriggered); + internalAsyncAddEntry(addOperation); + }); } - protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { - if (state.isFenced()) { - throw new ManagedLedgerFencedException(); + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + if (!beforeAddEntry(addOperation)) { + return; } - switch (state) { - case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated"); - case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"); - case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"); + final State state = STATE_UPDATER.get(this); + if (state.isFenced()) { + addOperation.failed(new ManagedLedgerFencedException()); + return; + } else if (state == State.Terminated) { + addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated")); + return; + } else if (state == State.Closed) { + addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); + return; + } else if (state == State.WriteFailed) { + addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure")); + return; } - } + pendingAddEntries.add(addOperation); - protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index bae6cd66d2825..4b03cad8e0a1d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -223,23 +223,25 @@ private void initLastConfirmedEntry() { } @Override - protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + if (!beforeAddEntry(addOperation)) { + return; + } if (state != State.LedgerOpened) { - throw new ManagedLedgerException("Managed ledger is not opened"); + addOperation.failed(new ManagedLedgerException("Managed ledger is not opened")); + return; } - } - @Override - protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) { - pendingAddEntries.poll(); - throw new ManagedLedgerException("Illegal addOperation context object."); + addOperation.failed(new ManagedLedgerException("Illegal addOperation context object.")); + return; } if (log.isDebugEnabled()) { log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})", name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId()); } + pendingAddEntries.add(addOperation); if (position.getLedgerId() <= currentLedger.getId()) { // Write into lastLedger if (position.getLedgerId() == currentLedger.getId()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java index 8663019efb8c1..b57b5ce94be42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java @@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -30,12 +29,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import lombok.Cleanup; @@ -503,53 +499,4 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { ledger.close(); } - @Test - public void testBeforeAddEntry() throws Exception { - final var interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null); - final var config = new ManagedLedgerConfig(); - final var numEntries = 100; - config.setMaxEntriesPerLedger(numEntries); - config.setManagedLedgerInterceptor(interceptor); - @Cleanup final var ml = (ManagedLedgerImpl) factory.open("test_concurrent_add_entry", config); - - final var indexesBeforeAdd = new ArrayList(); - final var batchSizes = new ArrayList(); - final var random = new Random(); - final var latch = new CountDownLatch(numEntries); - final var executor = Executors.newFixedThreadPool(3); - final var lock = new Object(); // make sure `asyncAddEntry` are called in order - for (int i = 0; i < numEntries; i++) { - final var batchSize = random.nextInt(0, 100); - final var msg = "msg-" + i; - final var callback = new AsyncCallbacks.AddEntryCallback() { - - @Override - public void addComplete(Position position, ByteBuf entryData, Object ctx) { - latch.countDown(); - } - - @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - log.error("Failed to add {}", msg, exception); - latch.countDown(); - } - }; - executor.execute(() -> { - synchronized (lock) { - batchSizes.add((long) batchSize); - indexesBeforeAdd.add(interceptor.getIndex() + 1); // index is updated in each asyncAddEntry call - ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()), batchSize, callback, null); - } - }); - } - assertTrue(latch.await(3, TimeUnit.SECONDS)); - synchronized (lock) { - for (int i = 1; i < numEntries; i++) { - final var sum = batchSizes.get(i) + batchSizes.get(i - 1); - batchSizes.set(i, sum); - } - assertEquals(indexesBeforeAdd.subList(1, numEntries), batchSizes.subList(0, numEntries - 1)); - } - executor.shutdown(); - } }