2727
2828import java .io .File ;
2929import java .io .IOException ;
30+ import java .util .ArrayDeque ;
31+ import java .util .Deque ;
3032import java .util .HashMap ;
3133import java .util .Map ;
32- import java .util .Map .Entry ;
33- import java .util .concurrent .atomic .AtomicLong ;
3434
3535/**
3636 * Eviction Manager for the Out-Of-Core stream cache
3737 * This is the base implementation for LRU, FIFO
38+ *
39+ * Design choice 1: Pure JVM-memory cache
40+ * What: Store MatrixBlock objects in a synchronized in-memory cache
41+ * (Map + Deque for LRU/FIFO). Spill to disk by serializing MatrixBlock
42+ * only when evicting.
43+ * Pros: Simple to implement; no off-heap management; easy to debug;
44+ * no serialization race since you serialize only when evicting;
45+ * fast cache hits (direct object access).
46+ * Cons: Heap usage counted roughly via serialized-size estimate — actual
47+ * JVM object overhead not accounted; risk of GC pressure and OOM if
48+ * estimates are off or if many small objects cause fragmentation;
49+ * eviction may be more expensive (serialize on eviction).
50+ * <p>
51+ * Design choice 2:
3852 * <p>
3953 * This manager runtime memory management by caching serialized
4054 * ByteBuffers and spilling them to disk when needed.
4761 * by falling back to the disk
4862 * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk,
4963 * there won't be OOM.
64+ *
65+ * Pros: Avoids heap OOM by keeping large data off-heap; predictable
66+ * memory usage; good for very large blocks.
67+ * Cons: More complex synchronization; need robust off-heap allocator/free;
68+ * must ensure serialization finishes before adding to queue or make evict
69+ * wait on serialization; careful with native memory leaks.
5070 */
5171public class OOCEvictionManager {
5272
@@ -55,14 +75,14 @@ public class OOCEvictionManager {
5575
5676 // Memory limit for ByteBuffers
5777 private static long _limit ;
58- private static AtomicLong _size ;
78+ private static long _size ;
5979
60- // Cache of ByteBuffers (off-heap serialized blocks)
61- private static CacheEvictionQueue _mQueue ;
80+ // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block)
81+ private static final Map <String , MatrixBlock > _cache = new HashMap <>();
82+ private static final Deque <String > _evictDeque = new ArrayDeque <>();
6283
63- // private static Map<Long, Map<Integer, ByteBuffer>> cache = new HashMap<>();
64- // I/O service for async spill/load
65- private static CacheMaintenanceService _fClean ;
84+ // Single lock for synchronization
85+ private static final Object lock = new Object ();
6686
6787 // Spill directory for evicted blocks
6888 private static String _spillDir ;
@@ -75,8 +95,6 @@ public enum RPolicy {
7595 private OOCEvictionManager () {}
7696
7797 static {
78- _mQueue = new CacheEvictionQueue ();
79- _fClean = new CacheMaintenanceService ();
8098 _limit = (long )(Runtime .getRuntime ().maxMemory () * OOC_BUFFER_PERCENTAGE * 0.01 ); // e.g., 20% of heap
8199 _size = 0 ;
82100 _spillDir = LocalFileUtils .getUniqueWorkingDir ("ooc_stream" );
@@ -87,88 +105,79 @@ private OOCEvictionManager() {}
87105 * Store a block in the OOC cache (serialize once)
88106 */
89107 public static synchronized void put (long streamId , int blockId , IndexedMatrixValue value ) {
90- try {
91- MatrixBlock mb = (MatrixBlock ) value .getValue ();
92- // Serialize to ByteBuffer
93- long size = estimateSerializedSize (mb );
94- ByteBuffer bbuff = new ByteBuffer (size );
95-
96- synchronized (_mQueue ) {
97- // Make space
98- evict (size );
108+ MatrixBlock mb = (MatrixBlock ) value .getValue ();
109+ long size = estimateSerializedSize (mb );
110+ String key = streamId + "_" + blockId ;
99111
100- // Add to cache
101- _mQueue .addLast (streamId + "_" + blockId , bbuff );
102- _size += size ;
112+ synchronized (lock ) {
113+ MatrixBlock old = _cache .remove (key );
114+ if (old != null ) {
115+ _evictDeque .remove (key );
116+ _size -= estimateSerializedSize (old );
103117 }
104118
105- // Serialize outside lock
106- _fClean .serializeData (bbuff , mb );
107- }
108- catch (Exception e ) {
109- throw new DMLRuntimeException (e );
119+ try {
120+ evict (size );
121+ } catch (IOException e ) {
122+ throw new DMLRuntimeException (e );
123+ }
124+
125+ _cache .put (key , mb );
126+ _evictDeque .addLast (key ); // add to end for FIFO/LRU
127+ _size += size ;
110128 }
111129 }
112130
113131 /**
114132 * Get a block from the OOC cache (deserialize on read)
115133 */
116134 public static synchronized IndexedMatrixValue get (long streamId , int blockId ) {
117- ByteBuffer bbuff = null ;
118135 String key = streamId + "_" + blockId ;
136+ MatrixBlock mb = (MatrixBlock ) _cache .get (key );
119137
120- try {
121- synchronized (_mQueue ) {
122- bbuff = _mQueue .get (key );
123-
124- // LRU: move to end
125- if (_policy == RPolicy .LRU && bbuff != null ) {
126- _mQueue .remove (key );
127- _mQueue .addLast (key , bbuff );
128- }
138+ synchronized (lock ) {
139+ if (mb != null && _policy == RPolicy .LRU ) {
140+ _evictDeque .remove (key );
141+ _evictDeque .addLast (key );
129142 }
143+ }
130144
131- if (bbuff != null ) {
132- // Cache hit: deserialize from ByteBuffer
133- bbuff .checkSerialized ();
134- MatrixBlock mb = (MatrixBlock ) bbuff .deserializeBlock ();
135-
136- MatrixIndexes ix = new MatrixIndexes (blockId + 1 , 1 );
137- return new IndexedMatrixValue (ix , mb );
138- } else {
139- // Cache miss: load from disk
145+ if (mb != null ) {
146+ MatrixIndexes ix = new MatrixIndexes (blockId + 1 , 1 );
147+ return new IndexedMatrixValue (ix , mb );
148+ } else {
149+ try {
140150 return loadFromDisk (streamId , blockId );
151+ } catch (IOException e ) {
152+ throw new DMLRuntimeException (e );
141153 }
142154 }
143- catch (IOException e ) {
144- throw new DMLRuntimeException (e );
145- }
155+
146156 }
147157
148158 /**
149159 * Evict ByteBuffers to disk
150160 */
151161 private static void evict (long requiredSize ) throws IOException {
152- while (_size + requiredSize > _limit && !_mQueue .isEmpty ()) {
162+ while (_size + requiredSize > _limit && !_evictDeque .isEmpty ()) {
153163 System .out .println ("_size + requiredSize: " + _size +" + " + requiredSize + "; _limit: " + _limit );
154- Entry <String , ByteBuffer > entry = _mQueue .removeFirst ();
155- String key = entry .getKey ();
156- ByteBuffer bbuff = entry .getValue ();
164+ String oldKey = _evictDeque .removeLast ();
165+ MatrixBlock mbToEvict = (MatrixBlock ) _cache .remove (oldKey );
157166
158- if (bbuff != null ) {
159- // Wait for serialization
160- bbuff .checkSerialized ();
167+ if (mbToEvict != null ) {
161168
162169 // Spill to disk
163- String filename = _spillDir + "/" + key ;
170+ String filename = _spillDir + "/" + oldKey ;
164171 File spillDirFile = new File (_spillDir );
165172 if (!spillDirFile .exists ()) {
166173 spillDirFile .mkdirs ();
167174 }
175+
176+ LocalFileUtils .writeMatrixBlockToLocal (filename , mbToEvict );
168177 System .out .println ("Evicting directory: " + filename );
169- bbuff . evictBuffer ( filename );
170- bbuff . freeMemory ( );
171- _size -= bbuff . getSize () ;
178+
179+ long freedSize = estimateSerializedSize ( mbToEvict );
180+ _size -= freedSize ;
172181 }
173182 }
174183 }
0 commit comments