Skip to content

Commit 54a90ad

Browse files
j143 authored and mboehm7 committed
[SYSTEMDS-3900] New OOC block stream binary writer
Closes #2301. Closes #2302.
1 parent 6df1ce5 commit 54a90ad

File tree

11 files changed

+149
-10
lines changed

11 files changed

+149
-10
lines changed

src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
import org.apache.sysds.runtime.compress.lib.CLALibSeparator;
4747
import org.apache.sysds.runtime.compress.lib.CLALibSeparator.SeparatedGroups;
4848
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
49+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
4950
import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
51+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
5052
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
5153
import org.apache.sysds.runtime.io.FileFormatProperties;
5254
import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -407,4 +409,9 @@ public Object call() throws Exception {
407409

408410
}
409411

412+
@Override
413+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
414+
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the compressed binary format.");
415+
};
416+
410417
}

src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@
4141
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
4242
import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
4343
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
44+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
4445
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
4546
import org.apache.sysds.runtime.data.TensorBlock;
4647
import org.apache.sysds.runtime.frame.data.FrameBlock;
4748
import org.apache.sysds.runtime.instructions.Instruction;
4849
import org.apache.sysds.runtime.instructions.InstructionUtils;
50+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
4951
import org.apache.sysds.runtime.io.FileFormatProperties;
5052
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
5153
import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5;
@@ -55,6 +57,8 @@
5557
import org.apache.sysds.runtime.io.WriterHDF5;
5658
import org.apache.sysds.runtime.io.WriterMatrixMarket;
5759
import org.apache.sysds.runtime.io.WriterTextCSV;
60+
import org.apache.sysds.runtime.io.MatrixWriterFactory;
61+
import org.apache.sysds.runtime.io.MatrixWriter;
5862
import org.apache.sysds.runtime.lineage.LineageItem;
5963
import org.apache.sysds.runtime.lineage.LineageItemUtils;
6064
import org.apache.sysds.runtime.lineage.LineageTraceable;
@@ -1060,6 +1064,35 @@ private void processWriteInstruction(ExecutionContext ec) {
10601064
HDFSTool.writeScalarToHDFS(ec.getScalarInput(getInput1()), fname);
10611065
}
10621066
else if( getInput1().getDataType() == DataType.MATRIX ) {
1067+
MatrixObject mo = ec.getMatrixObject(getInput1().getName());
1068+
int blen = Integer.parseInt(getInput4().getName());
1069+
LocalTaskQueue<IndexedMatrixValue> stream = mo.getStreamHandle();
1070+
1071+
if (stream != null) {
1072+
1073+
try {
1074+
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
1075+
long nrows = mo.getNumRows();
1076+
long ncols = mo.getNumColumns();
1077+
1078+
long totalNnz = writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen);
1079+
MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz);
1080+
HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt);
1081+
1082+
// 1. Update the metadata of the MatrixObject in the symbol table.
1083+
mo.updateDataCharacteristics(mc);
1084+
System.out.println("MO characterstics updated to avoid recompilation");
1085+
1086+
// 2. Clear its dirty flag and update its file path to the result we just wrote.
1087+
// This tells the system that the data for this variable now lives in 'fname'.
1088+
HDFSTool.copyFileOnHDFS(fname, mo.getFileName());
1089+
mo.setDirty(false);
1090+
1091+
}
1092+
catch(Exception ex) {
1093+
throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex);
1094+
}
1095+
}
10631096
if( fmt == FileFormat.MM )
10641097
writeMMFile(ec, fname);
10651098
else if( fmt == FileFormat.CSV )
@@ -1070,8 +1103,6 @@ else if(fmt == FileFormat.HDF5)
10701103
writeHDF5File(ec, fname);
10711104
else {
10721105
// Default behavior (text, binary)
1073-
MatrixObject mo = ec.getMatrixObject(getInput1().getName());
1074-
int blen = Integer.parseInt(getInput4().getName());
10751106
mo.exportData(fname, fmtStr, new FileFormatProperties(blen));
10761107
}
10771108
}

src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import org.apache.commons.logging.Log;
2525
import org.apache.commons.logging.LogFactory;
26+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
27+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
2628
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
2729

