diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferAttachmentTracker.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferAttachmentTracker.java new file mode 100644 index 000000000000..67bc775c2622 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferAttachmentTracker.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Map; + +/** + * Tracks the relationship between sliced ByteBuffers and their original parent buffers. + * This replaces the need to access internal JDK implementation classes, using only + * public Java APIs instead. + * + * When ByteBuffer.slice() is called, it creates a new buffer that shares content with + * the original. We need to track this relationship so that when returning buffers to + * the pool, we return the original pooled buffer, not the slice. + * + * This class uses IdentityHashMap (synchronized) which provides thread-safe access + * using object identity rather than equals(). This is critical because ByteBuffer.equals() + * compares buffer content and can throw IndexOutOfBoundsException if buffer position/limit + * is modified after being used as a map key. Callers must explicitly call removeTracking() + * to clean up entries when buffers are returned to the pool. + */ +class BufferAttachmentTracker { + + /** + * Maps sliced buffers to their original parent buffers using object identity. + * Uses synchronized IdentityHashMap for thread-safe access without relying on + * ByteBuffer.equals() or hashCode(), which can be problematic when buffer state changes. + * Entries must be explicitly removed via removeTracking() to prevent memory leaks. + * + * Note: This static mutable field is intentionally designed for global buffer tracking + * across the application. The PMD.StaticFieldsMustBeImmutable warning is suppressed + * because: + * 1. Mutable shared state is required to track buffer relationships across all threads + * 2. IdentityHashMap uses object identity (==) avoiding equals()/hashCode() issues + * 3. Collections.synchronizedMap provides thread-safe operations + * 4. This is the most efficient design for this use case + */ + @SuppressWarnings("PMD.StaticFieldsMustBeImmutable") + private static final Map sliceToOriginal = + Collections.synchronizedMap(new IdentityHashMap<>()); + + /** + * Records that a slice buffer was created from an original buffer. + * + * @param slice the sliced ByteBuffer + * @param original the original ByteBuffer that was sliced + */ + static void recordSlice(ByteBuffer slice, ByteBuffer original) { + sliceToOriginal.put(slice, original); + } + + /** + * Retrieves the original buffer for a given buffer, which may be a slice. + * If the buffer is not a slice (not tracked), returns the buffer itself. + * + * @param buffer the buffer to look up, which may be a slice + * @return the original pooled buffer, or the buffer itself if not a slice + */ + static ByteBuffer getOriginal(ByteBuffer buffer) { + ByteBuffer original = sliceToOriginal.get(buffer); + return original != null ? original : buffer; + } + + /** + * Removes tracking for a buffer. Should be called when returning a buffer + * to the pool to avoid memory leaks in the tracking map. + * + * @param buffer the buffer to stop tracking + */ + static void removeTracking(ByteBuffer buffer) { + sliceToOriginal.remove(buffer); + } + + /** + * For testing: returns the current size of the tracking map. + */ + static int getTrackingMapSize() { + return sliceToOriginal.size(); + } + + /** + * For testing: clears all tracking entries. + */ + static void clearTracking() { + sliceToOriginal.clear(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java index 56c0b7328c0b..09a1b1796858 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java @@ -22,13 +22,11 @@ import org.jetbrains.annotations.NotNull; -import org.apache.geode.InternalGemFireException; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.Assert; import org.apache.geode.internal.tcp.Connection; -import org.apache.geode.unsafe.internal.sun.nio.ch.DirectBuffer; import org.apache.geode.util.internal.GeodeGlossary; public class BufferPool { @@ -111,8 +109,11 @@ private ByteBuffer acquireDirectBuffer(int size, boolean send) { result = acquireLargeBuffer(send, size); } if (result.capacity() > size) { + ByteBuffer original = result; result.position(0).limit(size); result = result.slice(); + // Track the slice-to-original mapping to support buffer pool return + BufferAttachmentTracker.recordSlice(result, original); } return result; } @@ -159,19 +160,14 @@ private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) { // it was garbage collected updateBufferStats(-defaultSize, ref.getSend(), true); } else { + // Reset the buffer to full capacity - clear() resets position and sets limit to capacity bb.clear(); - if (defaultSize > size) { - bb.limit(size); - } return bb; } ref = bufferTempQueue.poll(); } result = ByteBuffer.allocateDirect(defaultSize); updateBufferStats(defaultSize, send, true); - if (defaultSize > size) { - result.limit(size); - } return result; } @@ -267,17 +263,51 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing, } ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) { + // This method is used by NioPlainEngine and NioSslEngine which need full-capacity buffers + // that can be reused for multiple read/write operations. We should NOT create slices here. switch (type) { case UNTRACKED: return ByteBuffer.allocate(capacity); case TRACKED_SENDER: - return acquireDirectSenderBuffer(capacity); + return acquireDirectSenderBufferNonSliced(capacity); case TRACKED_RECEIVER: - return acquireDirectReceiveBuffer(capacity); + return acquireDirectReceiveBufferNonSliced(capacity); } throw new IllegalArgumentException("Unexpected buffer type " + type); } + /** + * Acquire a direct sender buffer without slicing - returns a buffer with capacity >= requested + * size + */ + private ByteBuffer acquireDirectSenderBufferNonSliced(int size) { + if (!useDirectBuffers) { + return ByteBuffer.allocate(size); + } + + if (size <= MEDIUM_BUFFER_SIZE) { + return acquirePredefinedFixedBuffer(true, size); + } else { + return acquireLargeBuffer(true, size); + } + } + + /** + * Acquire a direct receive buffer without slicing - returns a buffer with capacity >= requested + * size + */ + private ByteBuffer acquireDirectReceiveBufferNonSliced(int size) { + if (!useDirectBuffers) { + return ByteBuffer.allocate(size); + } + + if (size <= MEDIUM_BUFFER_SIZE) { + return acquirePredefinedFixedBuffer(false, size); + } else { + return acquireLargeBuffer(false, size); + } + } + ByteBuffer acquireNonDirectBuffer(BufferPool.BufferType type, int capacity) { switch (type) { case UNTRACKED: @@ -310,11 +340,13 @@ void releaseBuffer(BufferPool.BufferType type, @NotNull ByteBuffer buffer) { */ private void releaseBuffer(ByteBuffer buffer, boolean send) { if (buffer.isDirect()) { - buffer = getPoolableBuffer(buffer); - BBSoftReference bbRef = new BBSoftReference(buffer, send); - if (buffer.capacity() <= SMALL_BUFFER_SIZE) { + ByteBuffer original = getPoolableBuffer(buffer); + // Clean up tracking for this buffer to prevent memory leaks + BufferAttachmentTracker.removeTracking(buffer); + BBSoftReference bbRef = new BBSoftReference(original, send); + if (original.capacity() <= SMALL_BUFFER_SIZE) { bufferSmallQueue.offer(bbRef); - } else if (buffer.capacity() <= MEDIUM_BUFFER_SIZE) { + } else if (original.capacity() <= MEDIUM_BUFFER_SIZE) { bufferMiddleQueue.offer(bbRef); } else { bufferLargeQueue.offer(bbRef); @@ -328,25 +360,14 @@ private void releaseBuffer(ByteBuffer buffer, boolean send) { * If we hand out a buffer that is larger than the requested size we create a * "slice" of the buffer having the requested capacity and hand that out instead. * When we put the buffer back in the pool we need to find the original, non-sliced, - * buffer. This is held in DirectBuffer in its "attachment" field. + * buffer. This is tracked using BufferAttachmentTracker. * * This method is visible for use in debugging and testing. For debugging, invoke this method if * you need to see the non-sliced buffer for some reason, such as logging its hashcode. */ @VisibleForTesting ByteBuffer getPoolableBuffer(final ByteBuffer buffer) { - final Object attachment = DirectBuffer.attachment(buffer); - - if (null == attachment) { - return buffer; - } - - if (attachment instanceof ByteBuffer) { - return (ByteBuffer) attachment; - } - - throw new InternalGemFireException("direct byte buffer attachment was not a byte buffer but a " - + attachment.getClass().getName()); + return BufferAttachmentTracker.getOriginal(buffer); } /** diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BufferAttachmentTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferAttachmentTrackerTest.java new file mode 100644 index 000000000000..aa37d9b64aa9 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferAttachmentTrackerTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.net; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Test; + +/** + * Unit tests for BufferAttachmentTracker. + */ +public class BufferAttachmentTrackerTest { + + @After + public void tearDown() { + // Clean up after each test + BufferAttachmentTracker.clearTracking(); + } + + @Test + public void getOriginal_returnsOriginalBufferForSlice() { + ByteBuffer original = ByteBuffer.allocateDirect(1024); + original.position(0).limit(512); + ByteBuffer slice = original.slice(); + + BufferAttachmentTracker.recordSlice(slice, original); + + ByteBuffer result = BufferAttachmentTracker.getOriginal(slice); + + assertThat(result).isSameAs(original); + } + + @Test + public void getOriginal_returnsBufferItselfWhenNotTracked() { + ByteBuffer buffer = ByteBuffer.allocateDirect(1024); + + ByteBuffer result = BufferAttachmentTracker.getOriginal(buffer); + + assertThat(result).isSameAs(buffer); + } + + @Test + public void removeTracking_removesSliceMapping() { + ByteBuffer original = ByteBuffer.allocateDirect(1024); + original.position(0).limit(512); + ByteBuffer slice = original.slice(); + + BufferAttachmentTracker.recordSlice(slice, original); + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(1); + + BufferAttachmentTracker.removeTracking(slice); + + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(0); + assertThat(BufferAttachmentTracker.getOriginal(slice)).isSameAs(slice); + } + + @Test + public void trackingMapSize_reflectsCurrentMappings() { + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(0); + + ByteBuffer original1 = ByteBuffer.allocateDirect(1024); + ByteBuffer slice1 = original1.slice(); + BufferAttachmentTracker.recordSlice(slice1, original1); + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(1); + + ByteBuffer original2 = ByteBuffer.allocateDirect(2048); + ByteBuffer slice2 = original2.slice(); + BufferAttachmentTracker.recordSlice(slice2, original2); + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(2); + } + + @Test + public void clearTracking_removesAllMappings() { + ByteBuffer original1 = ByteBuffer.allocateDirect(1024); + ByteBuffer slice1 = original1.slice(); + BufferAttachmentTracker.recordSlice(slice1, original1); + + ByteBuffer original2 = ByteBuffer.allocateDirect(2048); + ByteBuffer slice2 = original2.slice(); + BufferAttachmentTracker.recordSlice(slice2, original2); + + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(2); + + BufferAttachmentTracker.clearTracking(); + + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(0); + } + + @Test + public void recordSlice_canOverwriteExistingMapping() { + ByteBuffer original1 = ByteBuffer.allocateDirect(1024); + ByteBuffer original2 = ByteBuffer.allocateDirect(2048); + ByteBuffer slice = original1.slice(); + + BufferAttachmentTracker.recordSlice(slice, original1); + assertThat(BufferAttachmentTracker.getOriginal(slice)).isSameAs(original1); + + BufferAttachmentTracker.recordSlice(slice, original2); + assertThat(BufferAttachmentTracker.getOriginal(slice)).isSameAs(original2); + } + + @Test + public void worksWithHeapBuffers() { + ByteBuffer original = ByteBuffer.allocate(1024); + original.position(0).limit(512); + ByteBuffer slice = original.slice(); + + BufferAttachmentTracker.recordSlice(slice, original); + + ByteBuffer result = BufferAttachmentTracker.getOriginal(slice); + + assertThat(result).isSameAs(original); + } + + @Test + public void simpleThreadSafetyTest() { + // Create a single original and slice + ByteBuffer original = ByteBuffer.allocateDirect(1024); + ByteBuffer slice = original.slice(); + + // Record it + BufferAttachmentTracker.recordSlice(slice, original); + + // Immediately retrieve it + ByteBuffer result = BufferAttachmentTracker.getOriginal(slice); + + // Should get back the exact same original + assertThat(result).isSameAs(original); + assertThat(result).isNotSameAs(slice); + + System.out.println("Original identity: " + System.identityHashCode(original)); + System.out.println("Slice identity: " + System.identityHashCode(slice)); + System.out.println("Result identity: " + System.identityHashCode(result)); + } + + /** + * Thread-safety test: Concurrent reads and writes on the same slice. + * This verifies that race conditions don't cause incorrect mappings. + */ + @Test + public void concurrentAccessToSameSlice_isThreadSafe() throws InterruptedException { + final int numThreads = 10; + final int iterations = 1000; + final ExecutorService executor = Executors.newFixedThreadPool(numThreads); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + + ByteBuffer original = ByteBuffer.allocateDirect(1024); + ByteBuffer slice = original.slice(); + + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + startLatch.await(); + + for (int j = 0; j < iterations; j++) { + // Record the mapping + BufferAttachmentTracker.recordSlice(slice, original); + + // Immediately retrieve it + ByteBuffer retrieved = BufferAttachmentTracker.getOriginal(slice); + + // Should always get the original back + if (retrieved != original) { + errors.incrementAndGet(); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + boolean completed = doneLatch.await(30, TimeUnit.SECONDS); + executor.shutdown(); + + assertThat(completed).isTrue(); + assertThat(errors.get()).isEqualTo(0); + } + + /** + * Memory safety test: Verifies that WeakHashMap allows slice buffers to be + * garbage collected without causing memory leaks. + */ + @Test + public void weakHashMap_allowsGarbageCollection() { + ByteBuffer original = ByteBuffer.allocateDirect(1024); + ByteBuffer slice = original.slice(); + + BufferAttachmentTracker.recordSlice(slice, original); + assertThat(BufferAttachmentTracker.getTrackingMapSize()).isEqualTo(1); + + // Remove reference to slice (but not original) + slice = null; + + // Force garbage collection + System.gc(); + System.runFinalization(); + + // Give GC time to clean up weak references + // The WeakHashMap should eventually remove the entry when the slice is GC'd + // Note: This is non-deterministic, so we can't assert on size without + // potentially making the test flaky. The important thing is that it + // doesn't prevent GC. + + // What we can verify is that having null'd the slice doesn't break anything + ByteBuffer result = BufferAttachmentTracker.getOriginal(original); + assertThat(result).isSameAs(original); // Original still works + } +} diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/MemberJvmOptions.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/MemberJvmOptions.java index dbfc1ee40043..1a25de526b48 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/MemberJvmOptions.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/MemberJvmOptions.java @@ -30,15 +30,9 @@ import org.apache.geode.internal.offheap.AddressableMemoryManager; import org.apache.geode.internal.stats50.VMStats50; import org.apache.geode.unsafe.internal.com.sun.jmx.remote.security.MBeanServerAccessController; -import org.apache.geode.unsafe.internal.sun.nio.ch.DirectBuffer; public class MemberJvmOptions { static final int CMS_INITIAL_OCCUPANCY_FRACTION = 60; - /** - * export needed by {@link DirectBuffer} - */ - private static final String SUN_NIO_CH_EXPORT = - "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"; /** * export needed by {@link MBeanServerAccessController} */ @@ -60,7 +54,6 @@ public class MemberJvmOptions { static final List JAVA_11_OPTIONS = Arrays.asList( COM_SUN_JMX_REMOTE_SECURITY_EXPORT, - SUN_NIO_CH_EXPORT, COM_SUN_MANAGEMENT_INTERNAL_OPEN, JAVA_LANG_OPEN, JAVA_NIO_OPEN); diff --git a/geode-unsafe/src/main/java/org/apache/geode/unsafe/internal/sun/nio/ch/DirectBuffer.java b/geode-unsafe/src/main/java/org/apache/geode/unsafe/internal/sun/nio/ch/DirectBuffer.java deleted file mode 100644 index dc894cfea212..000000000000 --- a/geode-unsafe/src/main/java/org/apache/geode/unsafe/internal/sun/nio/ch/DirectBuffer.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.unsafe.internal.sun.nio.ch; - -/** - * Provides access to methods on non-SDK class {@link sun.nio.ch.DirectBuffer}. - */ -public interface DirectBuffer { - - /** - * @see sun.nio.ch.DirectBuffer#attachment() - * @param object to get attachment for - * @return returns attachment if object is {@link sun.nio.ch.DirectBuffer} otherwise null. - */ - static Object attachment(final Object object) { - if (object instanceof sun.nio.ch.DirectBuffer) { - return ((sun.nio.ch.DirectBuffer) object).attachment(); - } - - return null; - } - -} diff --git a/geode-unsafe/src/test/java/org/apache/geode/unsafe/internal/sun/nio/ch/DirectBufferTest.java b/geode-unsafe/src/test/java/org/apache/geode/unsafe/internal/sun/nio/ch/DirectBufferTest.java deleted file mode 100644 index 6d2f52b1c339..000000000000 --- a/geode-unsafe/src/test/java/org/apache/geode/unsafe/internal/sun/nio/ch/DirectBufferTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - - -package org.apache.geode.unsafe.internal.sun.nio.ch; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; - -import java.nio.ByteBuffer; - -import org.junit.jupiter.api.MethodOrderer; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestMethodOrder; -import org.junit.jupiter.api.parallel.Execution; - -@Execution(CONCURRENT) -@TestMethodOrder(MethodOrderer.Random.class) -public class DirectBufferTest { - - @Test - public void attachmentIsNullForNonDirectBuffer() { - assertThat(DirectBuffer.attachment(null)).isNull(); - assertThat(DirectBuffer.attachment(new Object())).isNull(); - assertThat(DirectBuffer.attachment(ByteBuffer.allocate(1))).isNull(); - } - - @Test - public void attachmentIsNullForUnslicedDirectBuffer() { - assertThat(DirectBuffer.attachment(ByteBuffer.allocateDirect(1))).isNull(); - } - - @Test - public void attachmentIsRootBufferForDirectBufferSlice() { - final ByteBuffer root = ByteBuffer.allocateDirect(10); - final ByteBuffer slice = root.slice(); - - assertThat(DirectBuffer.attachment(slice)).isSameAs(root); - } - -}