Skip to content

Commit 8c667b5

Browse files
j143mboehm7
authored andcommitted
[SYSTEMDS-3930] Basic OOC eviction of intermediate streams
Closes #2343.
1 parent d38e56c commit 8c667b5

File tree

2 files changed

+237
-12
lines changed

2 files changed

+237
-12
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.sysds.runtime.instructions.ooc;
21+
22+
import org.apache.sysds.runtime.DMLRuntimeException;
23+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
24+
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
25+
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
26+
import org.apache.sysds.runtime.util.LocalFileUtils;
27+
28+
import java.io.File;
29+
import java.io.IOException;
30+
import java.util.ArrayDeque;
31+
import java.util.Deque;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
35+
/**
36+
* Eviction Manager for the Out-Of-Core stream cache
37+
* This is the base implementation for LRU, FIFO
38+
*
39+
* Design choice 1: Pure JVM-memory cache
40+
* What: Store MatrixBlock objects in a synchronized in-memory cache
41+
* (Map + Deque for LRU/FIFO). Spill to disk by serializing MatrixBlock
42+
* only when evicting.
43+
* Pros: Simple to implement; no off-heap management; easy to debug;
44+
* no serialization race since you serialize only when evicting;
45+
* fast cache hits (direct object access).
46+
* Cons: Heap usage counted roughly via serialized-size estimate — actual
47+
* JVM object overhead not accounted; risk of GC pressure and OOM if
48+
* estimates are off or if many small objects cause fragmentation;
49+
* eviction may be more expensive (serialize on eviction).
50+
* <p>
51+
* Design choice 2:
52+
* <p>
53+
* This manager handles runtime memory management by caching serialized
54+
* ByteBuffers and spilling them to disk when needed.
55+
* <p>
56+
* * core function: Caches ByteBuffers (off-heap/direct) and
57+
* spills them to disk
58+
* * Eviction: Evicts a ByteBuffer by writing its contents to a file
59+
* * Granularity: Evicts one IndexedMatrixValue block at a time
60+
* * Data replay: get() will always return the data either from memory or
61+
* by falling back to the disk
62+
* * Memory: Since the datablocks are off-heap (in ByteBuffer) or disk,
63+
* there won't be OOM.
64+
*
65+
* Pros: Avoids heap OOM by keeping large data off-heap; predictable
66+
* memory usage; good for very large blocks.
67+
* Cons: More complex synchronization; need robust off-heap allocator/free;
68+
* must ensure serialization finishes before adding to queue or make evict
69+
* wait on serialization; careful with native memory leaks.
70+
*/
71+
public class OOCEvictionManager {
72+
73+
// Configuration: OOC buffer limit as percentage of heap
74+
private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap
75+
76+
// Memory limit for ByteBuffers
77+
private static long _limit;
78+
private static long _size;
79+
80+
// Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block)
81+
private static final Map<String, IndexedMatrixValue> _cache = new HashMap<>();
82+
private static final Deque<String> _evictDeque = new ArrayDeque<>();
83+
84+
// Single lock for synchronization
85+
private static final Object lock = new Object();
86+
87+
// Spill directory for evicted blocks
88+
private static String _spillDir;
89+
90+
public enum RPolicy {
91+
FIFO, LRU
92+
}
93+
private static RPolicy _policy = RPolicy.FIFO;
94+
95+
private OOCEvictionManager() {}
96+
97+
static {
98+
_limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap
99+
_size = 0;
100+
_spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
101+
LocalFileUtils.createLocalFileIfNotExist(_spillDir);
102+
}
103+
104+
/**
105+
* Store a block in the OOC cache (serialize once)
106+
*/
107+
public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) {
108+
MatrixBlock mb = (MatrixBlock) value.getValue();
109+
long size = estimateSerializedSize(mb);
110+
String key = streamId + "_" + blockId;
111+
112+
synchronized (lock) {
113+
IndexedMatrixValue old = _cache.remove(key); // remove old value
114+
if (old != null) {
115+
_evictDeque.remove(key);
116+
_size -= estimateSerializedSize((MatrixBlock) old.getValue());
117+
}
118+
119+
try {
120+
evict(size);
121+
} catch (IOException e) {
122+
throw new DMLRuntimeException(e);
123+
}
124+
125+
_cache.put(key, value); // put new value
126+
_evictDeque.addLast(key); // add to end for FIFO/LRU
127+
_size += size;
128+
}
129+
}
130+
131+
/**
132+
* Get a block from the OOC cache (deserialize on read)
133+
*/
134+
public static synchronized IndexedMatrixValue get(long streamId, int blockId) {
135+
136+
String key = streamId + "_" + blockId;
137+
IndexedMatrixValue imv = _cache.get(key);
138+
139+
synchronized (lock) {
140+
if (imv != null && _policy == RPolicy.LRU) {
141+
_evictDeque.remove(key);
142+
_evictDeque.addLast(key);
143+
}
144+
}
145+
146+
if (imv != null) {
147+
return imv;
148+
} else {
149+
try {
150+
return loadFromDisk(streamId, blockId);
151+
} catch (IOException e) {
152+
throw new DMLRuntimeException(e);
153+
}
154+
}
155+
156+
}
157+
158+
/**
159+
* Evict ByteBuffers to disk
160+
*/
161+
private static void evict(long requiredSize) throws IOException {
162+
while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) {
163+
System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit);
164+
String oldKey = _evictDeque.removeLast();
165+
IndexedMatrixValue toEvict = _cache.remove(oldKey);
166+
167+
if (toEvict == null) { continue;}
168+
MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue();
169+
170+
// Spill to disk
171+
String filename = _spillDir + "/" + oldKey;
172+
File spillDirFile = new File(_spillDir);
173+
if (!spillDirFile.exists()) {
174+
spillDirFile.mkdirs();
175+
}
176+
177+
LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict);
178+
179+
long freedSize = estimateSerializedSize(mbToEvict);
180+
_size -= freedSize;
181+
182+
}
183+
}
184+
185+
/**
186+
* Load block from spill file
187+
*/
188+
private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException {
189+
String filename = _spillDir + "/" + streamId + "_" + blockId;
190+
191+
// check if file exists
192+
if (!LocalFileUtils.isExisting(filename)) {
193+
throw new IOException("File " + filename + " does not exist");
194+
}
195+
196+
// Read from disk
197+
MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
198+
199+
MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1);
200+
201+
// Put back in cache (may trigger eviction)
202+
// get() operation should not modify cache
203+
// put(streamId, blockId, new IndexedMatrixValue(ix, mb));
204+
205+
return new IndexedMatrixValue(ix, mb);
206+
}
207+
208+
private static long estimateSerializedSize(MatrixBlock mb) {
209+
return mb.getExactSerializedSize();
210+
}
211+
}

