88package org .elasticsearch .xpack .gpu .codec ;
99
1010import com .nvidia .cuvs .CuVSResources ;
11-
1211import com .nvidia .cuvs .GPUInfoProvider ;
13-
1412import com .nvidia .cuvs .spi .CuVSProvider ;
1513
1614import org .elasticsearch .core .Strings ;
@@ -84,7 +82,7 @@ class PoolingCuVSResourceManager implements CuVSResourceManager {
8482 private int createdCount ;
8583
8684 ReentrantLock lock = new ReentrantLock ();
87- Condition enoughMemoryCondition = lock .newCondition ();
85+ Condition enoughResourcesCondition = lock .newCondition ();
8886
8987 public PoolingCuVSResourceManager (int capacity , GPUInfoProvider gpuInfoProvider ) {
9088 if (capacity < 1 || capacity > MAX_RESOURCES ) {
@@ -110,6 +108,17 @@ private ManagedCuVSResources getResourceFromPool() {
110108 return null ;
111109 }
112110
111+ private int numLockedResources () {
112+ int lockedResources = 0 ;
113+ for (int i = 0 ; i < createdCount ; ++i ) {
114+ var res = pool [i ];
115+ if (res .locked ) {
116+ lockedResources ++;
117+ }
118+ }
119+ return lockedResources ;
120+ }
121+
113122 @ Override
114123 public ManagedCuVSResources acquire (int numVectors , int dims ) throws InterruptedException {
115124 try {
@@ -119,10 +128,14 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted
119128 ManagedCuVSResources res = null ;
120129 while (allConditionsMet == false ) {
121130 res = getResourceFromPool ();
131+
122132 final boolean enoughMemory ;
123133 if (res != null ) {
134+ // If no resource in the pool is locked, short circuit to avoid livelock
135+ if (numLockedResources () == 0 ) {
136+ break ;
137+ }
124138 // Check resources availability
125- // Memory
126139 long requiredMemoryInBytes = estimateRequiredMemory (numVectors , dims );
127140 if (requiredMemoryInBytes > gpuInfoProvider .getCurrentInfo (res ).totalDeviceMemoryInBytes ()) {
128141 throw new IllegalArgumentException (
@@ -138,12 +151,11 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted
138151 } else {
139152 enoughMemory = false ;
140153 }
141- if (enoughMemory == false ) {
142- enoughMemoryCondition .await ();
143- }
144-
145154 // TODO: add enoughComputation / enoughComputationCondition here
146155 allConditionsMet = enoughMemory ; // && enoughComputation
156+ if (allConditionsMet == false ) {
157+ enoughResourcesCondition .await ();
158+ }
147159 }
148160 res .locked = true ;
149161 return res ;
@@ -153,7 +165,7 @@ public ManagedCuVSResources acquire(int numVectors, int dims) throws Interrupted
153165 }
154166
155167 private long estimateRequiredMemory (int numVectors , int dims ) {
156- return (long )(GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * Float .BYTES );
168+ return (long ) (GPU_COMPUTATION_MEMORY_FACTOR * numVectors * dims * Float .BYTES );
157169 }
158170
159171 // visible for testing
@@ -164,7 +176,7 @@ protected CuVSResources createNew() {
164176 @ Override
165177 public void finishedComputation (ManagedCuVSResources resources ) {
166178 // currently does nothing, but could allow acquire to return possibly blocked resources
167- // something like enoughComputationCondition .signalAll()?
179+ // enoughResourcesCondition .signalAll()
168180 }
169181
170182 @ Override
@@ -173,7 +185,7 @@ public void release(ManagedCuVSResources resources) {
173185 lock .lock ();
174186 assert resources .locked ;
175187 resources .locked = false ;
176- enoughMemoryCondition .signalAll ();
188+ enoughResourcesCondition .signalAll ();
177189 } finally {
178190 lock .unlock ();
179191 }
0 commit comments