1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing,
13+ * software distributed under the License is distributed on an
14+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+ * KIND, either express or implied. See the License for the
16+ * specific language governing permissions and limitations
17+ * under the License.
18+ */
19+
20+ package org .apache .sysds .runtime .controlprogram .caching ;
21+
22+ import org .apache .sysds .runtime .instructions .spark .data .IndexedMatrixValue ;
23+ import org .apache .sysds .runtime .matrix .data .MatrixBlock ;
24+ import org .apache .sysds .runtime .matrix .data .MatrixIndexes ;
25+ import org .apache .sysds .runtime .util .LocalFileUtils ;
26+
27+ import java .io .IOException ;
28+ import java .util .Map .Entry ;
29+
30+ /**
31+ * Eviction Manager for the Out-Of-Core stream cache
32+ * This is the base implementation for LRU, FIFO
33+ * <p>
34+ * This manager runtime memory management by caching serialized
35+ * ByteBuffers and spilling them to disk when needed.
36+ * <p>
37+ * * core function: Caches ByteBuffers (off-heap/direct) and
38+ * spills them to disk
39+ * * Eviction: Evicts a ByteBuffer by writing its contents to a file
40+ * * Granularity: Evicts one IndexedMatrixValue block at a time
41+ * * Data replay: get() will always return the data either from memory or
42+ * by falling back to the disk
43+ * * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk,
44+ * there won't be OOM.
45+ */
46+ public class OOCEvictionManager {
47+ private static OOCEvictionManager _instance ;
48+
49+ // Configuration: OOC buffer limit as percentage of heap
50+ private static final double OOC_BUFFER_PERCENTAGE = 0.15 ; // 15% of heap
51+ private static final int MIN_PREFETCH_DEPTH = 1 ;
52+ private static final int MAX_PREFETCH_DEPTH = 5 ;
53+
54+ // Memory limit for ByteBuffers
55+ private long _limit ;
56+ private long _size ;
57+
58+ // Cache of ByteBuffers (off-heap serialized blocks)
59+ private CacheEvictionQueue _mQueue ;
60+
61+ // I/O service for async spill/load
62+ private CacheMaintenanceService _fClean ;
63+
64+ // Spill directory for evicted blocks
65+ private String _spillDir ;
66+
67+ public enum RPolicy {
68+ FIFO , LRU
69+ }
70+ private RPolicy _policy = RPolicy .FIFO ;
71+
72+ private OOCEvictionManager () {
73+ _mQueue = new CacheEvictionQueue ();
74+ _fClean = new CacheMaintenanceService ();
75+ _limit = (long )(Runtime .getRuntime ().maxMemory () * OOC_BUFFER_PERCENTAGE ); // e.g., 20% of heap
76+ _size = 0 ;
77+ _spillDir = LocalFileUtils .getUniqueWorkingDir ("ooc_stream" );
78+ LocalFileUtils .createLocalFileIfNotExist (_spillDir );
79+ }
80+
81+ public static synchronized OOCEvictionManager getInstance () {
82+ if (_instance == null )
83+ _instance = new OOCEvictionManager ();
84+ return _instance ;
85+ }
86+
87+ /**
88+ * Store a block in the OOC cache (serialize once)
89+ */
90+ public void put (String key , IndexedMatrixValue value ) throws IOException {
91+ MatrixBlock mb = (MatrixBlock ) value .getValue ();
92+ // Serialize to ByteBuffer
93+ long size = estimateSerializedSize (mb );
94+ ByteBuffer bbuff = new ByteBuffer (size );
95+
96+ synchronized (_mQueue ) {
97+ // Make space
98+ evict (size );
99+
100+ // Add to cache
101+ _mQueue .addLast (key , bbuff );
102+ _size += size ;
103+ }
104+
105+ // Serialize outside lock
106+ _fClean .serializeData (bbuff , mb );
107+ }
108+
109+ /**
110+ * Get a block from the OOC cache (deserialize on read)
111+ */
112+ public IndexedMatrixValue get (String key ) throws IOException {
113+ ByteBuffer bbuff = null ;
114+
115+ synchronized (_mQueue ) {
116+ bbuff = _mQueue .get (key );
117+
118+ // LRU: move to end
119+ if (_policy == RPolicy .LRU && bbuff != null ) {
120+ _mQueue .remove (key );
121+ _mQueue .addLast (key , bbuff );
122+ }
123+ }
124+
125+ if (bbuff != null ) {
126+ // Cache hit: deserialize from ByteBuffer
127+ bbuff .checkSerialized ();
128+ MatrixBlock mb = (MatrixBlock ) bbuff .deserializeBlock ();
129+
130+ MatrixIndexes ix = parseIndexesFromKey (key );
131+ return new IndexedMatrixValue (ix , mb );
132+ } else {
133+ // Cache miss: load from disk
134+ return loadFromDisk (key );
135+ }
136+ }
137+
138+ /**
139+ * Evict ByteBuffers to disk
140+ */
141+ private void evict (long requiredSize ) throws IOException {
142+ while (_size + requiredSize > _limit && !_mQueue .isEmpty ()) {
143+ Entry <String , ByteBuffer > entry = _mQueue .removeFirst ();
144+ String key = entry .getKey ();
145+ ByteBuffer bbuff = entry .getValue ();
146+
147+ if (bbuff != null ) {
148+ // Wait for serialization
149+ bbuff .checkSerialized ();
150+
151+ // Spill to disk
152+ String filename = _spillDir + "/" + key ;
153+ bbuff .evictBuffer (filename );
154+ bbuff .freeMemory ();
155+ _size -= bbuff .getSize ();
156+ }
157+ }
158+ }
159+
160+ /**
161+ * Load block from spill file
162+ */
163+ private IndexedMatrixValue loadFromDisk (String key ) throws IOException {
164+ String filename = _spillDir + "/" + key ;
165+
166+ // check if file exists
167+ if (!LocalFileUtils .isExisting (filename )) {
168+ throw new IOException ("File " + filename + " does not exist" );
169+ }
170+
171+ // Read from disk
172+ MatrixBlock mb = LocalFileUtils .readMatrixBlockFromLocal (filename );
173+ MatrixIndexes ix = parseIndexesFromKey (key );
174+
175+ // Put back in cache (may trigger eviction)
176+ put (key , new IndexedMatrixValue (ix , mb ));
177+
178+ return new IndexedMatrixValue (ix , mb );
179+ }
180+
181+ private long estimateSerializedSize (MatrixBlock mb ) {
182+ return mb .getExactSerializedSize ();
183+ }
184+
185+ private MatrixIndexes parseIndexesFromKey (String key ) {
186+ // Key format: "streamId_blockId"
187+ // For now, use simple sequential block IDs
188+ String [] parts = key .split ("_" );
189+ long blockId = Long .parseLong (parts [parts .length - 1 ]);
190+ // Assume row-major ordering with block size
191+ return new MatrixIndexes (blockId + 1 , 1 );
192+ }
193+ }
0 commit comments