Skip to content

Commit 6f3cdb3

Browse files
janniklindemboehm7
authored andcommitted
[SYSTEMDS-3932] CSV reader for out-of-core streams
Closes #2352.
1 parent cd4c828 commit 6f3cdb3

File tree

7 files changed

+558
-58
lines changed

7 files changed

+558
-58
lines changed

src/main/java/org/apache/sysds/lops/CSVReBlock.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public CSVReBlock(Lop input, int blen, DataType dt, ValueType vt, ExecType et)
4444

4545
_blocksize = blen;
4646

47-
if(et == ExecType.SPARK) {
48-
lps.setProperties( inputs, ExecType.SPARK);
47+
if(et == ExecType.SPARK || et == ExecType.OOC) {
48+
lps.setProperties( inputs, et );
4949
}
5050
else {
5151
throw new LopsException("Incorrect execution type for CSVReblock:" + et);

src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.sysds.runtime.DMLRuntimeException;
2626
import org.apache.sysds.runtime.instructions.ooc.AggregateUnaryOOCInstruction;
2727
import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction;
28+
import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction;
2829
import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction;
2930
import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction;
3031
import org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
@@ -56,6 +57,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str
5657
switch(ooctype) {
5758
case Reblock:
5859
return ReblockOOCInstruction.parseInstruction(str);
60+
case CSVReblock:
61+
return CSVReblockOOCInstruction.parseInstruction(str);
5962
case AggregateUnary:
6063
return AggregateUnaryOOCInstruction.parseInstruction(str);
6164
case Unary:
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.sysds.runtime.instructions.ooc;
21+
22+
import org.apache.sysds.common.Opcodes;
23+
import org.apache.sysds.runtime.DMLRuntimeException;
24+
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
25+
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
26+
import org.apache.sysds.runtime.instructions.InstructionUtils;
27+
import org.apache.sysds.runtime.instructions.cp.CPOperand;
28+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
29+
import org.apache.sysds.runtime.io.FileFormatProperties;
30+
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
31+
import org.apache.sysds.runtime.io.ReaderTextCSVParallel;
32+
import org.apache.sysds.runtime.matrix.operators.Operator;
33+
import org.apache.sysds.runtime.meta.DataCharacteristics;
34+
35+
public class CSVReblockOOCInstruction extends ComputationOOCInstruction {
36+
private final int blen;
37+
38+
private CSVReblockOOCInstruction(Operator op, CPOperand in, CPOperand out, int blocklength, String opcode,
39+
String instr) {
40+
super(OOCType.Reblock, op, in, out, opcode, instr);
41+
blen = blocklength;
42+
}
43+
44+
public static CSVReblockOOCInstruction parseInstruction(String str) {
45+
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
46+
String opcode = parts[0];
47+
if(!opcode.equals(Opcodes.CSVRBLK.toString()))
48+
throw new DMLRuntimeException("Incorrect opcode for CSVReblockOOCInstruction:" + opcode);
49+
50+
CPOperand in = new CPOperand(parts[1]);
51+
CPOperand out = new CPOperand(parts[2]);
52+
int blen = Integer.parseInt(parts[3]);
53+
return new CSVReblockOOCInstruction(null, in, out, blen, opcode, str);
54+
}
55+
56+
@Override
57+
public void processInstruction(ExecutionContext ec) {
58+
MatrixObject min = ec.getMatrixObject(input1);
59+
DataCharacteristics mc = ec.getDataCharacteristics(input1.getName());
60+
DataCharacteristics mcOut = ec.getDataCharacteristics(output.getName());
61+
mcOut.set(mc.getRows(), mc.getCols(), blen, mc.getNonZeros());
62+
63+
OOCStream<IndexedMatrixValue> qOut = createWritableStream();
64+
addOutStream(qOut);
65+
66+
FileFormatProperties props = min.getFileFormatProperties();
67+
final FileFormatPropertiesCSV csvProps = props instanceof FileFormatPropertiesCSV ? (FileFormatPropertiesCSV) props
68+
: new FileFormatPropertiesCSV();
69+
70+
final ReaderTextCSVParallel reader = new ReaderTextCSVParallel(csvProps);
71+
final String fileName = min.getFileName();
72+
final long rows = mc.getRows();
73+
final long cols = mc.getCols();
74+
final long nnz = mc.getNonZeros();
75+
76+
submitOOCTask(() -> {
77+
try {
78+
reader.readMatrixAsStream(qOut, fileName, rows, cols, blen, nnz);
79+
}
80+
catch(Exception ex) {
81+
throw (ex instanceof DMLRuntimeException) ? (DMLRuntimeException) ex : new DMLRuntimeException(ex);
82+
}
83+
}, qOut);
84+
85+
MatrixObject mout = ec.getMatrixObject(output);
86+
mout.setStreamHandle(qOut);
87+
}
88+
}

src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,12 @@ private static class spillLocation {
135135

136136
private static class partitionFile {
137137
final String filePath;
138-
final long streamId;
138+
//final long streamId;
139139

140140

141141
private partitionFile(String filePath, long streamId) {
142142
this.filePath = filePath;
143-
this.streamId = streamId;
143+
//this.streamId = streamId;
144144
}
145145
}
146146

@@ -152,13 +152,13 @@ private static class BlockEntry {
152152
private BlockState state = BlockState.HOT;
153153
private IndexedMatrixValue value;
154154
private final long streamId;
155-
private final int blockId;
155+
//private final int blockId;
156156
private final long size;
157157

158158
BlockEntry(IndexedMatrixValue value, long streamId, int blockId, long size) {
159159
this.value = value;
160160
this.streamId = streamId;
161-
this.blockId = blockId;
161+
//this.blockId = blockId;
162162
this.size = size;
163163
}
164164
}
@@ -437,6 +437,7 @@ private static long estimateSerializedSize(MatrixBlock mb) {
437437
return mb.getExactSerializedSize();
438438
}
439439

440+
@SuppressWarnings("unused")
440441
private static Map.Entry<String, BlockEntry> removeFirstFromCache() {
441442
synchronized (_cacheLock) {
442443

0 commit comments

Comments
 (0)