Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -802,41 +802,33 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
final var state = STATE_UPDATER.get(this);
beforeAddEntryToQueue(state);
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(state, addOperation);
}
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}

protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state.isFenced()) {
throw new ManagedLedgerFencedException();
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
switch (state) {
case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated");
case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
final State state = STATE_UPDATER.get(this);
if (state.isFenced()) {
addOperation.failed(new ManagedLedgerFencedException());
return;
} else if (state == State.Terminated) {
addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
} else if (state == State.Closed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
} else if (state == State.WriteFailed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
return;
}
}
pendingAddEntries.add(addOperation);

protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,25 @@ private void initLastConfirmedEntry() {
}

@Override
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
if (state != State.LedgerOpened) {
throw new ManagedLedgerException("Managed ledger is not opened");
addOperation.failed(new ManagedLedgerException("Managed ledger is not opened"));
return;
}
}

@Override
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
pendingAddEntries.poll();
throw new ManagedLedgerException("Illegal addOperation context object.");
addOperation.failed(new ManagedLedgerException("Illegal addOperation context object."));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})",
name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId());
}
pendingAddEntries.add(addOperation);
if (position.getLedgerId() <= currentLedger.getId()) {
// Write into lastLedger
if (position.getLedgerId() == currentLedger.getId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -30,12 +29,9 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import lombok.Cleanup;
Expand Down Expand Up @@ -503,53 +499,4 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}

@Test
public void testBeforeAddEntry() throws Exception {
final var interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
final var config = new ManagedLedgerConfig();
final var numEntries = 100;
config.setMaxEntriesPerLedger(numEntries);
config.setManagedLedgerInterceptor(interceptor);
@Cleanup final var ml = (ManagedLedgerImpl) factory.open("test_concurrent_add_entry", config);

final var indexesBeforeAdd = new ArrayList<Long>();
final var batchSizes = new ArrayList<Long>();
final var random = new Random();
final var latch = new CountDownLatch(numEntries);
final var executor = Executors.newFixedThreadPool(3);
final var lock = new Object(); // make sure `asyncAddEntry` are called in order
for (int i = 0; i < numEntries; i++) {
final var batchSize = random.nextInt(0, 100);
final var msg = "msg-" + i;
final var callback = new AsyncCallbacks.AddEntryCallback() {

@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
latch.countDown();
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Failed to add {}", msg, exception);
latch.countDown();
}
};
executor.execute(() -> {
synchronized (lock) {
batchSizes.add((long) batchSize);
indexesBeforeAdd.add(interceptor.getIndex() + 1); // index is updated in each asyncAddEntry call
ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()), batchSize, callback, null);
}
});
}
assertTrue(latch.await(3, TimeUnit.SECONDS));
synchronized (lock) {
for (int i = 1; i < numEntries; i++) {
final var sum = batchSizes.get(i) + batchSizes.get(i - 1);
batchSizes.set(i, sum);
}
assertEquals(indexesBeforeAdd.subList(1, numEntries), batchSizes.subList(0, numEntries - 1));
}
executor.shutdown();
}
}
Loading