Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.Optional;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;

Expand All @@ -31,6 +33,11 @@ public class AckSetPositionImpl implements Position, AckSetState {
protected final long ledgerId;
protected final long entryId;
protected volatile long[] ackSet;
@Getter
@Setter
private volatile int batchMessagesAckedCount;
@Getter
private volatile boolean positionRemovedFromCursor;

public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) {
this.ledgerId = ledgerId;
Expand Down Expand Up @@ -63,6 +70,10 @@ public Position getNext() {
}
}

public void markPositionRemovedFromCursor() {
this.positionRemovedFromCursor = true;
}

@Override
public String toString() {
return ledgerId + ":" + entryId + " (ackSet " + (ackSet == null ? "is null" :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
* ackSet state and to extract the state.
*/
public interface AckSetState {

int BATCH_MESSAGE_ACKED_AT_ONCE = -1;
int BATCH_MESSAGE_ACKED_FIRST_PART = -2;

/**
* Get the ackSet bitset information encoded as a long array.
* @return the ackSet
Expand All @@ -38,6 +42,14 @@ public interface AckSetState {
*/
void setAckSet(long[] ackSet);

void setBatchMessagesAckedCount(int messagesCountAcked);

int getBatchMessagesAckedCount();

void markPositionRemovedFromCursor();

boolean isPositionRemovedFromCursor();

/**
* Check if the ackSet is set.
* @return true if the ackSet is set, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,36 @@ public static long[] getAckSetArrayOrNull(Position position) {
return maybeGetAckSetState(position).map(AckSetState::getAckSet).orElse(null);
}

public static void setBatchMessagesAckedCount(Position position, int messagesCountAcked) {
Optional<AckSetState> ackSetState = maybeGetAckSetState(position);
if (ackSetState.isPresent()) {
ackSetState.get().setBatchMessagesAckedCount(messagesCountAcked);
}
}

public static int getBatchMessagesAckedCount(Position position) {
Optional<AckSetState> ackSetState = maybeGetAckSetState(position);
if (ackSetState.isPresent()) {
return ackSetState.get().getBatchMessagesAckedCount();
}
return 0;
}

public static void markPositionRemovedFromCursor(Position position) {
Optional<AckSetState> ackSetState = maybeGetAckSetState(position);
if (ackSetState.isPresent()) {
ackSetState.get().markPositionRemovedFromCursor();
}
}

public static boolean isPositionRemovedFromCursor(Position position) {
Optional<AckSetState> ackSetState = maybeGetAckSetState(position);
if (ackSetState.isPresent()) {
return ackSetState.get().isPositionRemovedFromCursor();
}
return true;
}

/**
* Get the AckSetState instance from the position.
* @param position position which contains the AckSetState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE;
import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART;
import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
Expand Down Expand Up @@ -103,6 +105,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
Expand Down Expand Up @@ -2361,7 +2364,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
Position newMarkDeletePosition = null;

lock.writeLock().lock();

final MutableBoolean cbHasExecuted = new MutableBoolean(false);
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
Expand Down Expand Up @@ -2391,8 +2394,12 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position);
if (ackSet == null) {
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
AckSetStateUtil.markPositionRemovedFromCursor(position);
BitSet bitSet;
if (batchDeletedIndexes == null || (bitSet = batchDeletedIndexes.remove(position)) == null) {
AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_AT_ONCE);
} else {
AckSetStateUtil.setBatchMessagesAckedCount(position, bitSet.cardinality());
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
Expand All @@ -2416,9 +2423,15 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
final var givenBitSet = BitSet.valueOf(ackSet);
final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet);
if (givenBitSet != bitSet) {
int unAckedBefore = bitSet.cardinality();
bitSet.and(givenBitSet);
int unAckedAfter = bitSet.cardinality();
AckSetStateUtil.setBatchMessagesAckedCount(position, unAckedBefore - unAckedAfter);
} else {
AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_FIRST_PART);
}
if (bitSet.isEmpty()) {
AckSetStateUtil.markPositionRemovedFromCursor(position);
Position previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(),
Expand Down Expand Up @@ -2478,14 +2491,17 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
lock.writeLock().unlock();
if (empty) {
callback.deleteComplete(ctx);
cbHasExecuted.setTrue();
}
}

// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
callback.deleteComplete(ctx);
if (!cbHasExecuted.booleanValue()) {
callback.deleteComplete(ctx);
}
return;
}

Expand All @@ -2496,12 +2512,18 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
callback.deleteComplete(ctx);
if (!cbHasExecuted.booleanValue()) {
callback.deleteComplete(ctx);
cbHasExecuted.setTrue();
}
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
callback.deleteFailed(exception, ctx);
if (!cbHasExecuted.booleanValue()) {
callback.deleteFailed(exception, ctx);
cbHasExecuted.setTrue();
}
}

}, ctx);
Expand All @@ -2512,7 +2534,9 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
}
callback.deleteFailed(new ManagedLedgerException(e), ctx);
if (!cbHasExecuted.booleanValue()) {
callback.deleteFailed(new ManagedLedgerException(e), ctx);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i

private void individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<String, Long> properties) {
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(positions, AckType.Individual, properties);
subscription.acknowledgeMessage(positions, AckType.Individual, properties, null);
}
}

Expand Down
Loading
Loading