Skip to content

Commit b0ef875

Browse files
janniklindemboehm7
authored andcommitted
[SYSTEMDS-3931] Out-of-core right indexing operations
Closes #2351.
1 parent f265845 commit b0ef875

File tree

11 files changed

+1047
-25
lines changed

11 files changed

+1047
-25
lines changed

src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction;
2929
import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction;
3030
import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction;
31+
import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction;
3132
import org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
3233
import org.apache.sysds.runtime.instructions.ooc.ParameterizedBuiltinOOCInstruction;
3334
import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
@@ -75,12 +76,14 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str
7576
return TransposeOOCInstruction.parseInstruction(str);
7677
case Tee:
7778
return TeeOOCInstruction.parseInstruction(str);
78-
case CentralMoment:
79-
return CentralMomentOOCInstruction.parseInstruction(str);
79+
case CentralMoment:
80+
return CentralMomentOOCInstruction.parseInstruction(str);
8081
case Ctable:
8182
return CtableOOCInstruction.parseInstruction(str);
8283
case ParameterizedBuiltin:
8384
return ParameterizedBuiltinOOCInstruction.parseInstruction(str);
85+
case MatrixIndexing:
86+
return IndexingOOCInstruction.parseInstruction(str);
8487

8588
default:
8689
throw new DMLRuntimeException("Invalid OOC Instruction Type: " + ooctype);

src/main/java/org/apache/sysds/runtime/instructions/ooc/CachingStream.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,36 +80,37 @@ public CachingStream(OOCStream<IndexedMatrixValue> source, long streamId) {
8080
});
8181
}
8282

83-
private boolean fetchFromStream() throws InterruptedException {
84-
synchronized (this) {
85-
if(!_cacheInProgress)
86-
throw new DMLRuntimeException("Stream is closed");
87-
}
83+
private synchronized boolean fetchFromStream() throws InterruptedException {
84+
if(!_cacheInProgress)
85+
throw new DMLRuntimeException("Stream is closed");
8886

8987
IndexedMatrixValue task = _source.dequeue();
9088

91-
synchronized (this) {
92-
if(task != LocalTaskQueue.NO_MORE_TASKS) {
93-
OOCEvictionManager.put(_streamId, _numBlocks, task);
94-
if (_index != null)
95-
_index.put(task.getIndexes(), _numBlocks);
96-
_numBlocks++;
97-
notifyAll();
98-
return false;
99-
}
100-
else {
101-
_cacheInProgress = false; // caching is complete
102-
notifyAll();
103-
return true;
104-
}
89+
if(task != LocalTaskQueue.NO_MORE_TASKS) {
90+
OOCEvictionManager.put(_streamId, _numBlocks, task);
91+
if (_index != null)
92+
_index.put(task.getIndexes(), _numBlocks);
93+
_numBlocks++;
94+
notifyAll();
95+
return false;
96+
}
97+
else {
98+
_cacheInProgress = false; // caching is complete
99+
notifyAll();
100+
return true;
105101
}
106102
}
107103

108104
public synchronized IndexedMatrixValue get(int idx) throws InterruptedException {
109105
while (true) {
110-
if (idx < _numBlocks)
111-
return OOCEvictionManager.get(_streamId, idx);
112-
else if (!_cacheInProgress)
106+
if (idx < _numBlocks) {
107+
IndexedMatrixValue out = OOCEvictionManager.get(_streamId, idx);
108+
109+
if (_index != null) // Ensure index is up to date
110+
_index.putIfAbsent(out.getIndexes(), idx);
111+
112+
return out;
113+
} else if (!_cacheInProgress)
113114
return (IndexedMatrixValue)LocalTaskQueue.NO_MORE_TASKS;
114115

115116
wait();

0 commit comments

Comments
 (0)