From 2b0cfa52ed810e076c51f1310c316d766a8199df Mon Sep 17 00:00:00 2001 From: ldematte Date: Wed, 20 Aug 2025 18:27:39 +0200 Subject: [PATCH 1/7] PoolingCuVSResourceManager with memory availability --- .../xpack/gpu/codec/CuVSResourceManager.java | 104 ++++++++++++++---- .../gpu/codec/CuVSResourceManagerTests.java | 24 +++- 2 files changed, 107 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index e7977f28c9c22..073b3f1ea6786 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -9,12 +9,17 @@ import com.nvidia.cuvs.CuVSResources; +import com.nvidia.cuvs.GPUInfoProvider; + +import com.nvidia.cuvs.spi.CuVSProvider; + +import org.elasticsearch.core.Strings; import org.elasticsearch.xpack.gpu.GPUSupport; import java.nio.file.Path; import java.util.Objects; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * A manager of {@link com.nvidia.cuvs.CuVSResources}. There is one manager per GPU. @@ -65,35 +70,84 @@ static CuVSResourceManager pooling() { */ class PoolingCuVSResourceManager implements CuVSResourceManager { + /** A multiplier on input data to account for intermediate and output data size required while processing it */ + static final double GPU_COMPUTATION_MEMORY_FACTOR = 2.0; static final int MAX_RESOURCES = 2; - static final PoolingCuVSResourceManager INSTANCE = new PoolingCuVSResourceManager(MAX_RESOURCES); + static final PoolingCuVSResourceManager INSTANCE = new PoolingCuVSResourceManager( + MAX_RESOURCES, + CuVSProvider.provider().gpuInfoProvider() + ); - final BlockingQueue pool; + final ManagedCuVSResources[] pool; final int capacity; + final GPUInfoProvider gpuInfoProvider; int createdCount; - public PoolingCuVSResourceManager(int capacity) { + ReentrantLock lock = new ReentrantLock(); + Condition poolAvailableCondition = lock.newCondition(); + Condition enoughMemoryCondition = lock.newCondition(); + + public PoolingCuVSResourceManager(int capacity, GPUInfoProvider gpuInfoProvider) { if (capacity < 1 || capacity > MAX_RESOURCES) { throw new IllegalArgumentException("Resource count must be between 1 and " + MAX_RESOURCES); } this.capacity = capacity; - this.pool = new ArrayBlockingQueue<>(capacity); + this.gpuInfoProvider = gpuInfoProvider; + this.pool = new ManagedCuVSResources[MAX_RESOURCES]; } - @Override - public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { - ManagedCuVSResources res = pool.poll(); - if (res != null) { + private ManagedCuVSResources getResourceFromPool() { + for (int i = 0; i < createdCount; ++i) { + var res = pool[i]; + if (res.locked == false) { + return res; + } + } + if (createdCount < capacity) { + var res = new ManagedCuVSResources(Objects.requireNonNull(createNew())); + pool[createdCount++] = res; return res; } - synchronized (this) { - if (createdCount < capacity) { - createdCount++; - return new ManagedCuVSResources(Objects.requireNonNull(createNew())); + return null; + } + + @Override + public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { + try { + lock.lock(); + ManagedCuVSResources res; + while ((res = getResourceFromPool()) == null) { + poolAvailableCondition.await(); + } + + // Check resources availability + var resourcesInfo = gpuInfoProvider.getCurrentInfo(res); + + // Memory + long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims); + if (requiredMemoryInBytes > resourcesInfo.totalDeviceMemoryInBytes()) { + throw new IllegalArgumentException( + Strings.format( + "Requested GPU memory for [%d] vectors, [%d] dims is greater than the GPU total memory [%dMB]", + numVectors, + dims, + resourcesInfo.totalDeviceMemoryInBytes() / 1048576.0f + ) + ); + } + while (requiredMemoryInBytes > resourcesInfo.freeDeviceMemoryInBytes()) { + enoughMemoryCondition.await(); } + + res.locked = true; + return res; + } finally { + lock.unlock(); } - // Otherwise, wait for one to be released - return pool.take(); + } + + private long estimateRequiredMemory(int numVectors, int dims) { + return (long)(GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * Float.BYTES); } // visible for testing @@ -104,20 +158,29 @@ protected CuVSResources createNew() { @Override public void finishedComputation(ManagedCuVSResources resources) { // currently does nothing, but could allow acquire to return possibly blocked resources + // something like enoughComputationCondition.signal()? } @Override public void release(ManagedCuVSResources resources) { - var added = pool.offer(Objects.requireNonNull(resources)); - assert added : "Failed to release resource back to pool"; + try { + lock.lock(); + assert resources.locked; + resources.locked = false; + poolAvailableCondition.signalAll(); + enoughMemoryCondition.signalAll(); + } finally { + lock.unlock(); + } } @Override public void shutdown() { - for (ManagedCuVSResources res : pool) { + for (int i = 0; i < createdCount; ++i) { + var res = pool[i]; + assert res != null; res.delegate.close(); } - pool.clear(); } } @@ -125,6 +188,7 @@ public void shutdown() { final class ManagedCuVSResources implements CuVSResources { final CuVSResources delegate; + boolean locked = false; ManagedCuVSResources(CuVSResources resources) { this.delegate = resources; diff --git a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java index a5bac96cc3b51..c75e4f72618f7 100644 --- a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java +++ b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java @@ -9,9 +9,14 @@ import com.nvidia.cuvs.CuVSResources; +import com.nvidia.cuvs.CuVSResourcesInfo; +import com.nvidia.cuvs.GPUInfo; +import com.nvidia.cuvs.GPUInfoProvider; + import org.elasticsearch.test.ESTestCase; import java.nio.file.Path; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -83,7 +88,7 @@ static class MockPoolingCuVSResourceManager extends CuVSResourceManager.PoolingC final AtomicInteger idGenerator = new AtomicInteger(); MockPoolingCuVSResourceManager(int capacity) { - super(capacity); + super(capacity, new MockGPUInfoProvider()); } @Override @@ -118,4 +123,21 @@ public String toString() { return "MockCuVSResources[id=" + id + "]"; } } + + private static class MockGPUInfoProvider implements GPUInfoProvider { + @Override + public List availableGPUs() throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public List compatibleGPUs() throws Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public CuVSResourcesInfo getCurrentInfo(CuVSResources cuVSResources) { + return new CuVSResourcesInfo(256L * 1024 * 1024, 2048L * 1024 * 1024); + } + } } From 204405b1ca4e9563aa6ea84e7901014acbdcbe94 Mon Sep 17 00:00:00 2001 From: ldematte Date: Thu, 21 Aug 2025 12:28:54 +0200 Subject: [PATCH 2/7] Add tests + fixes --- .../xpack/gpu/codec/CuVSResourceManager.java | 21 ++-- .../gpu/codec/CuVSResourceManagerTests.java | 100 +++++++++++++++++- 2 files changed, 108 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index 073b3f1ea6786..97db55acf7cd6 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -78,10 +78,10 @@ class PoolingCuVSResourceManager implements CuVSResourceManager { CuVSProvider.provider().gpuInfoProvider() ); - final ManagedCuVSResources[] pool; - final int capacity; - final GPUInfoProvider gpuInfoProvider; - int createdCount; + private final ManagedCuVSResources[] pool; + private final int capacity; + private final GPUInfoProvider gpuInfoProvider; + private int createdCount; ReentrantLock lock = new ReentrantLock(); Condition poolAvailableCondition = lock.newCondition(); @@ -121,21 +121,19 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted } // Check resources availability - var resourcesInfo = gpuInfoProvider.getCurrentInfo(res); - // Memory long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims); - if (requiredMemoryInBytes > resourcesInfo.totalDeviceMemoryInBytes()) { + if (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes()) { throw new IllegalArgumentException( Strings.format( "Requested GPU memory for [%d] vectors, [%d] dims is greater than the GPU total memory [%dMB]", numVectors, dims, - resourcesInfo.totalDeviceMemoryInBytes() / 1048576.0f + gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes() / (1024L * 1024L) ) ); } - while (requiredMemoryInBytes > resourcesInfo.freeDeviceMemoryInBytes()) { + while (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).freeDeviceMemoryInBytes()) { enoughMemoryCondition.await(); } @@ -199,6 +197,11 @@ public ScopedAccess access() { return delegate.access(); } + @Override + public int deviceId() { + return delegate.deviceId(); + } + @Override public void close() { throw new UnsupportedOperationException("this resource is managed, cannot be closed by clients"); diff --git a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java index c75e4f72618f7..591a54596b3b1 100644 --- a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java +++ b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java @@ -13,18 +13,28 @@ import com.nvidia.cuvs.GPUInfo; import com.nvidia.cuvs.GPUInfoProvider; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.test.ESTestCase; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class CuVSResourceManagerTests extends ESTestCase { + private static final Logger log = LogManager.getLogger(CuVSResourceManagerTests.class); + + public static final long TOTAL_DEVICE_MEMORY_IN_BYTES = 256L * 1024 * 1024; + public void testBasic() throws InterruptedException { var mgr = new MockPoolingCuVSResourceManager(2); var res1 = mgr.acquire(0, 0); @@ -65,10 +75,52 @@ public void testBlocking() throws Exception { mgr.shutdown(); } + public void testBlockingOnInsufficientMemory() throws Exception { + var mgr = new MockPoolingCuVSResourceManager(2); + var res1 = mgr.acquire(16 * 1024, 1024); + + AtomicReference holder = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + var res2 = mgr.acquire((16 * 1024) + 1, 1024); + holder.set(res2); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }); + t.start(); + Thread.sleep(1_000); + assertNull(holder.get()); + mgr.release(res1); + t.join(); + assertThat(holder.get().toString(), anyOf(containsString("id=0"), containsString("id=1"))); + mgr.shutdown(); + } + + public void testNotBlockingOnSufficientMemory() throws Exception { + var mgr = new MockPoolingCuVSResourceManager(2); + var res1 = mgr.acquire(16 * 1024, 1024); + + AtomicReference holder = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + var res2 = mgr.acquire((16 * 1024) - 1, 1024); + holder.set(res2); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }); + t.start(); + t.join(5_000); + assertNotNull(holder.get()); + assertThat(holder.get().toString(), not(equalTo(res1.toString()))); + mgr.shutdown(); + } + public void testManagedResIsNotClosable() throws Exception { var mgr = new MockPoolingCuVSResourceManager(1); var res = mgr.acquire(0, 0); - assertThrows(UnsupportedOperationException.class, () -> res.close()); + assertThrows(UnsupportedOperationException.class, res::close); mgr.release(res); mgr.shutdown(); } @@ -85,16 +137,45 @@ public void testDoubleRelease() throws InterruptedException { static class MockPoolingCuVSResourceManager extends CuVSResourceManager.PoolingCuVSResourceManager { - final AtomicInteger idGenerator = new AtomicInteger(); + private final AtomicInteger idGenerator = new AtomicInteger(); + private final List allocations; MockPoolingCuVSResourceManager(int capacity) { - super(capacity, new MockGPUInfoProvider()); + this(capacity, new ArrayList<>()); + } + + private MockPoolingCuVSResourceManager(int capacity, List allocationList) { + super(capacity, new MockGPUInfoProvider(() -> freeMemoryFunction(allocationList))); + this.allocations = allocationList; + } + + private static long freeMemoryFunction(List allocations) { + return TOTAL_DEVICE_MEMORY_IN_BYTES - allocations.stream().mapToLong(x -> x).sum(); } @Override protected CuVSResources createNew() { return new MockCuVSResources(idGenerator.getAndIncrement()); } + + @Override + public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { + var res = super.acquire(numVectors, dims); + long memory = (long)(numVectors * dims * Float.BYTES * + CuVSResourceManager.PoolingCuVSResourceManager.GPU_COMPUTATION_MEMORY_FACTOR); + allocations.add(memory); + log.info("Added [{}]", memory); + return res; + } + + @Override + public void release(ManagedCuVSResources resources) { + if (allocations.isEmpty() == false) { + var x = allocations.removeLast(); + log.info("Removed [{}]", x); + } + super.release(resources); + } } static class MockCuVSResources implements CuVSResources { @@ -110,6 +191,11 @@ public ScopedAccess access() { throw new UnsupportedOperationException(); } + @Override + public int deviceId() { + return 0; + } + @Override public void close() {} @@ -125,6 +211,12 @@ public String toString() { } private static class MockGPUInfoProvider implements GPUInfoProvider { + private final LongSupplier freeMemorySupplier; + + MockGPUInfoProvider(LongSupplier freeMemorySupplier) { + this.freeMemorySupplier = freeMemorySupplier; + } + @Override public List availableGPUs() throws Throwable { throw new UnsupportedOperationException(); @@ -137,7 +229,7 @@ public List compatibleGPUs() throws Throwable { @Override public CuVSResourcesInfo getCurrentInfo(CuVSResources cuVSResources) { - return new CuVSResourcesInfo(256L * 1024 * 1024, 2048L * 1024 * 1024); + return new CuVSResourcesInfo(freeMemorySupplier.getAsLong(), TOTAL_DEVICE_MEMORY_IN_BYTES); } } } From 2cf0388c2238b4ef58f21f408a7bf7522672cb9e Mon Sep 17 00:00:00 2001 From: ldematte Date: Thu, 21 Aug 2025 15:55:42 +0200 Subject: [PATCH 3/7] Fix: re-acquire res before re-evaluating condition(s) --- .../xpack/gpu/codec/CuVSResourceManager.java | 53 +++++++++++-------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index 97db55acf7cd6..828652c368929 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -84,7 +84,6 @@ class PoolingCuVSResourceManager implements CuVSResourceManager { private int createdCount; ReentrantLock lock = new ReentrantLock(); - Condition poolAvailableCondition = lock.newCondition(); Condition enoughMemoryCondition = lock.newCondition(); public PoolingCuVSResourceManager(int capacity, GPUInfoProvider gpuInfoProvider) { @@ -115,28 +114,37 @@ private ManagedCuVSResources getResourceFromPool() { public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { try { lock.lock(); - ManagedCuVSResources res; - while ((res = getResourceFromPool()) == null) { - poolAvailableCondition.await(); - } - // Check resources availability - // Memory - long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims); - if (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes()) { - throw new IllegalArgumentException( - Strings.format( - "Requested GPU memory for [%d] vectors, [%d] dims is greater than the GPU total memory [%dMB]", - numVectors, - dims, - gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes() / (1024L * 1024L) - ) - ); - } - while (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).freeDeviceMemoryInBytes()) { - enoughMemoryCondition.await(); + boolean allConditionsMet = false; + ManagedCuVSResources res = null; + while (allConditionsMet == false) { + res = getResourceFromPool(); + final boolean enoughMemory; + if (res != null) { + // Check resources availability + // Memory + long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims); + if (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes()) { + throw new IllegalArgumentException( + Strings.format( + "Requested GPU memory for [%d] vectors, [%d] dims is greater than the GPU total memory [%dMB]", + numVectors, + dims, + gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes() / (1024L * 1024L) + ) + ); + } + enoughMemory = requiredMemoryInBytes <= gpuInfoProvider.getCurrentInfo(res).freeDeviceMemoryInBytes(); + } else { + enoughMemory = false; + } + if (enoughMemory == false) { + enoughMemoryCondition.await(); + } + + // TODO: add enoughComputation / enoughComputationCondition here + allConditionsMet = enoughMemory; // && enoughComputation } - res.locked = true; return res; } finally { @@ -165,8 +173,7 @@ public void release(ManagedCuVSResources resources) { lock.lock(); assert resources.locked; resources.locked = false; - poolAvailableCondition.signalAll(); - enoughMemoryCondition.signalAll(); + enoughMemoryCondition.signal(); } finally { lock.unlock(); } From 8608731f55aa312026ecdc14fb27b72fa15db56a Mon Sep 17 00:00:00 2001 From: ldematte Date: Thu, 21 Aug 2025 16:14:37 +0200 Subject: [PATCH 4/7] signalAll --- .../elasticsearch/xpack/gpu/codec/CuVSResourceManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index 828652c368929..77d0976f71af8 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -164,7 +164,7 @@ protected CuVSResources createNew() { @Override public void finishedComputation(ManagedCuVSResources resources) { // currently does nothing, but could allow acquire to return possibly blocked resources - // something like enoughComputationCondition.signal()? + // something like enoughComputationCondition.signalAll()? } @Override @@ -173,7 +173,7 @@ public void release(ManagedCuVSResources resources) { lock.lock(); assert resources.locked; resources.locked = false; - enoughMemoryCondition.signal(); + enoughMemoryCondition.signalAll(); } finally { lock.unlock(); } From d32a466e4d38c0845b7de82d117b90a82b940858 Mon Sep 17 00:00:00 2001 From: ldematte Date: Fri, 22 Aug 2025 09:27:08 +0200 Subject: [PATCH 5/7] Short circuit to avoid livelock + spotless --- .../xpack/gpu/codec/CuVSResourceManager.java | 34 +++++++++++++------ .../gpu/codec/CuVSResourceManagerTests.java | 5 ++- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index 77d0976f71af8..8489e7cc12392 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -8,9 +8,7 @@ package org.elasticsearch.xpack.gpu.codec; import com.nvidia.cuvs.CuVSResources; - import com.nvidia.cuvs.GPUInfoProvider; - import com.nvidia.cuvs.spi.CuVSProvider; import org.elasticsearch.core.Strings; @@ -84,7 +82,7 @@ class PoolingCuVSResourceManager implements CuVSResourceManager { private int createdCount; ReentrantLock lock = new ReentrantLock(); - Condition enoughMemoryCondition = lock.newCondition(); + Condition enoughResourcesCondition = lock.newCondition(); public PoolingCuVSResourceManager(int capacity, GPUInfoProvider gpuInfoProvider) { if (capacity < 1 || capacity > MAX_RESOURCES) { @@ -110,6 +108,17 @@ private ManagedCuVSResources getResourceFromPool() { return null; } + private int numLockedResources() { + int lockedResources = 0; + for (int i = 0; i < createdCount; ++i) { + var res = pool[i]; + if (res.locked) { + lockedResources++; + } + } + return lockedResources; + } + @Override public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { try { @@ -119,10 +128,14 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted ManagedCuVSResources res = null; while (allConditionsMet == false) { res = getResourceFromPool(); + final boolean enoughMemory; if (res != null) { + // If no resource in the pool is locked, short circuit to avoid livelock + if (numLockedResources() == 0) { + break; + } // Check resources availability - // Memory long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims); if (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes()) { throw new IllegalArgumentException( @@ -138,12 +151,11 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted } else { enoughMemory = false; } - if (enoughMemory == false) { - enoughMemoryCondition.await(); - } - // TODO: add enoughComputation / enoughComputationCondition here allConditionsMet = enoughMemory; // && enoughComputation + if (allConditionsMet == false) { + enoughResourcesCondition.await(); + } } res.locked = true; return res; @@ -153,7 +165,7 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted } private long estimateRequiredMemory(int numVectors, int dims) { - return (long)(GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * Float.BYTES); + return (long) (GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * Float.BYTES); } // visible for testing @@ -164,7 +176,7 @@ protected CuVSResources createNew() { @Override public void finishedComputation(ManagedCuVSResources resources) { // currently does nothing, but could allow acquire to return possibly blocked resources - // something like enoughComputationCondition.signalAll()? + // enoughResourcesCondition.signalAll() } @Override @@ -173,7 +185,7 @@ public void release(ManagedCuVSResources resources) { lock.lock(); assert resources.locked; resources.locked = false; - enoughMemoryCondition.signalAll(); + enoughResourcesCondition.signalAll(); } finally { lock.unlock(); } diff --git a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java index 591a54596b3b1..5a848e0436983 100644 --- a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java +++ b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.gpu.codec; import com.nvidia.cuvs.CuVSResources; - import com.nvidia.cuvs.CuVSResourcesInfo; import com.nvidia.cuvs.GPUInfo; import com.nvidia.cuvs.GPUInfoProvider; @@ -161,8 +160,8 @@ protected CuVSResources createNew() { @Override public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { var res = super.acquire(numVectors, dims); - long memory = (long)(numVectors * dims * Float.BYTES * - CuVSResourceManager.PoolingCuVSResourceManager.GPU_COMPUTATION_MEMORY_FACTOR); + long memory = (long) (numVectors * dims * Float.BYTES + * CuVSResourceManager.PoolingCuVSResourceManager.GPU_COMPUTATION_MEMORY_FACTOR); allocations.add(memory); log.info("Added [{}]", memory); return res; From 147c8fe63fab6db145a3ffc7b64bc9223f604044 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Wed, 27 Aug 2025 17:30:01 +0200 Subject: [PATCH 6/7] Add dataType to acquire + logging --- .../xpack/gpu/codec/CuVSResourceManager.java | 60 ++++++++++++++----- .../gpu/codec/GPUToHNSWVectorsWriter.java | 2 +- .../gpu/codec/CuVSResourceManagerTests.java | 33 +++++----- 3 files changed, 62 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index 8489e7cc12392..26e4e94ed57ea 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -7,11 +7,14 @@ package org.elasticsearch.xpack.gpu.codec; +import com.nvidia.cuvs.CuVSMatrix; import com.nvidia.cuvs.CuVSResources; import com.nvidia.cuvs.GPUInfoProvider; import com.nvidia.cuvs.spi.CuVSProvider; import org.elasticsearch.core.Strings; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.gpu.GPUSupport; import java.nio.file.Path; @@ -47,7 +50,7 @@ public interface CuVSResourceManager { // numVectors and dims are currently unused, but could be used along with GPU metadata, // memory, generation, etc, when acquiring for 10M x 1536 dims, or 100,000 x 128 dims, // to give out a resources or not. - ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException; + ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException; /** Marks the resources as finished with regard to compute. */ void finishedComputation(ManagedCuVSResources resources); @@ -68,6 +71,8 @@ static CuVSResourceManager pooling() { */ class PoolingCuVSResourceManager implements CuVSResourceManager { + static final Logger logger = LogManager.getLogger(CuVSResourceManager.class); + /** A multiplier on input data to account for intermediate and output data size required while processing it */ static final double GPU_COMPUTATION_MEMORY_FACTOR = 2.0; static final int MAX_RESOURCES = 2; @@ -120,7 +125,7 @@ private int numLockedResources() { } @Override - public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { + public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException { try { lock.lock(); @@ -131,24 +136,40 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted final boolean enoughMemory; if (res != null) { + long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims, dataType); + logger.info( + "Estimated memory for [{}] vectors, [{}] dims of type [{}] is [{} B]", + numVectors, + dims, + dataType.name(), + requiredMemoryInBytes + ); + + // Check immutable constraints + long totalDeviceMemoryInBytes = gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes(); + if (requiredMemoryInBytes > totalDeviceMemoryInBytes) { + String message = Strings.format( + "Requested GPU memory for [%d] vectors, [%d] dims is greater than the GPU total memory [%d B]", + numVectors, + dims, + totalDeviceMemoryInBytes + ); + logger.error(message); + throw new IllegalArgumentException(message); + } + // If no resource in the pool is locked, short circuit to avoid livelock if (numLockedResources() == 0) { + logger.info("No resources currently locked, proceeding"); break; } + // Check resources availability - long requiredMemoryInBytes = estimateRequiredMemory(numVectors, dims); - if (requiredMemoryInBytes > gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes()) { - throw new IllegalArgumentException( - Strings.format( - "Requested GPU memory for [%d] vectors, [%d] dims is greater than the GPU total memory [%dMB]", - numVectors, - dims, - gpuInfoProvider.getCurrentInfo(res).totalDeviceMemoryInBytes() / (1024L * 1024L) - ) - ); - } - enoughMemory = requiredMemoryInBytes <= gpuInfoProvider.getCurrentInfo(res).freeDeviceMemoryInBytes(); + long freeDeviceMemoryInBytes = gpuInfoProvider.getCurrentInfo(res).freeDeviceMemoryInBytes(); + enoughMemory = requiredMemoryInBytes <= freeDeviceMemoryInBytes; + logger.info("Free device memory [{} B], enoughMemory[{}]", freeDeviceMemoryInBytes); } else { + logger.info("No resources available in pool"); enoughMemory = false; } // TODO: add enoughComputation / enoughComputationCondition here @@ -164,8 +185,13 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted } } - private long estimateRequiredMemory(int numVectors, int dims) { - return (long) (GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * Float.BYTES); + private long estimateRequiredMemory(int numVectors, int dims, CuVSMatrix.DataType dataType) { + int elementTypeBytes = switch (dataType) { + case FLOAT -> Float.BYTES; + case INT, UINT -> Integer.BYTES; + case BYTE -> Byte.BYTES; + }; + return (long) (GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * elementTypeBytes); } // visible for testing @@ -175,12 +201,14 @@ protected CuVSResources createNew() { @Override public void finishedComputation(ManagedCuVSResources resources) { + logger.info("Computation finished"); // currently does nothing, but could allow acquire to return possibly blocked resources // enoughResourcesCondition.signalAll() } @Override public void release(ManagedCuVSResources resources) { + logger.info("Releasing resources to pool"); try { lock.lock(); assert resources.locked; diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/GPUToHNSWVectorsWriter.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/GPUToHNSWVectorsWriter.java index 4968a659e73e7..56a6de52246fb 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/GPUToHNSWVectorsWriter.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/GPUToHNSWVectorsWriter.java @@ -244,7 +244,7 @@ private void writeFieldInternal(FieldInfo fieldInfo, DatasetOrVectors datasetOrV mockGraph = writeGraph(vectors, graphLevelNodeOffsets); } else { var dataset = datasetOrVectors.dataset; - var cuVSResources = cuVSResourceManager.acquire((int) dataset.size(), (int) dataset.columns()); + var cuVSResources = cuVSResourceManager.acquire((int) dataset.size(), (int) dataset.columns(), dataset.dataType()); try { try (var index = buildGPUIndex(cuVSResources, fieldInfo.getVectorSimilarityFunction(), dataset)) { assert index != null : "GPU index should be built for field: " + fieldInfo.name; diff --git a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java index 5a848e0436983..0b3222bde2a52 100644 --- a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java +++ b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.gpu.codec; +import com.nvidia.cuvs.CuVSMatrix; import com.nvidia.cuvs.CuVSResources; import com.nvidia.cuvs.CuVSResourcesInfo; import com.nvidia.cuvs.GPUInfo; @@ -36,14 +37,14 @@ public class CuVSResourceManagerTests extends ESTestCase { public void testBasic() throws InterruptedException { var mgr = new MockPoolingCuVSResourceManager(2); - var res1 = mgr.acquire(0, 0); - var res2 = mgr.acquire(0, 0); + var res1 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); + var res2 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); assertThat(res1.toString(), containsString("id=0")); assertThat(res2.toString(), containsString("id=1")); mgr.release(res1); mgr.release(res2); - res1 = mgr.acquire(0, 0); - res2 = mgr.acquire(0, 0); + res1 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); + res2 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); assertThat(res1.toString(), containsString("id=0")); assertThat(res2.toString(), containsString("id=1")); mgr.release(res1); @@ -53,13 +54,13 @@ public void testBasic() throws InterruptedException { public void testBlocking() throws Exception { var mgr = new MockPoolingCuVSResourceManager(2); - var res1 = mgr.acquire(0, 0); - var res2 = mgr.acquire(0, 0); + var res1 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); + var res2 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); AtomicReference holder = new AtomicReference<>(); Thread t = new Thread(() -> { try { - var res3 = mgr.acquire(0, 0); + var res3 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); holder.set(res3); } catch (InterruptedException e) { throw new AssertionError(e); @@ -76,12 +77,12 @@ public void testBlocking() throws Exception { public void testBlockingOnInsufficientMemory() throws Exception { var mgr = new MockPoolingCuVSResourceManager(2); - var res1 = mgr.acquire(16 * 1024, 1024); + var res1 = mgr.acquire(16 * 1024, 1024, CuVSMatrix.DataType.FLOAT); AtomicReference holder = new AtomicReference<>(); Thread t = new Thread(() -> { try { - var res2 = mgr.acquire((16 * 1024) + 1, 1024); + var res2 = mgr.acquire((16 * 1024) + 1, 1024, CuVSMatrix.DataType.FLOAT); holder.set(res2); } catch (InterruptedException e) { throw new AssertionError(e); @@ -98,12 +99,12 @@ public void testBlockingOnInsufficientMemory() throws Exception { public void testNotBlockingOnSufficientMemory() throws Exception { var mgr = new MockPoolingCuVSResourceManager(2); - var res1 = mgr.acquire(16 * 1024, 1024); + var res1 = mgr.acquire(16 * 1024, 1024, CuVSMatrix.DataType.FLOAT); AtomicReference holder = new AtomicReference<>(); Thread t = new Thread(() -> { try { - var res2 = mgr.acquire((16 * 1024) - 1, 1024); + var res2 = mgr.acquire((16 * 1024) - 1, 1024, CuVSMatrix.DataType.FLOAT); holder.set(res2); } catch (InterruptedException e) { throw new AssertionError(e); @@ -118,7 +119,7 @@ public void testNotBlockingOnSufficientMemory() throws Exception { public void testManagedResIsNotClosable() throws Exception { var mgr = new MockPoolingCuVSResourceManager(1); - var res = mgr.acquire(0, 0); + var res = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); assertThrows(UnsupportedOperationException.class, res::close); mgr.release(res); mgr.shutdown(); @@ -126,8 +127,8 @@ public void testManagedResIsNotClosable() throws Exception { public void testDoubleRelease() throws InterruptedException { var mgr = new MockPoolingCuVSResourceManager(2); - var res1 = mgr.acquire(0, 0); - var res2 = mgr.acquire(0, 0); + var res1 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); + var res2 = mgr.acquire(0, 0, CuVSMatrix.DataType.FLOAT); mgr.release(res1); mgr.release(res2); assertThrows(AssertionError.class, () -> mgr.release(randomFrom(res1, res2))); @@ -158,8 +159,8 @@ protected CuVSResources createNew() { } @Override - public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException { - var res = super.acquire(numVectors, dims); + public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException { + var res = super.acquire(numVectors, dims, dataType); long memory = (long) (numVectors * dims * Float.BYTES * CuVSResourceManager.PoolingCuVSResourceManager.GPU_COMPUTATION_MEMORY_FACTOR); allocations.add(memory); From 2fcf997aff9d734cd4ccab9a26247f10a4726609 Mon Sep 17 00:00:00 2001 From: ldematte Date: Wed, 27 Aug 2025 17:35:55 +0200 Subject: [PATCH 7/7] Fix signature for latest cuvs-java --- .../xpack/gpu/codec/CuVSResourceManagerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java index 0b3222bde2a52..b466f37cbe9c9 100644 --- a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java +++ b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManagerTests.java @@ -218,12 +218,12 @@ private static class MockGPUInfoProvider implements GPUInfoProvider { } @Override - public List availableGPUs() throws Throwable { + public List availableGPUs() { throw new UnsupportedOperationException(); } @Override - public List compatibleGPUs() throws Throwable { + public List compatibleGPUs() { throw new UnsupportedOperationException(); }