 
 package org.apache.sysds.runtime.io;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.DMLScriptException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
@@ -247,8 +243,7 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValu
 
 		long totalNnz = 0;
 		try {
-			// 1. Create Sequence file writer for the final destination file
-			// writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
+			// 1. Create Sequence file writer for the final destination file writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
 			writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);
 
 			// 2. Loop through OOC stream
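The hunk above only names step 2 in a comment, so here is a minimal sketch of what a write loop over the OOC stream typically looks like with this writer: dequeue `IndexedMatrixValue` entries and append each `(MatrixIndexes, MatrixBlock)` pair to the `SequenceFile.Writer` while accumulating the non-zero count. The helper name `appendStreamToWriter` is hypothetical and the body is reconstructed from the types visible in this diff, not copied from the PR.

```java
import java.io.IOException;

import org.apache.hadoop.io.SequenceFile;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;

public class OOCWriteSketch {
	/**
	 * Drains the OOC queue and appends each (indexes, block) pair to the given
	 * sequence file writer. Returns the total number of non-zeros seen, which
	 * the caller can use for metadata once the stream is exhausted.
	 */
	public static long appendStreamToWriter(SequenceFile.Writer writer,
		LocalTaskQueue<IndexedMatrixValue> stream) throws IOException, InterruptedException
	{
		long totalNnz = 0;
		IndexedMatrixValue tmp = null;
		while((tmp = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
			MatrixIndexes ix = tmp.getIndexes();
			MatrixBlock mb = (MatrixBlock) tmp.getValue();
			writer.append(ix, mb);        // key = block indexes, value = block payload
			totalNnz += mb.getNonZeros(); // running nnz for the final metadata
		}
		return totalNnz;
	}
}
```

Appending blocks keyed by `MatrixIndexes` keeps the output in the usual binary-block SequenceFile layout, which is presumably why the single-pass writer below no longer needs a separate header or merge step.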
@@ -271,100 +266,4 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValu
 
 		return totalNnz;
 	}
-
-	public long writeMatrixFromStream1(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
-		DataOutputStream dostream_data = null;
-		DataOutputStream dostream_header = null;
-
-		String tempDataFname = fname + "._data";
-		String tempHeaderFname = fname + "._header";
-		Path dataPath = new Path(tempDataFname);
-		Path headerPath = new Path(tempHeaderFname);
-		Path finalPath = new Path(fname);
-
-
-		FileSystem fs = null;
-		long totalNnz = 0;
-		try {
-			// PASS 1: Stream to a temporary raw data file and count NNZ
-			fs = IOUtilFunctions.getFileSystem(dataPath);
-			// dostream = getHDFSDataOutputStream(fname, true);
-			dostream_data = fs.create(dataPath, true);
-			// dostream_data.writeLong(rlen);
-			// dostream_data.writeLong(clen);
-
-			// long totalNnz = 0;
-			IndexedMatrixValue i_val = null;
-			while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
-				MatrixBlock mb = (MatrixBlock) i_val.getValue();
-				totalNnz += mb.getNonZeros();
-
-				double[] denseValues = mb.getDenseBlockValues();
-				System.out.println("totalNnz: " + totalNnz);
-				if(denseValues != null) {
-					for(double v : denseValues) {
-						dostream_data.writeDouble(v);
-					}
-				}
-				// mb.write(dostream);
-			}
-			IOUtilFunctions.closeSilently(dostream_data);
-
-			// PASS 2: Create a header file in RAM (very small)
-			dostream_header = fs.create(headerPath, true);
-			dostream_header.writeLong(rlen);
-			dostream_header.writeLong(clen);
-			dostream_header.writeInt(blen);
-			dostream_header.writeBoolean(false); // isSparse
-			dostream_header.writeLong(totalNnz);
-			IOUtilFunctions.closeSilently(dostream_header);
-
-			// MERGE STEP: Use HDFS concat for metadata-only merge
-			fs.concat(finalPath, new Path[]{dataPath, headerPath});
-			System.out.println("merged file available");
-
-		} catch(UnsupportedOperationException ex) {
-			LOG.warn(ex.getMessage());
-			System.out.println("concat is not available");
-
-			DataInputStream distream_header = null;
-			DataInputStream distream_data = null;
-			DataOutputStream dostream_final = null;
-
-			try {
-				dostream_final = fs.create(finalPath, true);
-
-				// 1. Copy header file content
-				distream_header = fs.open(headerPath);
-				IOUtils.copy(distream_header, dostream_final);
-				IOUtilFunctions.closeSilently(distream_header);
-
-				// 2. Copy data file content
-				distream_data = fs.open(dataPath);
-				IOUtils.copy(distream_data, dostream_final);
-				IOUtilFunctions.closeSilently(distream_data);
-			}
-			finally {
-				IOUtilFunctions.closeSilently(dostream_final);
-			}
-
-
-			// throw new IOException("The filesystem doesn't support concat, required for OOC", ex);
-		}
-		catch(IOException ex) {
-			throw new RuntimeException(ex);
-		} catch(InterruptedException e) {
-			throw new RuntimeException(e);
-		} finally {
-			// Cleanup in case of failure before concat
-			// IOUtilFunctions.closeSilently(dostream_data);
-			// IOUtilFunctions.closeSilently(dostream_header);
-			//
-			// if (fs != null) {
-			// if (fs.exists(dataPath)) fs.delete(dataPath, false);
-			// if (fs.exists(headerPath)) fs.delete(headerPath, false);
-			// }
-		}
-		return totalNnz;
-	};
 }
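The deleted `writeMatrixFromStream1` was an experimental two-pass variant: stream raw dense values to a temporary data file while counting non-zeros, write a small header file (rlen, clen, blen, sparse flag, nnz) afterwards, and merge the parts with `FileSystem.concat`, falling back to a manual byte copy when the file system throws `UnsupportedOperationException`. The sketch below isolates just that merge-or-copy pattern; the helper name `mergeOrCopy` is hypothetical and not SysDS API, and HDFS `concat` has its own preconditions (existing, non-empty target and same-directory sources) that vary by file system.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcatMergeSketch {
	/**
	 * Merges temporary part files into the final file, preferring the
	 * metadata-only FileSystem.concat and falling back to a physical byte
	 * copy when the underlying file system does not implement concat.
	 */
	public static void mergeOrCopy(FileSystem fs, Path target, Path... parts) throws IOException {
		try {
			// HDFS concat is a metadata operation: no data is rewritten.
			fs.concat(target, parts);
		}
		catch(UnsupportedOperationException ex) {
			// Fallback: copy the parts into the target, in order.
			try(OutputStream out = fs.create(target, true)) {
				for(Path part : parts) {
					try(InputStream in = fs.open(part)) {
						IOUtils.copy(in, out);
					}
				}
			}
		}
	}
}
```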