Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -68,8 +69,8 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1309,7 +1310,7 @@ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
return true;
}

protected boolean isSingleMessageAcked(BitSetRecyclable ackBitSet, int batchIndex) {
protected boolean isSingleMessageAcked(@Nullable BitSet ackBitSet, int batchIndex) {
return ackBitSet != null && !ackBitSet.get(batchIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,8 @@
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1288,7 +1286,7 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
final MessageIdImpl messageId,
final Schema<V> schema,
final boolean containMetadata,
final BitSetRecyclable ackBitSet,
@Nullable final BitSet ackBitSet,
final BitSet ackSetInMessageId,
final int redeliveryCount,
final long consumerEpoch,
Expand Down Expand Up @@ -1396,7 +1394,7 @@ protected void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMe
final MessageIdImpl messageId,
final Schema<T> schema,
final int redeliveryCount,
final List<Long> ackSet,
final long[] ackSet,
long consumerEpoch) {
final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
Expand Down Expand Up @@ -1426,12 +1424,14 @@ protected void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMe
}

void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
List<Long> ackSet = Collections.emptyList();
final long[] ackSet;
if (cmdMessage.getAckSetsCount() > 0) {
ackSet = new ArrayList<>(cmdMessage.getAckSetsCount());
ackSet = new long[cmdMessage.getAckSetsCount()];
for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
ackSet.add(cmdMessage.getAckSetAt(i));
ackSet[i] = cmdMessage.getAckSetAt(i);
}
} else {
ackSet = new long[0];
}
int redeliveryCount = cmdMessage.getRedeliveryCount();
MessageIdData messageId = cmdMessage.getMessageId();
Expand Down Expand Up @@ -1767,7 +1767,7 @@ private void interceptAndComplete(final Message<T> message, final CompletableFut
}

void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
int redeliveryCount, long[] ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx, long consumerEpoch,
boolean isEncrypted) {
int batchSize = msgMetadata.getNumMessagesInBatch();
Expand All @@ -1781,10 +1781,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
}

BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize);
BitSetRecyclable ackBitSet = null;
if (ackSet != null && ackSet.size() > 0) {
ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
}
BitSet ackBitSet = ackSet.length > 0 ? BitSet.valueOf(ackSet) : null;

SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
int skippedMessages = 0;
Expand Down Expand Up @@ -1818,9 +1815,6 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
}
executeNotifyCallback(message);
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
} catch (IllegalStateException e) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
Expand Down Expand Up @@ -2676,11 +2670,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
} else {
final long[] ackSetArr;
if (MessageIdAdvUtils.isBatch(msgId)) {
final BitSetRecyclable ackSet = BitSetRecyclable.create();
final BitSet ackSet = new BitSet();
ackSet.set(0, msgId.getBatchSize());
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
ackSetArr = ackSet.toLongArray();
ackSet.recycle();
} else {
ackSetArr = new long[0];
}
Expand Down Expand Up @@ -3163,18 +3156,17 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
final long entryId = messageIdAdv.getEntryId();
final List<ByteBuf> cmdList;
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
BitSet bitSet = new BitSet();
bitSet.set(0, messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
MessageIdAdvUtils.acknowledge(messageIdAdv, false);
bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
bitSet.clear(0, messageIdAdv.getBatchIndex() + 1);
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
bitSet.clear(messageIdAdv.getBatchIndex());
}
cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable,
cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, bitSet,
ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId,
messageIdAdv.getBatchSize()));
bitSetRecyclable.recycle();
} else {
MessageIdImpl[] chunkMsgIds = this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv);
// cumulative ack chunk by the last messageId
Expand All @@ -3184,8 +3176,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
new ArrayList<>(chunkMsgIds.length);
List<Triple<Long, Long, BitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
Expand Down Expand Up @@ -3222,7 +3213,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
}

private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
List<Triple<Long, Long, BitSet>> entries,
long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
Expand All @@ -3241,7 +3232,7 @@ protected BaseCommand initialValue() throws Exception {
}
};

private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, BitSet>> entries) {
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
.clear()
.setType(BaseCommand.Type.ACK);
Expand All @@ -3250,7 +3241,7 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
BitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
Expand All @@ -3259,7 +3250,6 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
bitSet.recycle();
}
}

Expand All @@ -3276,19 +3266,18 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<Message
messageIdData.setLedgerId(messageIdAdv.getLedgerId());
messageIdData.setEntryId(messageIdAdv.getEntryId());
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
final BitSet bitSet = new BitSet();
bitSet.set(0, messageIdAdv.getBatchSize());
messageIdData.setBatchSize(messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
MessageIdAdvUtils.acknowledge(messageIdAdv, false);
bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
bitSet.clear(0, messageIdAdv.getBatchIndex() + 1);
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
bitSet.clear(messageIdAdv.getBatchIndex());
}
for (long x : bitSetRecyclable.toLongArray()) {
for (long x : bitSet.toLongArray()) {
messageIdData.addAckSet(x);
}
bitSetRecyclable.recycle();
}

messageIdDataList.add(messageIdData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.BitSet;
import java.util.List;
import lombok.NonNull;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessagePayload;
Expand All @@ -32,8 +31,6 @@
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;

public class MessagePayloadContextImpl implements MessagePayloadContext {

Expand All @@ -52,7 +49,7 @@ protected MessagePayloadContextImpl newObject(Handle<MessagePayloadContextImpl>
private ConsumerImpl<?> consumer;
private int redeliveryCount;
private BitSet ackSetInMessageId;
private BitSetRecyclable ackBitSet;
private BitSet ackBitSet;
private long consumerEpoch;

private MessagePayloadContextImpl(final Recycler.Handle<MessagePayloadContextImpl> handle) {
Expand All @@ -64,7 +61,7 @@ public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntr
@NonNull final MessageIdImpl messageId,
@NonNull final ConsumerImpl<?> consumer,
final int redeliveryCount,
final List<Long> ackSet,
final long[] ackSet,
final long consumerEpoch) {
final MessagePayloadContextImpl context = RECYCLER.get();
context.consumerEpoch = consumerEpoch;
Expand All @@ -75,9 +72,7 @@ public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntr
context.consumer = consumer;
context.redeliveryCount = redeliveryCount;
context.ackSetInMessageId = BatchMessageIdImpl.newAckSet(context.getNumMessages());
context.ackBitSet = (ackSet != null && ackSet.size() > 0)
? BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet))
: null;
context.ackBitSet = (ackSet.length > 0 ? BitSet.valueOf(ackSet) : null);
return context;
}

Expand All @@ -90,10 +85,7 @@ public void recycle() {
redeliveryCount = 0;
consumerEpoch = DEFAULT_CONSUMER_EPOCH;
ackSetInMessageId = null;
if (ackBitSet != null) {
ackBitSet.recycle();
ackBitSet = null;
}
ackBitSet = null;
recyclerHandle.recycle(this);
}

Expand Down
Loading
Loading