diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f0a..8a6cdb5372aab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -31,7 +31,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable, ReferenceCounted { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER = new Recycler<>() { @Override protected EntryImpl newObject(Handle handle) { return new EntryImpl(handle); @@ -47,56 +47,57 @@ protected EntryImpl newObject(Handle handle) { private Runnable onDeallocate; public static EntryImpl create(LedgerEntry ledgerEntry) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerEntry.getLedgerId(); entry.entryId = ledgerEntry.getEntryId(); entry.data = ledgerEntry.getEntryBuffer(); entry.data.retain(); - entry.setRefCnt(1); + return entry; + } + + private static EntryImpl getEntryFromRecycler() { + EntryImpl entry = RECYCLER.get(); + entry.resetRefCnt(); return entry; } @VisibleForTesting public static EntryImpl create(long ledgerId, long entryId, byte[] data) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; entry.data = Unpooled.wrappedBuffer(data); - entry.setRefCnt(1); return entry; } public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; entry.data = data; entry.data.retain(); - entry.setRefCnt(1); return entry; } public static EntryImpl create(PositionImpl position, ByteBuf data) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = position.getLedgerId(); entry.entryId = position.getEntryId(); entry.data = data; entry.data.retain(); - entry.setRefCnt(1); return entry; } public static EntryImpl create(EntryImpl other) { - EntryImpl entry = RECYCLER.get(); + EntryImpl entry = getEntryFromRecycler(); entry.timestamp = System.nanoTime(); entry.ledgerId = other.ledgerId; entry.entryId = other.entryId; entry.data = other.data.retainedDuplicate(); - entry.setRefCnt(1); return entry; } @@ -121,16 +122,19 @@ public void onDeallocate(Runnable r) { } public long getTimestamp() { + checkRefCount(); return timestamp; } @Override public ByteBuf getDataBuffer() { + checkRefCount(); return data; } @Override public byte[] getData() { + checkRefCount(); byte[] array = new byte[data.readableBytes()]; data.getBytes(data.readerIndex(), array); return array; @@ -139,6 +143,7 @@ public byte[] getData() { // Only for test @Override public byte[] getDataAndRelease() { + checkRefCount(); byte[] array = getData(); release(); return array; @@ -146,26 +151,31 @@ public byte[] getDataAndRelease() { @Override public int getLength() { + checkRefCount(); return data.readableBytes(); } @Override public PositionImpl getPosition() { + checkRefCount(); return new PositionImpl(ledgerId, entryId); } @Override public long getLedgerId() { + checkRefCount(); return ledgerId; } @Override public long getEntryId() { + checkRefCount(); return entryId; } @Override public int compareTo(EntryImpl other) { + checkRefCount(); if (this.ledgerId != other.ledgerId) { return this.ledgerId < other.ledgerId ? -1 : 1; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java index 8b0e25f1348ca..15fbcb0eea52d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java @@ -32,6 +32,9 @@ */ public abstract class AbstractCASReferenceCounted implements ReferenceCounted { + private static final boolean refCountCheckOnAccess = + Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true")); + private static final AtomicIntegerFieldUpdater refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractCASReferenceCounted.class, "refCnt"); @@ -42,13 +45,6 @@ public final int refCnt() { return refCnt; } - /** - * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly. - */ - protected final void setRefCnt(int refCnt) { - refCntUpdater.set(this, refCnt); - } - @Override public ReferenceCounted retain() { return retain0(1); @@ -60,7 +56,7 @@ public ReferenceCounted retain(int increment) { } private ReferenceCounted retain0(int increment) { - for (;;) { + for (; ; ) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; @@ -91,7 +87,7 @@ public boolean release(int decrement) { } private boolean release0(int decrement) { - for (;;) { + for (; ; ) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); @@ -111,4 +107,20 @@ private boolean release0(int decrement) { * Called once {@link #refCnt()} is equals 0. */ protected abstract void deallocate(); + + public final void resetRefCnt() { + refCntUpdater.set(this, 1); + } + + /** + * Validate that the instance hasn't been released before accessing fields. + * This is a sanity check to ensure that we don't read fields from deallocated objects. + */ + protected void checkRefCount() { + if (refCountCheckOnAccess && refCnt() < 1) { + throw new IllegalReferenceCountException( + "Possible double release bug (refCnt=" + refCnt() + "). " + getClass().getSimpleName() + + " has been deallocated. "); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 4908d10f330b3..ef49337b18547 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -32,7 +32,6 @@ import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; @@ -92,6 +91,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RelativeTimeUtil; @@ -1357,11 +1357,12 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { } } - static class ChunkedMessageCtx extends AbstractReferenceCounted { + static class ChunkedMessageCtx extends AbstractValidatingReferenceCounted { protected MessageIdImpl firstChunkMessageId; protected MessageIdImpl lastChunkMessageId; public ChunkMessageIdImpl getChunkMessageId() { + checkRefCount(); return new ChunkMessageIdImpl(firstChunkMessageId, lastChunkMessageId); } @@ -1374,8 +1375,10 @@ protected ProducerImpl.ChunkedMessageCtx newObject( }; public static ChunkedMessageCtx get(int totalChunks) { - ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get(); - chunkedMessageCtx.setRefCnt(totalChunks); + ChunkedMessageCtx chunkedMessageCtx = getAndCheck(RECYCLER); + if (totalChunks > 1) { + chunkedMessageCtx.retain(totalChunks - 1); + } return chunkedMessageCtx; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java index 3d7c0f4c54f82..01737a84ff129 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java @@ -19,15 +19,15 @@ package org.apache.pulsar.common.api.raw; import io.netty.buffer.ByteBuf; -import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.ReferenceCounted; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.util.AbstractValidatingReferenceCounted; /** * Class representing a reference-counted object that requires explicit deallocation. */ -public class ReferenceCountedMessageMetadata extends AbstractReferenceCounted { +public class ReferenceCountedMessageMetadata extends AbstractValidatingReferenceCounted { private static final Recycler RECYCLER = // new Recycler() { @@ -46,10 +46,9 @@ private ReferenceCountedMessageMetadata(Recycler.Handle recyclerHandle) { * @return */ public static ByteBufPair get(ByteBuf b1, ByteBuf b2) { - ByteBufPair buf = RECYCLER.get(); - buf.setRefCnt(1); + ByteBufPair buf = getAndCheck(RECYCLER); buf.b1 = b1; buf.b2 = b2; return buf; } public ByteBuf getFirst() { + checkRefCount(); return b1; } public ByteBuf getSecond() { + checkRefCount(); return b2; } public int readableBytes() { + checkRefCount(); return b1.readableBytes() + b2.readableBytes(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java new file mode 100644 index 0000000000000..82ed5ad7d9520 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/AbstractValidatingReferenceCounted.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.IllegalReferenceCountException; +import io.netty.util.Recycler; + +public abstract class AbstractValidatingReferenceCounted extends AbstractReferenceCounted { + private static final boolean refCountCheckOnAccess = + Boolean.parseBoolean(System.getProperty("pulsar.refcount.check.on_access", "true")); + + /** + * Validate that the instance hasn't been released before accessing fields. + * This is a sanity check to ensure that we don't read fields from deallocated objects. + */ + protected void checkRefCount() { + if (refCountCheckOnAccess && refCnt() < 1) { + throw new IllegalReferenceCountException( + "Possible double release bug (refCnt=" + refCnt() + "). " + getClass().getSimpleName() + + " has been deallocated. "); + } + } + + public final void resetRefCnt() { + setRefCnt(1); + } + + public static T getAndCheck(Recycler recycler) { + T object = recycler.get(); + object.resetRefCnt(); + return object; + } +}