Skip to content

Commit cee72fc

Browse files
committed
[SYSTEMDS-3795] Fix multi-threaded HDF5 readers/writers
This patch fixes the existing multi-threaded HDF5 readers/writers by adding (1) proper NNZ maintenance of the overall block, and (2) handling of both single- and multi-part HDF5 files/directories.
1 parent 350957e commit cee72fc

File tree

4 files changed

+32
-28
lines changed

4 files changed

+32
-28
lines changed

src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -108,7 +108,7 @@ private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
108108

109109
//determine matrix size via additional pass if required
110110
if(dest == null) {
111-
dest = computeHDF5Size(files, fs, datasetName);
111+
dest = computeHDF5Size(files, fs, datasetName, rlen*clen);
112112
clen = dest.getNumColumns();
113113
rlen = dest.getNumRows();
114114
}
@@ -169,7 +169,7 @@ public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetNam
169169
return lnnz;
170170
}
171171

172-
public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
172+
public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName, long estnnz)
173173
throws IOException, DMLRuntimeException
174174
{
175175
int nrow = 0;
@@ -186,6 +186,6 @@ public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, Strin
186186
IOUtilFunctions.closeSilently(bis);
187187
}
188188
// allocate target matrix block based on given size;
189-
return createOutputMatrixBlock(nrow, ncol, nrow, (long) nrow * ncol, true, false);
189+
return createOutputMatrixBlock(nrow, ncol, nrow, estnnz, true, true);
190190
}
191191
}

src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java

Lines changed: 26 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import org.apache.sysds.runtime.io.hdf5.H5Constants;
3939
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
4040
import org.apache.sysds.runtime.util.CommonThreadPool;
41+
import org.apache.sysds.runtime.util.HDFSTool;
4142

4243
public class ReaderHDF5Parallel extends ReaderHDF5 {
4344

@@ -46,7 +47,7 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
4647

4748
public ReaderHDF5Parallel(FileFormatPropertiesHDF5 props) {
4849
super(props);
49-
_numThreads = OptimizerUtils.getParallelTextReadParallelism();
50+
_numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
5051
}
5152

5253
@Override
@@ -69,26 +70,31 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl
6970
// allocate output matrix block
7071
ArrayList<Path> files = new ArrayList<>();
7172
files.add(path);
72-
MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName());
73-
73+
MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName(), estnnz);
74+
int numParts = Math.min(files.size(), _numThreads);
75+
7476
//create and execute tasks
7577
ExecutorService pool = CommonThreadPool.get(_numThreads);
7678
try {
7779
int bufferSize = (src.getNumColumns() * src.getNumRows()) * 8 + H5Constants.STATIC_HEADER_SIZE;
7880
ArrayList<ReadHDF5Task> tasks = new ArrayList<>();
7981
rlen = src.getNumRows();
80-
int blklen = (int) Math.ceil((double) rlen / _numThreads);
82+
int blklen = (int) Math.ceil((double) rlen / numParts);
8183
for(int i = 0; i < _numThreads & i * blklen < rlen; i++) {
8284
int rl = i * blklen;
8385
int ru = (int) Math.min((i + 1) * blklen, rlen);
84-
BufferedInputStream bis = new BufferedInputStream(fs.open(path), bufferSize);
86+
Path newPath = HDFSTool.isDirectory(fs, path) ?
87+
new Path(path, IOUtilFunctions.getPartFileName(i)) : path;
88+
BufferedInputStream bis = new BufferedInputStream(fs.open(newPath), bufferSize);
8589

8690
//BufferedInputStream bis, String datasetName, MatrixBlock src, MutableInt rl, int ru
87-
tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru));
91+
tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru, clen, blklen));
8892
}
8993

90-
for(Future<Object> task : pool.invokeAll(tasks))
91-
task.get();
94+
long nnz = 0;
95+
for(Future<Long> task : pool.invokeAll(tasks))
96+
nnz += task.get();
97+
src.setNonZeros(nnz);
9298

9399
return src;
94100
}
@@ -102,31 +108,36 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl
102108

103109
@Override
104110
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
105-
throws IOException, DMLRuntimeException {
106-
111+
throws IOException, DMLRuntimeException
112+
{
107113
return new ReaderHDF5(_props).readMatrixFromInputStream(is, rlen, clen, blen, estnnz);
108114
}
109115

110-
private static class ReadHDF5Task implements Callable<Object> {
116+
private static class ReadHDF5Task implements Callable<Long> {
111117

112118
private final BufferedInputStream _bis;
113119
private final String _datasetName;
114120
private final MatrixBlock _src;
115121
private final int _rl;
116122
private final int _ru;
123+
private final long _clen;
124+
private final int _blen;
117125

118-
public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src, int rl, int ru) {
126+
public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src,
127+
int rl, int ru, long clen, int blen)
128+
{
119129
_bis = bis;
120130
_datasetName = datasetName;
121131
_src = src;
122132
_rl = rl;
123133
_ru = ru;
134+
_clen = clen;
135+
_blen = blen;
124136
}
125137

126138
@Override
127-
public Object call() throws IOException {
128-
readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, 0, 0);
129-
return null;
139+
public Long call() throws IOException {
140+
return readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, _clen, _blen);
130141
}
131142
}
132143
}

src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.Future;
2727

2828
import org.apache.hadoop.fs.FileSystem;
29-
import org.apache.hadoop.fs.LocalFileSystem;
3029
import org.apache.hadoop.fs.Path;
3130
import org.apache.hadoop.mapred.JobConf;
3231
import org.apache.sysds.common.Types;
@@ -80,13 +79,6 @@ public void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixB
8079

8180
for(Future<Object> task : pool.invokeAll(tasks))
8281
task.get();
83-
84-
// delete crc files if written to local file system
85-
if(fs instanceof LocalFileSystem) {
86-
for(int i = 0; i < numThreads & i * blklen < rlen; i++)
87-
IOUtilFunctions
88-
.deleteCrcFilesFromLocalFileSystem(fs, new Path(path, IOUtilFunctions.getPartFileName(i)));
89-
}
9082
}
9183
catch(Exception e) {
9284
throw new IOException("Failed parallel write of HDF5 output.", e);
@@ -115,6 +107,7 @@ public WriteHDF5Task(Path path, JobConf job, FileSystem fs, MatrixBlock src, int
115107
@Override
116108
public Object call() throws IOException {
117109
writeHDF5MatrixToFile(_path, _job, _fs, _src, _rl, _ru);
110+
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(_job, _path);
118111
return null;
119112
}
120113
}

src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -147,8 +147,8 @@ public static Collection<Object[]> data() {
147147
{false, "binary", true, 0.1},
148148
{true, "hdf5", false, 0.7},
149149
{true, "hdf5", false, 0.1},
150-
//{true, "hdf5", true, 0.7}, //FIXME
151-
//{true, "hdf5", true, 0.1},
150+
{true, "hdf5", true, 0.7},
151+
{true, "hdf5", true, 0.1},
152152
{true, "libsvm", false, 0.7},
153153
{true, "libsvm", false, 0.1},
154154
{true, "libsvm", true, 0.7},

0 commit comments

Comments
 (0)