Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
Expand Down Expand Up @@ -76,6 +77,8 @@ public interface ManagedLedger {
* data entry to be persisted
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException;

Expand All @@ -88,6 +91,8 @@ public interface ManagedLedger {
* numberOfMessages of entry
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException;

Expand All @@ -102,6 +107,8 @@ public interface ManagedLedger {
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx);

Expand All @@ -116,6 +123,8 @@ public interface ManagedLedger {
* number of bytes
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException;

Expand All @@ -132,6 +141,8 @@ public interface ManagedLedger {
* number of bytes
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
ManagedLedgerException;
Expand All @@ -150,6 +161,8 @@ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) thr
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx);

Expand All @@ -169,6 +182,8 @@ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) thr
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback,
Object ctx);
Expand All @@ -184,6 +199,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx);

Expand All @@ -199,6 +216,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx);

Expand Down Expand Up @@ -733,4 +752,13 @@ default CompletableFuture<Position> getLastDispatchablePosition(final Predicate<
}

Position getFirstPosition();

/**
* In the internal implementation of a managed ledger, it is reasonable to use a single-thread executor to execute
* tasks that need to be performed in order. By exposing this internal executor, the caller can synchronize its
* custom tasks, as well as the internal tasks.
*/
default Executor getExecutor() {
return Runnable::run;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -798,34 +798,30 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

// retain buffer in this thread
// The buffer will be queued in `pendingAddEntries` and might be polled later in a different thread. However,
// the caller could release it after this method returns. To ensure the buffer is not released when it's polled,
// increase the reference count, which should be decreased by `OpAddEntry`'s methods later.
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
final var state = STATE_UPDATER.get(this);
beforeAddEntryToQueue(state);
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(state, addOperation);
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
beforeAddEntryToQueue();
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(addOperation);
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
}

protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
protected void beforeAddEntryToQueue() throws ManagedLedgerException {
final var state = STATE_UPDATER.get(this);
if (state.isFenced()) {
throw new ManagedLedgerFencedException();
}
Expand All @@ -836,7 +832,9 @@ protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException
}
}

protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
// TODO: does this method really need to be synchronized?
protected synchronized void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException {
final var state = STATE_UPDATER.get(this);
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
Expand Down Expand Up @@ -895,22 +893,6 @@ protected void afterFailedAddEntry(int numOfMessages) {
managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
}

protected boolean beforeAddEntry(OpAddEntry addOperation) {
// if no interceptor, just return true to make sure addOperation will be initiate()
if (managedLedgerInterceptor == null) {
return true;
}
try {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
return true;
} catch (Exception e) {
addOperation.failed(
new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
return false;
}
}

@Override
public void readyToCreateNewLedger() {
// only set transition state to ClosedLedger if current state is WriteFailed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ private void initLastConfirmedEntry() {
}

@Override
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state != State.LedgerOpened) {
protected void beforeAddEntryToQueue() throws ManagedLedgerException {
if (STATE_UPDATER.get(this) != State.LedgerOpened) {
throw new ManagedLedgerException("Managed ledger is not opened");
}
}

@Override
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
protected void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
pendingAddEntries.poll();
throw new ManagedLedgerException("Illegal addOperation context object.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ public void verifyAsyncReadEntryUsingCache() throws Exception {
// wait for all threads to be ready to start at once
barrier.await();
while (!done.get()) {
Position position = ledger.addEntry("entry".getBytes());
Position position;
synchronized (this) {
position = ledger.addEntry("entry".getBytes());
}
positions.add(position);
Thread.sleep(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,21 @@ public void updateSubscribeRateLimiter() {
}

private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
ledger.asyncAddEntry(headersAndPayload,
(int) publishContext.getNumberOfMessages(), this, publishContext);
// Retain the buffer in advance to avoid the buffer might have been released when it's passed to `asyncAddEntry`
final var buffer = headersAndPayload.retain();
try {
ledger.getExecutor().execute(() -> {
try {
ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(), this, publishContext);
} finally {
buffer.release();
}
});
} catch (Exception e) {
buffer.release(); // decrease the reference count retained at the beginning of this method
buffer.release(); // release the original headersAndPayload since it won't be used anymore
publishContext.completed(e, -1L, -1L);
}
}

public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -296,16 +298,36 @@ public void testCreateTopicMLFailure() {

@Test
public void testPublishMessage() throws Exception {

// Only allow at most 1 pending task
final var mlIOExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
doAnswer(invocationOnMock -> {
final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0];
final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[2];
final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[3];
callback.addComplete(PositionFactory.LATEST, payload, ctx);
mlIOExecutor.execute(() -> {
callback.addComplete(PositionFactory.LATEST, payload, ctx);
payload.release();
});
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any());
doReturn(mlIOExecutor).when(ledgerMock).getExecutor();

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
verifyMessagePublish(topic, true);

mlIOExecutor.execute(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException ignored) {
}
});
mlIOExecutor.execute(() -> {}); // make the work queue full
verifyMessagePublish(topic, false);

mlIOExecutor.shutdown();
}

private void verifyMessagePublish(PersistentTopic topic, boolean success) throws Exception {
long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp();

/*
Expand All @@ -315,27 +337,35 @@ public void testPublishMessage() throws Exception {
*/
ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());

final CountDownLatch latch = new CountDownLatch(1);

final var future = new CompletableFuture<Position>();
final Topic.PublishContext publishContext = new Topic.PublishContext() {
@Override
public void completed(Exception e, long ledgerId, long entryId) {
assertEquals(ledgerId, PositionFactory.LATEST.getLedgerId());
assertEquals(entryId, PositionFactory.LATEST.getEntryId());
latch.countDown();
if (e == null) {
future.complete(PositionFactory.create(ledgerId, entryId));
} else {
future.completeExceptionally(e);
}
}

@Override
public void setMetadataFromEntryData(ByteBuf entryData) {
// This method must be invoked before `completed`
assertEquals(latch.getCount(), 1);
assertFalse(future.isDone());
assertEquals(entryData.array(), payload.array());
}
};
topic.publishMessage(payload, publishContext);

assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
try {
assertEquals(future.get(3, TimeUnit.SECONDS), PositionFactory.LATEST);
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
assertTrue(success);
} catch (ExecutionException ignored) {
assertFalse(success);
}

Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(payload.refCnt(), 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -2922,6 +2923,7 @@ private void setupMLAsyncCallbackMocks() {
cursorMock = mock(ManagedCursor.class);
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn((Executor) Runnable::run).when(ledgerMock).getExecutor();

// call openLedgerComplete with ledgerMock on ML factory asyncOpen
doAnswer((Answer<Object>) invocationOnMock -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -252,6 +253,7 @@ public void testIsDuplicateWithFailure() {
ManagedLedger managedLedger = mock(ManagedLedger.class);
MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
doReturn(true).when(messageDeduplication).isEnabled();
doReturn((Executor) Runnable::run).when(managedLedger).getExecutor();


EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
Expand Down
Loading