Skip to content

Commit f7b230c

Browse files
committed
Write each block of an OOC stream to its own part file inside the output directory
1 parent 575351a commit f7b230c

File tree

3 files changed

+25
-22
lines changed

3 files changed

+25
-22
lines changed

src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626

2727
import org.apache.commons.lang3.StringUtils;
2828
import org.apache.commons.lang3.tuple.Pair;
29-
import org.apache.hadoop.fs.Options;
3029
import org.apache.sysds.api.DMLScript;
31-
import org.apache.sysds.api.mlcontext.MatrixMetadata;
3230
import org.apache.sysds.common.Opcodes;
3331
import org.apache.sysds.common.Types.DataType;
3432
import org.apache.sysds.common.Types.FileFormat;
@@ -50,11 +48,22 @@
5048
import org.apache.sysds.runtime.instructions.Instruction;
5149
import org.apache.sysds.runtime.instructions.InstructionUtils;
5250
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
53-
import org.apache.sysds.runtime.io.*;
51+
import org.apache.sysds.runtime.io.FileFormatProperties;
52+
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
53+
import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5;
54+
import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM;
55+
import org.apache.sysds.runtime.io.ListReader;
56+
import org.apache.sysds.runtime.io.ListWriter;
57+
import org.apache.sysds.runtime.io.MatrixWriter;
58+
import org.apache.sysds.runtime.io.MatrixWriterFactory;
59+
import org.apache.sysds.runtime.io.WriterHDF5;
60+
import org.apache.sysds.runtime.io.WriterMatrixMarket;
61+
import org.apache.sysds.runtime.io.WriterTextCSV;
5462
import org.apache.sysds.runtime.lineage.LineageItem;
5563
import org.apache.sysds.runtime.lineage.LineageItemUtils;
5664
import org.apache.sysds.runtime.lineage.LineageTraceable;
5765
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
66+
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
5867
import org.apache.sysds.runtime.meta.DataCharacteristics;
5968
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
6069
import org.apache.sysds.runtime.meta.MetaData;
@@ -1061,31 +1070,25 @@ else if( getInput1().getDataType() == DataType.MATRIX ) {
10611070
LocalTaskQueue<IndexedMatrixValue> stream = mo.getStreamHandle();
10621071

10631072
if (stream != null) {
1064-
System.out.println("Write OOC instruction: " + getInput1().getName() + "to file name: " + fname);
10651073

10661074
try {
10671075
IndexedMatrixValue tmp = null;
10681076
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
1069-
System.out.println("mo details: "+ mo.getDataCharacteristics());
1070-
10711077

10721078
while((tmp = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
10731079
MatrixBlock mb = (MatrixBlock)tmp.getValue();
1080+
MatrixIndexes mi = tmp.getIndexes();
10741081

1082+
// Construct a unique filename for each part-file inside the output directory
1083+
String partFilePath = fname + "/part-" + mi.getRowIndex() + "-" + mi.getColumnIndex();
10751084

1076-
// writer.writeMatrixToHDFS(tmp.getValue(), fname, FileFormat.HDF5, mo.getMetaData().getDataCharacteristics());
1077-
writer.writeMatrixToHDFS(mb, fname, mb.getNumRows(), mb.getNumColumns(), mo.getBlocksize(),mb.getNonZeros());
1078-
// writer.writeMatrixToHDFS((MatrixBlock)tmp.getValue(), fname, mc.getRows(), mc.getCols(),
1079-
// mc.getBlocksize(), mc.getNonZeros());
1080-
System.out.println(tmp);
1081-
1085+
writer.writeMatrixToHDFS(mb, partFilePath, mb.getNumRows(), mb.getNumColumns(), (int) mb.getLength() , mb.getNonZeros());
10821086
}
1083-
// IOUtilFunctions.closeSilently(writer);
1084-
HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(),
1087+
HDFSTool.writeMetaDataFile(fname + "/.mtd", mo.getValueType(),
10851088
mo.getMetaData().getDataCharacteristics(), FileFormat.HDF5, _formatProperties);
10861089
}
10871090
catch(Exception ex) {
1088-
throw new DMLRuntimeException(ex);
1091+
throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex);
10891092
}
10901093
}
10911094
else {

src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void setUp() {
5656
}
5757

5858
/**
59-
* Test the sum of scalar multiplication, "sum(X*7)", with OOC backend.
59+
* Test the unary operation, "ceil(X)", with OOC backend.
6060
*/
6161
@Test
6262
public void testUnary() {
@@ -77,7 +77,7 @@ public void testUnaryOperation(boolean rewrite)
7777
programArgs = new String[] {"-explain", "-stats", "-ooc",
7878
"-args", input(INPUT_NAME), output(OUTPUT_NAME)};
7979

80-
int rows = 1000, cols = 1000;
80+
int rows = 5000, cols = 5000;
8181
MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 1.0, -1, 1, "uniform", 7);
8282
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(FileFormat.BINARY);
8383
writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, cols, 1000, rows*cols);
@@ -94,8 +94,8 @@ public void testUnaryOperation(boolean rewrite)
9494
Double dmlResult = dmlfile.get(new MatrixValue.CellIndex(i+1 , j+1 )); // Note: MM format is 1-based index
9595
double actualValue = (dmlResult == null) ? 0.0 : dmlResult;
9696
expected = Math.abs(Math.ceil(mb.get(i, j)));
97-
Assert.assertEquals(expected, actualValue, 1e-10);
9897
System.out.println("("+i+","+j+"): " + actualValue + "actual: " + expected);
98+
Assert.assertEquals(expected, actualValue, 1e-10);
9999
}
100100
}
101101

@@ -104,6 +104,8 @@ public void testUnaryOperation(boolean rewrite)
104104
heavyHittersContainsString(prefix + Opcodes.RBLK));
105105
Assert.assertTrue("OOC wasn't used for CEIL",
106106
heavyHittersContainsString(prefix + Opcodes.CEIL));
107+
Assert.assertTrue("Stream Aware WRITE wasn't used",
108+
heavyHittersContainsString(String.valueOf(Opcodes.WRITE)));
107109
}
108110
catch(Exception ex) {
109111
Assert.fail(ex.getMessage());

src/test/scripts/functions/ooc/Unary.dml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121

2222
# Read input matrix from command line args
2323
X = read($1);
24-
#print(toString(X))
2524
Y = ceil(X);
26-
#print(toString(Y))
27-
#res = as.matrix(sum(Y));
25+
2826
# Write the final matrix result
29-
write(Y, $2);
27+
write(Y, $2, format="binary");

0 commit comments

Comments
 (0)