Skip to content

Commit a4b3de3

Browse files
committed
[MINOR] Updated reader/write tests to force parallel writers
1 parent bf0c344 commit a4b3de3

File tree

11 files changed

+26
-12
lines changed

11 files changed

+26
-12
lines changed

src/main/java/org/apache/sysds/runtime/io/FrameWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,13 @@
3535
*/
3636
public abstract class FrameWriter {
3737
protected static final Log LOG = LogFactory.getLog(FrameWriter.class.getName());
38+
39+
protected boolean _forcedParallel = false;
40+
3841
public abstract void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
3942
throws IOException, DMLRuntimeException;
4043

44+
public void setForcedParallel(boolean par) {
45+
_forcedParallel = par;
46+
}
4147
}

src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*
4545
*/
4646
public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock
47-
{
47+
{
4848
@Override
4949
protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
5050
throws IOException, DMLRuntimeException
@@ -59,11 +59,11 @@ protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock s
5959
numThreads = Math.min(numThreads, numPartFiles);
6060

6161
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
62-
if( numThreads <= 1 ) {
62+
if( !_forcedParallel && numThreads <= 1 ) {
6363
super.writeBinaryBlockFrameToHDFS(path, job, src, rlen, clen);
6464
return;
6565
}
66-
66+
6767
//create directory for concurrent tasks
6868
HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
6969
FileSystem fs = IOUtilFunctions.getFileSystem(path);

src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long
6060
numThreads = Math.min(numThreads, numPartFiles);
6161

6262
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
63-
if( numThreads <= 1 ) {
63+
if( !_forcedParallel && numThreads <= 1 ) {
6464
super.writeCSVFrameToHDFS(path, job, src, rlen, clen, csvprops);
6565
return;
6666
}

src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src,
5656
numThreads = Math.min(numThreads, numPartFiles);
5757

5858
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
59-
if( numThreads <= 1 ) {
59+
if( !_forcedParallel && numThreads <= 1 ) {
6060
super.writeTextCellFrameToHDFS(path, job, src, rlen, clen);
6161
return;
6262
}

src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,19 @@
3434
public abstract class MatrixWriter {
3535
protected static final Log LOG = LogFactory.getLog(MatrixWriter.class.getName());
3636

37+
protected boolean _forcedParallel = false;
38+
3739
public void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz ) throws IOException {
3840
writeMatrixToHDFS(src, fname, rlen, clen, blen, nnz, false);
3941
}
4042

4143
public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag )
4244
throws IOException;
4345

46+
public void setForcedParallel(boolean par) {
47+
_forcedParallel = par;
48+
}
49+
4450

4551
/**
4652
* Writes a minimal entry to represent an empty matrix on hdfs.

src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixB
5959
numThreads = Math.min(numThreads, numPartFiles);
6060

6161
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
62-
if(numThreads <= 1) {
62+
if( !_forcedParallel && numThreads <= 1 ) {
6363
super.writeHDF5MatrixToHDFS(path, job, fs, src);
6464
return;
6565
}

src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, FileSystem
5555
numThreads = Math.min(numThreads, numPartFiles);
5656

5757
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
58-
if( numThreads <= 1 ) {
58+
if( !_forcedParallel && numThreads <= 1 ) {
5959
super.writeMatrixMarketMatrixToHDFS(path, job, fs, src);
6060
return;
6161
}

src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, Matri
5757
numThreads = Math.min(numThreads, numPartFiles);
5858

5959
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
60-
if( numThreads <= 1 ) {
60+
if( !_forcedParallel && numThreads <= 1 ) {
6161
super.writeCSVMatrixToHDFS(path, job, fs, src, csvprops);
6262
return;
6363
}

src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected void writeTextCellMatrixToHDFS( Path path, JobConf job, FileSystem fs,
5454
numThreads = Math.min(numThreads, numPartFiles);
5555

5656
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
57-
if( numThreads <= 1 || src.getNonZeros()==0 ) {
57+
if( !_forcedParallel && (numThreads <= 1 || src.getNonZeros()==0) ) {
5858
super.writeTextCellMatrixToHDFS(path, job, fs, src, rlen, clen);
5959
return;
6060
}

src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected void writeLIBSVMMatrixToHDFS(Path path, JobConf job, FileSystem fs, Ma
5757
numThreads = Math.min(numThreads, numPartFiles);
5858

5959
//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
60-
if( numThreads <= 1 ) {
60+
if( !_forcedParallel && numThreads <= 1 ) {
6161
super.writeLIBSVMMatrixToHDFS(path, job, fs, src);
6262
return;
6363
}

0 commit comments

Comments
 (0)