Skip to content

Commit 67ff6be

Browse files
committed
[SYSTEMDS-3930] Cleanup out-of-core backend and buffer pool
* Fix order-dependent restore of indexed matrix blocks (keep indexes) * Fix potential race condition in unguarded cache modifications * Fix warnings and unnecessary imports Closes #2348.
1 parent 8c667b5 commit 67ff6be

File tree

9 files changed

+86
-98
lines changed

9 files changed

+86
-98
lines changed

src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,8 @@
3636
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
3737
import org.apache.sysds.runtime.matrix.operators.Operator;
3838
import org.apache.sysds.runtime.meta.DataCharacteristics;
39-
import org.apache.sysds.runtime.util.CommonThreadPool;
4039

4140
import java.util.HashMap;
42-
import java.util.concurrent.ExecutorService;
4341

4442
public class AggregateUnaryOOCInstruction extends ComputationOOCInstruction {
4543
private AggregateOperator _aop = null;

src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.sysds.runtime.instructions.ooc;
2121

22-
import java.util.concurrent.ExecutorService;
23-
2422
import org.apache.sysds.common.Types.DataType;
2523
import org.apache.sysds.runtime.DMLRuntimeException;
2624
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -33,7 +31,6 @@
3331
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3432
import org.apache.sysds.runtime.matrix.operators.Operator;
3533
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
36-
import org.apache.sysds.runtime.util.CommonThreadPool;
3734

3835
public class BinaryOOCInstruction extends ComputationOOCInstruction {
3936

src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.apache.sysds.runtime.instructions.ooc;
2121

2222
import java.util.HashMap;
23-
import java.util.concurrent.ExecutorService;
2423

2524
import org.apache.sysds.common.Opcodes;
2625
import org.apache.sysds.conf.ConfigurationManager;
@@ -39,7 +38,6 @@
3938
import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
4039
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
4140
import org.apache.sysds.runtime.matrix.operators.Operator;
42-
import org.apache.sysds.runtime.util.CommonThreadPool;
4341

4442
public class MatrixVectorBinaryOOCInstruction extends ComputationOOCInstruction {
4543

src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java

Lines changed: 77 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import org.apache.sysds.runtime.DMLRuntimeException;
2323
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
2424
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
25-
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
2625
import org.apache.sysds.runtime.util.LocalFileUtils;
2726

2827
import java.io.File;
2928
import java.io.IOException;
30-
import java.util.ArrayDeque;
31-
import java.util.Deque;
32-
import java.util.HashMap;
29+
import java.util.Iterator;
30+
import java.util.LinkedHashMap;
3331
import java.util.Map;
3432

3533
/**
@@ -78,12 +76,8 @@ public class OOCEvictionManager {
7876
private static long _size;
7977

8078
// Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block)
81-
private static final Map<String, IndexedMatrixValue> _cache = new HashMap<>();
82-
private static final Deque<String> _evictDeque = new ArrayDeque<>();
83-
84-
// Single lock for synchronization
85-
private static final Object lock = new Object();
86-
79+
private static LinkedHashMap<String, IndexedMatrixValue> _cache = new LinkedHashMap<>();
80+
8781
// Spill directory for evicted blocks
8882
private static String _spillDir;
8983

@@ -92,10 +86,8 @@ public enum RPolicy {
9286
}
9387
private static RPolicy _policy = RPolicy.FIFO;
9488

95-
private OOCEvictionManager() {}
96-
9789
static {
98-
_limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap
90+
_limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap
9991
_size = 0;
10092
_spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
10193
LocalFileUtils.createLocalFileIfNotExist(_spillDir);
@@ -109,103 +101,106 @@ public static synchronized void put(long streamId, int blockId, IndexedMatrixVal
109101
long size = estimateSerializedSize(mb);
110102
String key = streamId + "_" + blockId;
111103

112-
synchronized (lock) {
113-
IndexedMatrixValue old = _cache.remove(key); // remove old value
114-
if (old != null) {
115-
_evictDeque.remove(key);
116-
_size -= estimateSerializedSize((MatrixBlock) old.getValue());
117-
}
118-
119-
try {
120-
evict(size);
121-
} catch (IOException e) {
122-
throw new DMLRuntimeException(e);
123-
}
124-
125-
_cache.put(key, value); // put new value
126-
_evictDeque.addLast(key); // add to end for FIFO/LRU
127-
_size += size;
104+
IndexedMatrixValue old = _cache.remove(key); // remove old value
105+
if (old != null) {
106+
_size -= estimateSerializedSize((MatrixBlock) old.getValue());
128107
}
108+
109+
//make room if needed
110+
evict(size);
111+
112+
_cache.put(key, value); // put new value last
113+
_size += size;
129114
}
130115

131116
/**
132117
* Get a block from the OOC cache (deserialize on read)
133118
*/
134119
public static synchronized IndexedMatrixValue get(long streamId, int blockId) {
135-
136120
String key = streamId + "_" + blockId;
137121
IndexedMatrixValue imv = _cache.get(key);
138122

139-
synchronized (lock) {
140-
if (imv != null && _policy == RPolicy.LRU) {
141-
_evictDeque.remove(key);
142-
_evictDeque.addLast(key);
143-
}
123+
if (imv != null && _policy == RPolicy.LRU) {
124+
_cache.remove(key);
125+
_cache.put(key, imv); //add last semantic
144126
}
145-
146-
if (imv != null) {
147-
return imv;
148-
} else {
149-
try {
150-
return loadFromDisk(streamId, blockId);
151-
} catch (IOException e) {
152-
throw new DMLRuntimeException(e);
153-
}
154-
}
155-
127+
128+
//restore if needed
129+
return (imv.getValue() != null) ? imv :
130+
loadFromDisk(streamId, blockId);
156131
}
157132

158133
/**
159134
* Evict ByteBuffers to disk
160135
*/
161-
private static void evict(long requiredSize) throws IOException {
162-
while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) {
163-
System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit);
164-
String oldKey = _evictDeque.removeLast();
165-
IndexedMatrixValue toEvict = _cache.remove(oldKey);
166-
167-
if (toEvict == null) { continue;}
168-
MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue();
169-
170-
// Spill to disk
171-
String filename = _spillDir + "/" + oldKey;
172-
File spillDirFile = new File(_spillDir);
173-
if (!spillDirFile.exists()) {
174-
spillDirFile.mkdirs();
136+
private static void evict(long requiredSize) {
137+
try {
138+
int pos = 0;
139+
while(_size + requiredSize > _limit && pos++ < _cache.size()) {
140+
//System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size());
141+
Map.Entry<String,IndexedMatrixValue> tmp = removeFirstFromCache();
142+
if( tmp == null || tmp.getValue().getValue() == null ) {
143+
if( tmp != null )
144+
_cache.put(tmp.getKey(), tmp.getValue());
145+
continue;
146+
}
147+
148+
// Spill to disk
149+
String filename = _spillDir + "/" + tmp.getKey();
150+
File spillDirFile = new File(_spillDir);
151+
if (!spillDirFile.exists()) {
152+
spillDirFile.mkdirs();
153+
}
154+
LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().getValue());
155+
156+
// Evict from memory
157+
long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().getValue());
158+
tmp.getValue().setValue(null);
159+
_cache.put(tmp.getKey(), tmp.getValue()); // add last semantic
160+
_size -= freedSize;
175161
}
176-
177-
LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict);
178-
179-
long freedSize = estimateSerializedSize(mbToEvict);
180-
_size -= freedSize;
181-
162+
}
163+
catch(Exception ex) {
164+
throw new DMLRuntimeException(ex);
182165
}
183166
}
184167

185168
/**
186169
* Load block from spill file
187170
*/
188-
private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException {
189-
String filename = _spillDir + "/" + streamId + "_" + blockId;
171+
private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) {
172+
String key = streamId + "_" + blockId;
173+
String filename = _spillDir + "/" + key;
190174

191-
// check if file exists
192-
if (!LocalFileUtils.isExisting(filename)) {
193-
throw new IOException("File " + filename + " does not exist");
175+
try {
176+
// check if file exists
177+
if (!LocalFileUtils.isExisting(filename)) {
178+
throw new IOException("File " + filename + " does not exist");
179+
}
180+
181+
// Read from disk and put into original indexed matrix value
182+
MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
183+
IndexedMatrixValue imv = _cache.get(key);
184+
imv.setValue(mb);
185+
return imv;
186+
}
187+
catch(Exception ex) {
188+
throw new DMLRuntimeException(ex);
194189
}
195-
196-
// Read from disk
197-
MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
198-
199-
MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1);
200-
201-
// Put back in cache (may trigger eviction)
202-
// get() operation should not modify cache
203-
// put(streamId, blockId, new IndexedMatrixValue(ix, mb));
204-
205-
return new IndexedMatrixValue(ix, mb);
206190
}
207191

208192
private static long estimateSerializedSize(MatrixBlock mb) {
209193
return mb.getExactSerializedSize();
210194
}
195+
196+
private static Map.Entry<String, IndexedMatrixValue> removeFirstFromCache() {
197+
//move iterator to first entry
198+
Iterator<Map.Entry<String, IndexedMatrixValue>> iter = _cache.entrySet().iterator();
199+
Map.Entry<String, IndexedMatrixValue> entry = iter.next();
200+
201+
//remove current iterator entry
202+
iter.remove();
203+
204+
return entry;
205+
}
211206
}

src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.sysds.runtime.instructions.ooc;
2121

22-
import java.util.concurrent.ExecutorService;
23-
2422
import org.apache.hadoop.fs.FileSystem;
2523
import org.apache.hadoop.fs.Path;
2624
import org.apache.hadoop.io.SequenceFile;
@@ -40,7 +38,6 @@
4038
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
4139
import org.apache.sysds.runtime.matrix.operators.Operator;
4240
import org.apache.sysds.runtime.meta.DataCharacteristics;
43-
import org.apache.sysds.runtime.util.CommonThreadPool;
4441

4542
public class ReblockOOCInstruction extends ComputationOOCInstruction {
4643
private int blen;

src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@
3030
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3131
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
3232
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
33-
import org.apache.sysds.runtime.util.CommonThreadPool;
34-
35-
import java.util.concurrent.ExecutorService;
3633

3734
public class TransposeOOCInstruction extends ComputationOOCInstruction {
3835

src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
2929
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3030
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
31-
import org.apache.sysds.runtime.util.CommonThreadPool;
32-
33-
import java.util.concurrent.ExecutorService;
3431

3532
public class UnaryOOCInstruction extends ComputationOOCInstruction {
3633
private UnaryOperator _uop = null;

src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public MatrixIndexes getIndexes() {
6666
public MatrixValue getValue() {
6767
return _value;
6868
}
69+
70+
public void setValue(MatrixValue value) {
71+
_value = value;
72+
}
6973

7074
public void set(MatrixIndexes indexes2, MatrixValue block2) {
7175
_indexes.setIndexes(indexes2);

src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public int hashCode() {
108108
public String toString() {
109109
return "("+_row+", "+_col+")";
110110
}
111+
112+
public MatrixIndexes fromString(String ix) {
113+
String[] parts = ix.substring(1, ix.length()-1).split(",");
114+
return new MatrixIndexes(Long.parseLong(parts[0]), Long.parseLong(parts[1].trim()));
115+
}
111116

112117
////////////////////////////////////////////////////
113118
// implementation of Writable read/write

0 commit comments

Comments
 (0)