2830
/**
@@ -42,6 +44,20 @@ public void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long cl
4244

4345
public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag )
4446
throws IOException;
47+
48+
/**
49+
* Consumes an out-of-core stream of matrix blocks and writes them to a single file.
50+
* This method must be implemented by writers that support OOC streaming output.
51+
*
52+
* @param fname The target output filename
53+
* @param stream The OOC stream of matrix blocks to consume
54+
* @param rlen The total number of rows in the matrix
55+
* @param clen The total number of columns in the matrix
56+
* @param blen The block size
57+
* @throws IOException if an I/O error occurs
58+
*/
59+
public abstract long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream,
60+
long rlen, long clen, int blen) throws IOException;
4561

4662
public void setForcedParallel(boolean par) {
4763
_forcedParallel = par;

src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@
2323

2424
import org.apache.hadoop.fs.FileSystem;
2525
import org.apache.hadoop.fs.Path;
26+
import org.apache.hadoop.io.SequenceFile;
2627
import org.apache.hadoop.io.SequenceFile.Writer;
2728
import org.apache.hadoop.mapred.JobConf;
2829
import org.apache.sysds.conf.CompilerConfig.ConfigType;
2930
import org.apache.sysds.conf.ConfigurationManager;
3031
import org.apache.sysds.hops.OptimizerUtils;
3132
import org.apache.sysds.runtime.DMLRuntimeException;
3233
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
34+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
35+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3336
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3437
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
3538
import org.apache.sysds.runtime.util.HDFSTool;
3639

40+
3741
public class WriterBinaryBlock extends MatrixWriter {
3842
protected int _replication = -1;
3943

@@ -228,4 +232,38 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M
228232
IOUtilFunctions.closeSilently(writer);
229233
}
230234
}
235+
236+
@Override
237+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
238+
JobConf conf = ConfigurationManager.getCachedJobConf();
239+
Path path = new Path(fname);
240+
FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
241+
242+
SequenceFile.Writer writer = null;
243+
244+
long totalNnz = 0;
245+
try {
246+
// 1. Create sequence file writer for the final destination file
247+
writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
248+
249+
// 2. Loop through OOC stream
250+
IndexedMatrixValue i_val = null;
251+
while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
252+
MatrixBlock mb = (MatrixBlock) i_val.getValue();
253+
MatrixIndexes ix = i_val.getIndexes();
254+
255+
// 3. Append (key, value) record as a new value in the file
256+
writer.append(ix, mb);
257+
258+
totalNnz += mb.getNonZeros();
259+
}
260+
261+
} catch (IOException | InterruptedException e) {
262+
throw new DMLRuntimeException(e);
263+
} finally {
264+
IOUtilFunctions.closeSilently(writer);
265+
}
266+
267+
return totalNnz;
268+
}
231269
}

src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.apache.hadoop.mapred.JobConf;
2525
import org.apache.sysds.conf.ConfigurationManager;
2626
import org.apache.sysds.runtime.DMLRuntimeException;
27+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
2728
import org.apache.sysds.runtime.data.DenseBlock;
2829
import org.apache.sysds.runtime.data.SparseBlock;
30+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
2931
import org.apache.sysds.runtime.io.hdf5.H5;
3032
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
3133
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -129,4 +131,9 @@ protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem f
129131
IOUtilFunctions.closeSilently(bos);
130132
}
131133
}
134+
135+
@Override
136+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
137+
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format.");
138+
};
132139
}

src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import org.apache.hadoop.mapred.JobConf;
3636
import org.apache.sysds.conf.ConfigurationManager;
3737
import org.apache.sysds.runtime.DMLRuntimeException;
38+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3839
import org.apache.sysds.runtime.data.DenseBlock;
40+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3941
import org.apache.sysds.runtime.matrix.data.IJV;
4042
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
4143
import org.apache.sysds.runtime.util.HDFSTool;
@@ -220,4 +222,9 @@ public static void mergeTextcellToMatrixMarket( String srcFileName, String fileN
220222
throw new IOException(src.toString() + ": No such file or directory");
221223
}
222224
}
225+
226+
@Override
227+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
228+
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the MatrixMarket format.");
229+
};
223230
}

src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.apache.hadoop.mapred.JobConf;
3636
import org.apache.sysds.conf.ConfigurationManager;
3737
import org.apache.sysds.runtime.DMLRuntimeException;
38+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3839
import org.apache.sysds.runtime.data.DenseBlock;
3940
import org.apache.sysds.runtime.data.SparseBlock;
41+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
4042
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
4143
import org.apache.sysds.runtime.util.HDFSTool;
4244

