
Commit dc9fca9

[SYSTEMDS-3900] Improved integration of OOC binary stream writer

Parent: 54a90ad

8 files changed: +95 -90 lines

src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
Lines changed: 51 additions & 34 deletions

@@ -909,44 +909,58 @@ public synchronized void exportData (String fName, String outputFormat, int repl
 		// a) get the matrix
 		boolean federatedWrite = (outputFormat != null ) && outputFormat.contains("federated");
 
-		if( isEmpty(true) && !federatedWrite)
-		{
-			//read data from HDFS if required (never read before), this applies only to pWrite w/ different output formats
-			//note: for large rdd outputs, we compile dedicated writespinstructions (no need to handle this here)
+		if(getStreamHandle()!=null) {
 			try {
-				if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
-					_data = readBlobFromHDFS( _hdfsFileName );
-				else if( getRDDHandle() != null )
-					_data = readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
-				else if(!federatedWrite)
-					_data = readBlobFromFederated( getFedMapping() );
-				setDirty(false);
-				refreshMetaData(); //e.g., after unknown csv read
+				long totalNnz = writeStreamToHDFS(fName, outputFormat, replication, formatProperties);
+				updateDataCharacteristics(new MatrixCharacteristics(
+					getNumRows(), getNumColumns(), blen, totalNnz));
+				writeMetaData(fName, outputFormat, formatProperties);
 			}
-			catch (IOException e) {
-				throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
+			catch(Exception ex) {
+				throw new DMLRuntimeException("Failed to write OOC stream to " + fName, ex);
 			}
 		}
-		//get object from cache
-		if(!federatedWrite) {
-			if( _data == null )
-				getCache();
-			acquire( false, _data==null ); //incl. read matrix if evicted
-		}
-
-		// b) write the matrix
-		try {
-			writeMetaData( fName, outputFormat, formatProperties );
-			writeBlobToHDFS( fName, outputFormat, replication, formatProperties );
-			if ( !pWrite )
-				setDirty(false);
-		}
-		catch (Exception e) {
-			throw new DMLRuntimeException("Export to " + fName + " failed.", e);
-		}
-		finally {
-			if(!federatedWrite)
-				release();
+		else {
+			if( isEmpty(true) && !federatedWrite)
+			{
+				//read data from HDFS if required (never read before), this applies only to pWrite w/ different output formats
+				//note: for large rdd outputs, we compile dedicated writespinstructions (no need to handle this here)
+				try {
+					if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
+						_data = readBlobFromHDFS( _hdfsFileName );
+					else if( getRDDHandle() != null )
+						_data = readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
+					else if(!federatedWrite)
+						_data = readBlobFromFederated( getFedMapping() );
+					setDirty(false);
+					refreshMetaData(); //e.g., after unknown csv read
+				}
+				catch (IOException e) {
+					throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
+				}
+			}
+
+			//get object from cache
+			if(!federatedWrite) {
+				if( _data == null )
+					getCache();
+				acquire( false, _data==null ); //incl. read matrix if evicted
+			}
+
+			// b) write the matrix
+			try {
+				writeMetaData( fName, outputFormat, formatProperties );
+				writeBlobToHDFS( fName, outputFormat, replication, formatProperties );
+				if ( !pWrite )
+					setDirty(false);
+			}
+			catch (Exception e) {
+				throw new DMLRuntimeException("Export to " + fName + " failed.", e);
+			}
+			finally {
+				if(!federatedWrite)
+					release();
+			}
 		}
 	}
 	else if( pWrite ) // pwrite with same output format

@@ -1132,6 +1146,9 @@ protected abstract T readBlobFromFederated(FederationMap fedMap, long[] dims)
 	protected abstract void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
 		throws IOException;
 
+	protected abstract long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
+		throws IOException;
+
 	protected abstract void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
 		throws IOException;
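
Taken together, this hunk turns exportData into a two-way dispatch on the OOC stream handle. Below is a condensed Java sketch of the new flow, not the full method; blen and the elided fallback details come from the surrounding code, which is not part of this diff:

	// Condensed sketch of the new exportData dispatch; anything outside
	// this diff (e.g., blen, pWrite handling) is elided or assumed.
	if( getStreamHandle() != null ) {
		// OOC: drain the live block stream to HDFS; nnz is counted during
		// the drain, so the data characteristics and the .mtd file stay
		// consistent without a second pass over the data.
		long totalNnz = writeStreamToHDFS(fName, outputFormat, replication, formatProperties);
		updateDataCharacteristics(new MatrixCharacteristics(
			getNumRows(), getNumColumns(), blen, totalNnz));
		writeMetaData(fName, outputFormat, formatProperties);
	}
	else {
		// legacy: materialize from HDFS/RDD/federated if empty, acquire
		// from the buffer pool, write blob + metadata, release.
	}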

src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
Lines changed: 8 additions & 0 deletions

@@ -295,6 +295,14 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro
 		writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns());
 	}
 
+	@Override
+	protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
+		throws IOException, DMLRuntimeException
+	{
+		throw new UnsupportedOperationException();
+	}
+
+
 	@Override
 	protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
 		throws IOException, DMLRuntimeException

src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
Lines changed: 13 additions & 0 deletions

@@ -47,6 +47,8 @@
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
 import org.apache.sysds.runtime.io.ReaderWriterFederated;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;

@@ -601,6 +603,17 @@ else if(LOG.isTraceEnabled()) {
 		if(DMLScript.STATISTICS)
 			CacheStatistics.incrementHDFSWrites();
 	}
+
+	@Override
+	protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
+		throws IOException, DMLRuntimeException
+	{
+		MetaDataFormat iimd = (MetaDataFormat) _metaData;
+		FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : iimd.getFileFormat());
+		MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt, rep, fprop);
+		return writer.writeMatrixFromStream(fname, getStreamHandle(),
+			getNumRows(), getNumColumns(), ConfigurationManager.getBlocksize());
+	}
 
 	@Override
 	protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String outputFormat)
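
For context, a hypothetical caller-side sketch of how a stream-backed MatrixObject reaches this override; the setter name and the exportData overload are assumptions, since only getStreamHandle() and writeStreamToHDFS appear in this diff:

	// Hypothetical usage sketch (setter name and exportData overload assumed).
	MatrixObject mo = ec.getMatrixObject("X");
	mo.setStreamHandle(queue);       //an OOC instruction attaches its LocalTaskQueue
	mo.exportData(fname, "binary");  //exportData now routes through writeStreamToHDFS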

src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
Lines changed: 7 additions & 0 deletions

@@ -189,6 +189,13 @@ else if (LOG.isTraceEnabled()) {
 		if( DMLScript.STATISTICS )
 			CacheStatistics.incrementHDFSWrites();
 	}
+
+	@Override
+	protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
+		throws IOException, DMLRuntimeException
+	{
+		throw new UnsupportedOperationException();
+	}
 
 	@Override
 	protected ValueType[] getSchema() {

src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
Lines changed: 0 additions & 30 deletions

@@ -41,13 +41,11 @@
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
-import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5;

@@ -57,8 +55,6 @@
 import org.apache.sysds.runtime.io.WriterHDF5;
 import org.apache.sysds.runtime.io.WriterMatrixMarket;
 import org.apache.sysds.runtime.io.WriterTextCSV;
-import org.apache.sysds.runtime.io.MatrixWriterFactory;
-import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.lineage.LineageTraceable;

@@ -1066,33 +1062,7 @@ private void processWriteInstruction(ExecutionContext ec) {
 		else if( getInput1().getDataType() == DataType.MATRIX ) {
 			MatrixObject mo = ec.getMatrixObject(getInput1().getName());
 			int blen = Integer.parseInt(getInput4().getName());
-			LocalTaskQueue<IndexedMatrixValue> stream = mo.getStreamHandle();
 
-			if (stream != null) {
-
-				try {
-					MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
-					long nrows = mo.getNumRows();
-					long ncols = mo.getNumColumns();
-
-					long totalNnz = writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen);
-					MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz);
-					HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt);
-
-					// 1. Update the metadata of the MatrixObject in the symbol table.
-					mo.updateDataCharacteristics(mc);
-					System.out.println("MO characterstics updated to avoid recompilation");
-
-					// 2. Clear its dirty flag and update its file path to the result we just wrote.
-					// This tells the system that the data for this variable now lives in 'fname'.
-					HDFSTool.copyFileOnHDFS(fname, mo.getFileName());
-					mo.setDirty(false);
-
-				}
-				catch(Exception ex) {
-					throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex);
-				}
-			}
 			if( fmt == FileFormat.MM )
 				writeMMFile(ec, fname);
 			else if( fmt == FileFormat.CSV )
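
With the special case removed, the matrix branch of processWriteInstruction reduces to the standard format dispatch, and exportData decides internally between the OOC stream and cached data. A rough sketch of the remaining branch; fname, fmt, the MM/CSV helpers, and the exportData overload sit outside this diff and are assumptions here:

	// Rough sketch of the remaining matrix branch (names outside the diff assumed).
	else if( getInput1().getDataType() == DataType.MATRIX ) {
		MatrixObject mo = ec.getMatrixObject(getInput1().getName());
		if( fmt == FileFormat.MM )
			writeMMFile(ec, fname);
		else if( fmt == FileFormat.CSV )
			writeCSVFile(ec, fname);
		else //binary etc.: exportData now covers both OOC streams and cached data
			mo.exportData(fname, fmt.toString());
	}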

src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
Lines changed: 5 additions & 9 deletions

@@ -235,16 +235,13 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M
 
 	@Override
 	public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
-		JobConf conf = ConfigurationManager.getCachedJobConf();
 		Path path = new Path(fname);
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
-
 		SequenceFile.Writer writer = null;
 
 		long totalNnz = 0;
 		try {
-			// 1. Create Sequence file writer for the final destination file writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
-			writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
+			// 1. Create Sequence file writer for the final destination file
+			writer = IOUtilFunctions.getSeqWriter(path, job, _replication);
 
 			// 2. Loop through OOC stream
 			IndexedMatrixValue i_val = null;

@@ -257,13 +254,12 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValu
 
 			totalNnz += mb.getNonZeros();
 		}
-
-		} catch (IOException | InterruptedException e) {
+		} catch (Exception e) {
 			throw new DMLRuntimeException(e);
-		} finally {
+		} finally {
 			IOUtilFunctions.closeSilently(writer);
 		}
 
-		return totalNnz;
+		return totalNnz;
 	}
 }
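
The drain loop above relies on LocalTaskQueue's blocking contract: dequeueTask() blocks until a block arrives and returns LocalTaskQueue.NO_MORE_TASKS once the producer calls closeInput(). A minimal, self-contained sketch of that producer/consumer handshake follows; the block sizes and rand parameters are arbitrary, and it accumulates nnz instead of appending to a SequenceFile:

	import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
	import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
	import org.apache.sysds.runtime.matrix.data.MatrixBlock;
	import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
	
	public class StreamDrainSketch {
		public static void main(String[] args) throws Exception {
			LocalTaskQueue<IndexedMatrixValue> q = new LocalTaskQueue<>();
	
			// producer: emit two 1000x1000 blocks of a 2000x1000 matrix
			new Thread(() -> {
				try {
					for( int br = 1; br <= 2; br++ ) {
						MatrixBlock mb = MatrixBlock.randOperations(1000, 1000, 1.0, 0, 1, "uniform", 7);
						q.enqueueTask(new IndexedMatrixValue(new MatrixIndexes(br, 1), mb));
					}
					q.closeInput(); // unblocks the consumer with NO_MORE_TASKS
				}
				catch(InterruptedException ex) {
					throw new RuntimeException(ex);
				}
			}).start();
	
			// consumer: same drain pattern as writeMatrixFromStream
			long totalNnz = 0;
			IndexedMatrixValue tmp = null;
			while( (tmp = q.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS )
				totalNnz += ((MatrixBlock) tmp.getValue()).getNonZeros();
			System.out.println("total nnz: " + totalNnz);
		}
	}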

src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
Lines changed: 11 additions & 13 deletions

@@ -29,7 +29,6 @@
 import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.io.MatrixWriterFactory;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.HDFSTool;

@@ -40,13 +39,10 @@
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-
-import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS;
 
 public class UnaryTest extends AutomatedTestBase {
 
-	private static final String TEST_NAME = "Unary";
+	private static final String TEST_NAME = "UnaryWrite";
 	private static final String TEST_DIR = "functions/ooc/";
 	private static final String TEST_CLASS_DIR = TEST_DIR + UnaryTest.class.getSimpleName() + "/";
 	private static final String INPUT_NAME = "X";

@@ -55,18 +51,19 @@ public class UnaryTest extends AutomatedTestBase {
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
-		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME);
-		addTestConfiguration(TEST_NAME, config);
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME));
 	}
 
-	/**
-	 * Test the sum of scalar multiplication, "sum(X*7)", with OOC backend.
-	 */
 	@Test
-	public void testUnary() {
+	public void testWriteNoRewrite() {
 		testUnaryOperation(false);
 	}
 
+	@Test
+	public void testWriteRewrite() {
+		testUnaryOperation(true);
+	}
+
 
 	public void testUnaryOperation(boolean rewrite)
 	{

@@ -116,8 +113,9 @@ public void testUnaryOperation(boolean rewrite)
 		}
 	}
 
-	private static double[][] readMatrix( String fname, FileFormat fmt, long rows, long cols, int brows, int bcols )
-		throws IOException
+	private static double[][] readMatrix( String fname, FileFormat fmt,
+		long rows, long cols, int brows, int bcols )
+		throws IOException
 	{
 		MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols);
 		double[][] C = DataConverter.convertToDoubleMatrix(mb);

src/test/scripts/functions/ooc/Unary.dml renamed to src/test/scripts/functions/ooc/UnaryWrite.dml
Lines changed: 0 additions & 4 deletions

@@ -21,9 +21,5 @@
 
 # Read input matrix and operator from command line args
 X = read($1);
-#print(toString(X))
 res = ceil(X);
-#print(toString(Y))
-#res = as.matrix(sum(Y));
-# Write the final matrix result
 write(res, $2, format="binary");
