Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.internal.net;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

/**
* Tracks the relationship between sliced ByteBuffers and their original parent buffers.
* This replaces the need to access internal JDK implementation classes, using only
* public Java APIs instead.
*
* When ByteBuffer.slice() is called, it creates a new buffer that shares content with
* the original. We need to track this relationship so that when returning buffers to
* the pool, we return the original pooled buffer, not the slice.
*
* This class uses IdentityHashMap (synchronized) which provides thread-safe access
* using object identity rather than equals(). This is critical because ByteBuffer.equals()
* compares buffer content and can throw IndexOutOfBoundsException if buffer position/limit
* is modified after being used as a map key. Callers must explicitly call removeTracking()
* to clean up entries when buffers are returned to the pool.
*/
class BufferAttachmentTracker {

/**
* Maps sliced buffers to their original parent buffers using object identity.
* Uses synchronized IdentityHashMap for thread-safe access without relying on
* ByteBuffer.equals() or hashCode(), which can be problematic when buffer state changes.
* Entries must be explicitly removed via removeTracking() to prevent memory leaks.
*
* Note: This static mutable field is intentionally designed for global buffer tracking
* across the application. The PMD.StaticFieldsMustBeImmutable warning is suppressed
* because:
* 1. Mutable shared state is required to track buffer relationships across all threads
* 2. IdentityHashMap uses object identity (==) avoiding equals()/hashCode() issues
* 3. Collections.synchronizedMap provides thread-safe operations
* 4. This is the most efficient design for this use case
*/
@SuppressWarnings("PMD.StaticFieldsMustBeImmutable")
private static final Map<ByteBuffer, ByteBuffer> sliceToOriginal =
Collections.synchronizedMap(new IdentityHashMap<>());

/**
* Records that a slice buffer was created from an original buffer.
*
* @param slice the sliced ByteBuffer
* @param original the original ByteBuffer that was sliced
*/
static void recordSlice(ByteBuffer slice, ByteBuffer original) {
sliceToOriginal.put(slice, original);
}

/**
* Retrieves the original buffer for a given buffer, which may be a slice.
* If the buffer is not a slice (not tracked), returns the buffer itself.
*
* @param buffer the buffer to look up, which may be a slice
* @return the original pooled buffer, or the buffer itself if not a slice
*/
static ByteBuffer getOriginal(ByteBuffer buffer) {
ByteBuffer original = sliceToOriginal.get(buffer);
return original != null ? original : buffer;
}

/**
* Removes tracking for a buffer. Should be called when returning a buffer
* to the pool to avoid memory leaks in the tracking map.
*
* @param buffer the buffer to stop tracking
*/
static void removeTracking(ByteBuffer buffer) {
sliceToOriginal.remove(buffer);
}

/**
* For testing: returns the current size of the tracking map.
*/
static int getTrackingMapSize() {
return sliceToOriginal.size();
}

/**
* For testing: clears all tracking entries.
*/
static void clearTracking() {
sliceToOriginal.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@

import org.jetbrains.annotations.NotNull;

import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.unsafe.internal.sun.nio.ch.DirectBuffer;
import org.apache.geode.util.internal.GeodeGlossary;

public class BufferPool {
Expand Down Expand Up @@ -111,8 +109,11 @@ private ByteBuffer acquireDirectBuffer(int size, boolean send) {
result = acquireLargeBuffer(send, size);
}
if (result.capacity() > size) {
ByteBuffer original = result;
result.position(0).limit(size);
result = result.slice();
// Track the slice-to-original mapping to support buffer pool return
BufferAttachmentTracker.recordSlice(result, original);
}
return result;
}
Expand Down Expand Up @@ -159,19 +160,14 @@ private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) {
// it was garbage collected
updateBufferStats(-defaultSize, ref.getSend(), true);
} else {
// Reset the buffer to full capacity - clear() resets position and sets limit to capacity
bb.clear();
if (defaultSize > size) {
bb.limit(size);
}
return bb;
}
ref = bufferTempQueue.poll();
}
result = ByteBuffer.allocateDirect(defaultSize);
updateBufferStats(defaultSize, send, true);
if (defaultSize > size) {
result.limit(size);
}
return result;
}

