Skip to content

Commit 350957e

Browse files
committed
[SYSTEMDS-3795] Fix missing sparse block support in HDF5 reader/writer
HDF5 only supports dense matrices and so far our HDF5 writers and readers simply ignored sparse blocks. This patch adds this missing support and writes/reads sparse blocks as dense rows (converted in a streaming fashion row-by-row).
1 parent 9b940f7 commit 350957e

File tree

3 files changed

+79
-41
lines changed

3 files changed

+79
-41
lines changed

src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import org.apache.sysds.conf.ConfigurationManager;
3333
import org.apache.sysds.runtime.DMLRuntimeException;
3434
import org.apache.sysds.runtime.data.DenseBlock;
35+
import org.apache.sysds.runtime.data.SparseBlock;
3536
import org.apache.sysds.runtime.io.hdf5.H5;
3637
import org.apache.sysds.runtime.io.hdf5.H5Constants;
3738
import org.apache.sysds.runtime.io.hdf5.H5ContiguousDataset;
3839
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
3940
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
41+
import org.apache.sysds.runtime.util.UtilFunctions;
4042

4143
public class ReaderHDF5 extends MatrixReader {
4244
protected final FileFormatPropertiesHDF5 _props;
@@ -51,7 +53,7 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl
5153
//allocate output matrix block
5254
MatrixBlock ret = null;
5355
if(rlen >= 0 && clen >= 0) //otherwise allocated on read
54-
ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, false);
56+
ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, true);
5557

5658
//prepare file access
5759
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -92,7 +94,8 @@ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long cle
9294