src/main/java/org/apache/sysds/runtime/instructions/ooc/ResettableStream.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,47 @@
2020
package org.apache.sysds.runtime.instructions.ooc;
2121

2222
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
23+
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
2324
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
2425

25-
import java.util.ArrayList;
2626

2727
/**
2828
* A wrapper around LocalTaskQueue to consume the source stream and reset to
2929
* consume again for other operators.
30+
* <p>
31+
* Uses OOCEvictionManager for out-of-core caching.
3032
*
3133
*/
3234
public class ResettableStream extends LocalTaskQueue<IndexedMatrixValue> {
3335

3436
// original live stream
3537
private final LocalTaskQueue<IndexedMatrixValue> _source;
3638

37-
// in-memory cache to store stream for re-play
38-
private final ArrayList<IndexedMatrixValue> _cache;
39+
private static final IDSequence _streamSeq = new IDSequence();
40+
// stream identifier
41+
private final long _streamId;
42+
43+
// block counter
44+
private int _numBlocks = 0;
45+
3946

4047
// state flags
4148
private boolean _cacheInProgress = true; // caching in progress, in the first pass.
4249
private int _replayPosition = 0; // slider position in the stream
4350

4451
public ResettableStream(LocalTaskQueue<IndexedMatrixValue> source) {
52+
this(source, _streamSeq.getNextID());
53+
}
54+
public ResettableStream(LocalTaskQueue<IndexedMatrixValue> source, long streamId) {
4555
_source = source;
46-
_cache = new ArrayList<>();
56+
_streamId = streamId;
4757
}
4858

4959
/**
5060
* Dequeues a task. In the first pass, it reads from the source stream and stores each block in the cache.
5161
* For subsequent passes it replays the blocks from the OOC cache (memory or spill file).
5262
*
5363
* @return The next matrix value in the stream, or NO_MORE_TASKS
54-
* @throws InterruptedException
5564
*/
5665
@Override
5766
public synchronized IndexedMatrixValue dequeueTask()
@@ -60,18 +69,23 @@ public synchronized IndexedMatrixValue dequeueTask()
6069
// First pass: Read value from the source and cache it, and return.
6170
IndexedMatrixValue task = _source.dequeueTask();
6271
if (task != NO_MORE_TASKS) {
63-
_cache.add(new IndexedMatrixValue(task));
72+
73+
OOCEvictionManager.put(_streamId, _numBlocks, task);
74+
_numBlocks++;
75+
76+
return task;
6477
} else {
6578
_cacheInProgress = false; // caching is complete
6679
_source.closeInput(); // close source stream
80+
81+
// Notify all the waiting consumers waiting for cache to fill with this stream
82+
notifyAll();
83+
return (IndexedMatrixValue) NO_MORE_TASKS;
6784
}
68-
notifyAll(); // Notify all the waiting consumers waiting for cache to fill with this stream
69-
return task;
7085
} else {
71-
// Replay pass: read directly from in-memory cache
72-
if (_replayPosition < _cache.size()) {
73-
// Return a copy to ensure consumer won't modify the cache
74-
return new IndexedMatrixValue(_cache.get(_replayPosition++));
86+
// Replay pass: read from the buffer
87+
if (_replayPosition < _numBlocks) {
88+
return OOCEvictionManager.get(_streamId, _replayPosition++);
7589
} else {
7690
return (IndexedMatrixValue) NO_MORE_TASKS;
7791
}

0 commit comments

Comments
 (0)