Expand Down Expand Up @@ -267,17 +263,51 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
}

ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
// This method is used by NioPlainEngine and NioSslEngine which need full-capacity buffers
// that can be reused for multiple read/write operations. We should NOT create slices here.
switch (type) {
case UNTRACKED:
return ByteBuffer.allocate(capacity);
case TRACKED_SENDER:
return acquireDirectSenderBuffer(capacity);
return acquireDirectSenderBufferNonSliced(capacity);
case TRACKED_RECEIVER:
return acquireDirectReceiveBuffer(capacity);
return acquireDirectReceiveBufferNonSliced(capacity);
}
throw new IllegalArgumentException("Unexpected buffer type " + type);
}

/**
* Acquire a direct sender buffer without slicing - returns a buffer with capacity >= requested
* size
*/
private ByteBuffer acquireDirectSenderBufferNonSliced(int size) {
if (!useDirectBuffers) {
return ByteBuffer.allocate(size);
}

if (size <= MEDIUM_BUFFER_SIZE) {
return acquirePredefinedFixedBuffer(true, size);
} else {
return acquireLargeBuffer(true, size);
}
}

/**
* Acquire a direct receive buffer without slicing - returns a buffer with capacity >= requested
* size
*/
private ByteBuffer acquireDirectReceiveBufferNonSliced(int size) {
if (!useDirectBuffers) {
return ByteBuffer.allocate(size);
}

if (size <= MEDIUM_BUFFER_SIZE) {
return acquirePredefinedFixedBuffer(false, size);
} else {
return acquireLargeBuffer(false, size);
}
}

ByteBuffer acquireNonDirectBuffer(BufferPool.BufferType type, int capacity) {
switch (type) {
case UNTRACKED:
Expand Down Expand Up @@ -310,11 +340,13 @@ void releaseBuffer(BufferPool.BufferType type, @NotNull ByteBuffer buffer) {
*/
private void releaseBuffer(ByteBuffer buffer, boolean send) {
if (buffer.isDirect()) {
buffer = getPoolableBuffer(buffer);
BBSoftReference bbRef = new BBSoftReference(buffer, send);
if (buffer.capacity() <= SMALL_BUFFER_SIZE) {
ByteBuffer original = getPoolableBuffer(buffer);
// Clean up tracking for this buffer to prevent memory leaks
BufferAttachmentTracker.removeTracking(buffer);
BBSoftReference bbRef = new BBSoftReference(original, send);
if (original.capacity() <= SMALL_BUFFER_SIZE) {
bufferSmallQueue.offer(bbRef);
} else if (buffer.capacity() <= MEDIUM_BUFFER_SIZE) {
} else if (original.capacity() <= MEDIUM_BUFFER_SIZE) {
bufferMiddleQueue.offer(bbRef);
} else {
bufferLargeQueue.offer(bbRef);
Expand All @@ -328,25 +360,14 @@ private void releaseBuffer(ByteBuffer buffer, boolean send) {
* If we hand out a buffer that is larger than the requested size we create a
* "slice" of the buffer having the requested capacity and hand that out instead.
* When we put the buffer back in the pool we need to find the original, non-sliced,
* buffer. This is held in DirectBuffer in its "attachment" field.
* buffer. This is tracked using BufferAttachmentTracker.
*
* This method is visible for use in debugging and testing. For debugging, invoke this method if
* you need to see the non-sliced buffer for some reason, such as logging its hashcode.
*/
@VisibleForTesting
ByteBuffer getPoolableBuffer(final ByteBuffer buffer) {
final Object attachment = DirectBuffer.attachment(buffer);

if (null == attachment) {
return buffer;
}

if (attachment instanceof ByteBuffer) {
return (ByteBuffer) attachment;
}

throw new InternalGemFireException("direct byte buffer attachment was not a byte buffer but a "
+ attachment.getClass().getName());
return BufferAttachmentTracker.getOriginal(buffer);
}

/**
Expand Down
Loading
Loading