@@ -341,4 +343,9 @@ public final void addHeaderToCSV(String srcFileName, String destFileName, long r
341343
throw new IOException(srcFilePath.toString() + ": No such file or directory");
342344
}
343345
}
346+
347+
@Override
348+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
349+
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the TextCSV format.");
350+
};
344351
}

src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import org.apache.hadoop.mapred.JobConf;
3131
import org.apache.sysds.conf.ConfigurationManager;
3232
import org.apache.sysds.runtime.DMLRuntimeException;
33+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3334
import org.apache.sysds.runtime.data.DenseBlock;
35+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3436
import org.apache.sysds.runtime.matrix.data.IJV;
3537
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3638
import org.apache.sysds.runtime.util.HDFSTool;
@@ -137,4 +139,9 @@ protected static void writeTextCellMatrixToFile( Path path, JobConf job, FileSys
137139
br.write(IOUtilFunctions.EMPTY_TEXT_LINE);
138140
}
139141
}
142+
143+
@Override
144+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
145+
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the TextCell format.");
146+
};
140147
}

src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import org.apache.hadoop.mapred.JobConf;
2929
import org.apache.sysds.conf.ConfigurationManager;
3030
import org.apache.sysds.runtime.DMLRuntimeException;
31+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
3132
import org.apache.sysds.runtime.data.DenseBlock;
3233
import org.apache.sysds.runtime.data.SparseBlock;
34+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
3335
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3436
import org.apache.sysds.runtime.util.HDFSTool;
3537

@@ -156,4 +158,9 @@ protected static void appendIndexValLibsvm(StringBuilder sb, int index, double v
156158
sb.append(_props.getIndexDelim());
157159
sb.append(value);
158160
}
161+
162+
@Override
163+
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
164+
throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the LIBSVM format.");
165+
};
159166
}

src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,19 @@
3131
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3232
import org.apache.sysds.runtime.matrix.data.MatrixValue;
3333
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
34+
import org.apache.sysds.runtime.util.DataConverter;
3435
import org.apache.sysds.runtime.util.HDFSTool;
3536
import org.apache.sysds.test.AutomatedTestBase;
3637
import org.apache.sysds.test.TestConfiguration;
3738
import org.apache.sysds.test.TestUtils;
3839
import org.junit.Assert;
3940
import org.junit.Test;
4041

42+
import java.io.IOException;
4143
import java.util.HashMap;
4244

45+
import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS;
46+
4347
public class UnaryTest extends AutomatedTestBase {
4448

4549
private static final String TEST_NAME = "Unary";
@@ -86,17 +90,17 @@ public void testUnaryOperation(boolean rewrite)
8690

8791
runTest(true, false, null, -1);
8892

89-
HashMap<MatrixValue.CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME);
90-
Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1));
93+
double[][] C1 = readMatrix(output(OUTPUT_NAME), FileFormat.BINARY, rows, cols, 1000, 1000);
9194
double expected = 0.0;
95+
double result = 0.0;
9296
for(int i = 0; i < rows; i++) {
9397
for(int j = 0; j < cols; j++) {
94-
expected += Math.ceil(mb.get(i, j));
98+
expected = Math.ceil(mb.get(i, j));
99+
result = C1[i][j];
100+
Assert.assertEquals(expected, result, 1e-10);
95101
}
96102
}
97103

98-
Assert.assertEquals(expected, result, 1e-10);
99-
100104
String prefix = Instruction.OOC_INST_PREFIX;
101105
Assert.assertTrue("OOC wasn't used for RBLK",
102106
heavyHittersContainsString(prefix + Opcodes.RBLK));
@@ -111,4 +115,12 @@ public void testUnaryOperation(boolean rewrite)
111115
resetExecMode(platformOld);
112116
}
113117
}
118+
119+
private static double[][] readMatrix( String fname, FileFormat fmt, long rows, long cols, int brows, int bcols )
120+
throws IOException
121+
{
122+
MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols);
123+
double[][] C = DataConverter.convertToDoubleMatrix(mb);
124+
return C;
125+
}
114126
}

0 commit comments

Comments
 (0)