9395
private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
9496
FileSystem fs, MatrixBlock dest, long rlen, long clen, int blen, String datasetName)
95-
throws IOException, DMLRuntimeException {
97+
throws IOException, DMLRuntimeException
98+
{
9699
//prepare file paths in alphanumeric order
97100
ArrayList<Path> files = new ArrayList<>();
98101
if(fs.getFileStatus(path).isDirectory()) {
@@ -124,7 +127,8 @@ private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
124127
}
125128

126129
public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetName, MatrixBlock dest,
127-
int row, long rlen, long clen, int blen) {
130+
int rl, long ru, long clen, int blen)
131+
{
128132
bis.mark(0);
129133
long lnnz = 0;
130134
H5RootObject rootObject = H5.H5Fopen(bis);
@@ -133,28 +137,44 @@ public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetNam
133137
int[] dims = rootObject.getDimensions();
134138
int ncol = dims[1];
135139

136-
DenseBlock denseBlock = dest.getDenseBlock();
137-
double[] data = new double[ncol];
138-
for(int i = row; i < rlen; i++) {
139-
H5.H5Dread(contiguousDataset, i, data);
140-
for(int j = 0; j < ncol; j++) {
141-
if(data[j] != 0) {
142-
denseBlock.set(i, j, data[j]);
143-
lnnz++;
140+
try {
141+
double[] row = new double[ncol];
142+
if( dest.isInSparseFormat() ) {
143+
SparseBlock sb = dest.getSparseBlock();
144+
for(int i = rl; i < ru; i++) {
145+
H5.H5Dread(contiguousDataset, i, row);
146+
int lnnzi = UtilFunctions.computeNnz(row, 0, (int)clen);
147+
sb.allocate(i, lnnzi); //avoid row reallocations
148+
for(int j = 0; j < ncol; j++)
149+
sb.append(i, j, row[j]); //prunes zeros
150+
lnnz += lnnzi;
151+
}
152+
}
153+
else {
154+
DenseBlock denseBlock = dest.getDenseBlock();
155+
for(int i = rl; i < ru; i++) {
156+
H5.H5Dread(contiguousDataset, i, row);
157+
for(int j = 0; j < ncol; j++) {
158+
if(row[j] != 0) {
159+
denseBlock.set(i, j, row[j]);
160+
lnnz++;
161+
}
162+
}
144163
}
145164
}
146-
row++;
147165
}
148-
IOUtilFunctions.closeSilently(bis);
166+
finally {
167+
IOUtilFunctions.closeSilently(bis);
168+
}
149169
return lnnz;
150170
}
151171

152172
public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
153-
throws IOException, DMLRuntimeException {
173+
throws IOException, DMLRuntimeException
174+
{
154175
int nrow = 0;
155176
int ncol = 0;
156177
for(int fileNo = 0; fileNo < files.size(); fileNo++) {
157-
158178
BufferedInputStream bis = new BufferedInputStream(fs.open(files.get(fileNo)));
159179
H5RootObject rootObject = H5.H5Fopen(bis);
160180
H5.H5Dopen(rootObject, datasetName);

src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525
import org.apache.sysds.conf.ConfigurationManager;
2626
import org.apache.sysds.runtime.DMLRuntimeException;
2727
import org.apache.sysds.runtime.data.DenseBlock;
28+
import org.apache.sysds.runtime.data.SparseBlock;
2829
import org.apache.sysds.runtime.io.hdf5.H5;
2930
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
3031
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3132
import org.apache.sysds.runtime.util.HDFSTool;
3233

3334
import java.io.BufferedOutputStream;
3435
import java.io.IOException;
36+
import java.util.Arrays;
3537

3638
public class WriterHDF5 extends MatrixWriter {
3739

@@ -42,9 +44,9 @@ public WriterHDF5(FileFormatPropertiesHDF5 _props) {
4244
}
4345

4446
@Override
45-
public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz,
46-
boolean diag) throws IOException, DMLRuntimeException {
47-
47+
public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
48+
throws IOException, DMLRuntimeException
49+
{
4850
//validity check matrix dimensions
4951
if(src.getNumRows() != rlen || src.getNumColumns() != clen)
5052
throw new IOException("Matrix dimensions mismatch with metadata: " + src.getNumRows() + "x" + src
@@ -65,23 +67,24 @@ public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long cle
6567
writeHDF5MatrixToHDFS(path, job, fs, src);
6668

6769
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
68-
6970
}
7071

7172
@Override
7273
public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen)
73-
throws IOException, DMLRuntimeException {
74-
74+
throws IOException, DMLRuntimeException
75+
{
76+
throw new DMLRuntimeException("writing empty HDF5 matrices not supported yet");
7577
}
7678

77-
protected void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src) throws IOException {
78-
//sequential write HDF5 file
79+
protected void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src)
80+
throws IOException
81+
{
7982
writeHDF5MatrixToFile(path, job, fs, src, 0, src.getNumRows());
8083
}
8184

82-
protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl,
83-
int rlen) throws IOException {
84-
85+
protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int rlen)
86+
throws IOException
87+
{
8588
int clen = src.getNumColumns();
8689
BufferedOutputStream bos = new BufferedOutputStream(fs.create(path, true));
8790
String datasetName = _props.getDatasetName();
@@ -94,23 +97,36 @@ protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem f
9497
}
9598

9699
try {
97-
//TODO: HDF5 format don't support spars matrix
98-
// How to store spars matrix in HDF5 format?
99-
100100
// Write the data to the datasets.
101-
double[] data = new double[clen];
102-
DenseBlock d = src.getDenseBlock();
103-
for(int i = rl; i < rlen; i++) {
104-
for(int j = 0; j < clen;j++) {
105-
double lvalue = d!=null ? d.get(i, j) : 0;
106-
data[j] = lvalue;
101+
double[] row = new double[clen];
102+
if( src.isInSparseFormat() ) {
103+
SparseBlock sb = src.getSparseBlock();
104+
for(int i = rl; i < rlen; i++) {
105+
Arrays.fill(row, 0);
106+
if( !sb.isEmpty(i) ) {
107+
int apos = sb.pos(i);
108+
int alen = sb.size(i);
109+
double[] avals = sb.values(i);
110+
int[] aix = sb.indexes(i);
111+
for(int j = apos; j < apos+alen; j++)
112+
row[aix[j]] = avals[j];
113+
}
114+
H5.H5Dwrite(rootObject, row);
115+
}
116+
}
117+
else {
118+
DenseBlock db = src.getDenseBlock();
119+
for(int i = rl; i < rlen; i++) {
120+
for(int j = 0; j < clen;j++) {
121+
double lvalue = db!=null ? db.get(i, j) : 0;
122+
row[j] = lvalue;
123+
}
124+
H5.H5Dwrite(rootObject, row);
107125
}
108-
H5.H5Dwrite(rootObject, data);
109126
}
110127
}
111128
finally {
112129
IOUtilFunctions.closeSilently(bos);
113130
}
114-
115131
}
116132
}

src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ public static Collection<Object[]> data() {
145145
{false, "binary", false, 0.1},
146146
{false, "binary", true, 0.7},
147147
{false, "binary", true, 0.1},
148-
{true, "hdf5", false, 0.7},
149-
//{true, "hdf5", false, 0.1}, //FIXME
150-
//{true, "hdf5", true, 0.7},
148+
{true, "hdf5", false, 0.7},
149+
{true, "hdf5", false, 0.1},
150+
//{true, "hdf5", true, 0.7}, //FIXME
151151
//{true, "hdf5", true, 0.1},
152152
{true, "libsvm", false, 0.7},
153153
{true, "libsvm", false, 0.1},
@@ -190,8 +190,10 @@ public void textWriteRead() {
190190
}
191191

192192
//compare read content is equivalent to original
193-
if( data2 != null )
193+
if( data2 != null ) {
194+
Assert.assertEquals(data.getNonZeros(), data2.getNonZeros());
194195
TestUtils.compareMatrices(data, data2, eps);
196+
}
195197
}
196198

197199
@SuppressWarnings("incomplete-switch")

0 commit comments

Comments
 (0)