@@ -78,7 +78,7 @@ public class OOCEvictionManager {
7878 private static long _size ;
7979
8080 // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block)
81- private static final Map <String , MatrixBlock > _cache = new HashMap <>();
81+ private static final Map <String , IndexedMatrixValue > _cache = new HashMap <>();
8282 private static final Deque <String > _evictDeque = new ArrayDeque <>();
8383
8484 // Single lock for synchronization
@@ -110,10 +110,10 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal
110110 String key = streamId + "_" + blockId ;
111111
112112 synchronized (lock ) {
113- MatrixBlock old = _cache .remove (key );
113+ IndexedMatrixValue old = _cache .remove (key ); // remove old value
114114 if (old != null ) {
115115 _evictDeque .remove (key );
116- _size -= estimateSerializedSize (old );
116+ _size -= estimateSerializedSize (( MatrixBlock ) old . getValue () );
117117 }
118118
119119 try {
@@ -122,7 +122,7 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal
122122 throw new DMLRuntimeException (e );
123123 }
124124
125- _cache .put (key , mb );
125+ _cache .put (key , value ); // put new value
126126 _evictDeque .addLast (key ); // add to end for FIFO/LRU
127127 _size += size ;
128128 }
@@ -132,19 +132,19 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal
132132 * Get a block from the OOC cache (deserialize on read)
133133 */
134134 public static synchronized IndexedMatrixValue get (long streamId , int blockId ) {
135+
135136 String key = streamId + "_" + blockId ;
136- MatrixBlock mb = ( MatrixBlock ) _cache .get (key );
137+ IndexedMatrixValue imv = _cache .get (key );
137138
138139 synchronized (lock ) {
139- if (mb != null && _policy == RPolicy .LRU ) {
140+ if (imv != null && _policy == RPolicy .LRU ) {
140141 _evictDeque .remove (key );
141142 _evictDeque .addLast (key );
142143 }
143144 }
144145
145- if (mb != null ) {
146- MatrixIndexes ix = new MatrixIndexes (blockId + 1 , 1 );
147- return new IndexedMatrixValue (ix , mb );
146+ if (imv != null ) {
147+ return imv ;
148148 } else {
149149 try {
150150 return loadFromDisk (streamId , blockId );
@@ -162,23 +162,23 @@ private static void evict(long requiredSize) throws IOException {
162162 while (_size + requiredSize > _limit && !_evictDeque .isEmpty ()) {
163163 System .out .println ("_size + requiredSize: " + _size +" + " + requiredSize + "; _limit: " + _limit );
164164 String oldKey = _evictDeque .removeLast ();
165- MatrixBlock mbToEvict = (MatrixBlock ) _cache .remove (oldKey );
165+ IndexedMatrixValue toEvict = _cache .remove (oldKey );
166+
167+ if (toEvict == null ) { continue ;}
168+ MatrixBlock mbToEvict = (MatrixBlock ) toEvict .getValue ();
166169
167- if (mbToEvict != null ) {
170+ // Spill to disk
171+ String filename = _spillDir + "/" + oldKey ;
172+ File spillDirFile = new File (_spillDir );
173+ if (!spillDirFile .exists ()) {
174+ spillDirFile .mkdirs ();
175+ }
168176
169- // Spill to disk
170- String filename = _spillDir + "/" + oldKey ;
171- File spillDirFile = new File (_spillDir );
172- if (!spillDirFile .exists ()) {
173- spillDirFile .mkdirs ();
174- }
177+ LocalFileUtils .writeMatrixBlockToLocal (filename , mbToEvict );
175178
176- LocalFileUtils . writeMatrixBlockToLocal ( filename , mbToEvict );
177- System . out . println ( "Evicting directory: " + filename ) ;
179+ long freedSize = estimateSerializedSize ( mbToEvict );
180+ _size -= freedSize ;
178181
179- long freedSize = estimateSerializedSize (mbToEvict );
180- _size -= freedSize ;
181- }
182182 }
183183 }
184184
0 commit comments