
Commit 8753168

committed: address review comments, use sequenceFile
1 parent 1a15b57 commit 8753168

File tree

1 file changed (+38 additions, 0 deletions)


src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java

Lines changed: 38 additions & 0 deletions
@@ -26,12 +26,14 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.DMLScriptException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
@@ -237,6 +239,39 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M
 
 	@Override
 	public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
+		JobConf conf = ConfigurationManager.getCachedJobConf();
+		Path path = new Path(fname);
+		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+
+		SequenceFile.Writer writer = null;
+
+		long totalNnz = 0;
+		try {
+			// 1. Create the sequence file writer for the final destination file
+			writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
+
+			// 2. Loop through the OOC stream
+			IndexedMatrixValue i_val = null;
+			while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
+				MatrixBlock mb = (MatrixBlock) i_val.getValue();
+				MatrixIndexes ix = i_val.getIndexes();
+
+				// 3. Append the (key, value) record to the file
+				writer.append(ix, mb);
+
+				totalNnz += mb.getNonZeros();
+			}
+
+		} catch (IOException | InterruptedException e) {
+			throw new DMLRuntimeException(e);
+		} finally {
+			IOUtilFunctions.closeSilently(writer);
+		}
+
+		return totalNnz;
+	}
+
+	public long writeMatrixFromStream1(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
 		DataOutputStream dostream_data = null;
 		DataOutputStream dostream_header = null;
 
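For context, every record written by the new method above is a (MatrixIndexes, MatrixBlock) key/value pair, so the resulting file can be read back with Hadoop's standard SequenceFile.Reader. A minimal read-back sketch, not part of this commit (the class name and path argument are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;

public class ReadBinaryBlockSketch {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path path = new Path(args[0]); // file written by writeMatrixFromStream
		MatrixIndexes ix = new MatrixIndexes();
		MatrixBlock mb = new MatrixBlock();
		try(SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
			// iterate the (key, value) records in file order
			while(reader.next(ix, mb))
				System.out.println(ix + " -> nnz=" + mb.getNonZeros());
		}
	}
}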
@@ -246,6 +281,7 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValu
 		Path headerPath = new Path(tempHeaderFname);
 		Path finalPath = new Path(fname);
 
+
 		FileSystem fs = null;
 		long totalNnz = 0;
 		try {
@@ -284,9 +320,11 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValu
 
 			// MERGE STEP: Use HDFS concat for metadata-only merge
 			fs.concat(finalPath, new Path[]{dataPath, headerPath});
+			System.out.println("merged file available");
 
 		} catch (UnsupportedOperationException ex) {
 			LOG.warn(ex.getMessage());
+			System.out.println("concat is not available");
 
 			DataInputStream distream_header = null;
 			DataInputStream distream_data = null;
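One note on the writer construction above: the SequenceFile.Writer(fs, conf, path, keyClass, valClass) constructor is deprecated in current Hadoop releases in favor of the SequenceFile.createWriter factory. An equivalent sketch with the factory API (same key/value classes; a suggestion, not what this commit does):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;

public class CreateWriterSketch {
	// non-deprecated equivalent of: new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class)
	static Writer open(Configuration conf, Path path) throws IOException {
		return SequenceFile.createWriter(conf,
			Writer.file(path),
			Writer.keyClass(MatrixIndexes.class),
			Writer.valueClass(MatrixBlock.class));
	}
}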
