Skip to content

Commit 2880ead

Browse files
committed
revert RDD frame read
1 parent 2479526 commit 2880ead

File tree

1 file changed

+24
-21
lines changed

1 file changed

+24
-21
lines changed

src/main/java/org/apache/sysds/runtime/instructions/spark/utils/FrameRDDConverterUtils.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc
9090
JavaRDD<String> tmp = input.values()
9191
.map(new TextToStringFunction());
9292
String tmpStr = tmp.first();
93-
long rlen = tmp.count() ;
93+
boolean metaHeader = tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX)
94+
|| tmpStr.startsWith(TfUtils.TXMTD_NDPREFIX);
95+
tmpStr = (metaHeader) ? tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr;
96+
long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0);
9497
long clen = IOUtilFunctions.splitCSV(tmpStr, delim).length;
9598
mc.set(rlen, clen, mc.getBlocksize(), -1);
9699
}
@@ -579,14 +582,14 @@ public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0)
579582
_colnames = row.split(_delim);
580583
continue;
581584
}
582-
// if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) {
583-
// _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
584-
// continue;
585-
// }
586-
// else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) {
587-
// _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
588-
// continue;
589-
// }
585+
if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) {
586+
_mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
587+
continue;
588+
}
589+
else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) {
590+
_ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
591+
continue;
592+
}
590593

591594
//adjust row index for header and meta data
592595
rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == null) ? 0 : 2);
@@ -667,18 +670,18 @@ public Iterator<String> call(Tuple2<Long, FrameBlock> arg0)
667670
ret.add(sb.toString());
668671
sb.setLength(0); //reset
669672
}
670-
// if( !blk.isColumnMetadataDefault() ) {
671-
// sb.append(TfUtils.TXMTD_MVPREFIX + _props.getDelim());
672-
// for( int j=0; j<blk.getNumColumns(); j++ )
673-
// sb.append(blk.getColumnMetadata(j).getMvValue() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
674-
// ret.add(sb.toString());
675-
// sb.setLength(0); //reset
676-
// sb.append(TfUtils.TXMTD_NDPREFIX + _props.getDelim());
677-
// for( int j=0; j<blk.getNumColumns(); j++ )
678-
// sb.append(blk.getColumnMetadata(j).getNumDistinct() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
679-
// ret.add(sb.toString());
680-
// sb.setLength(0); //reset
681-
// }
673+
if( !blk.isColumnMetadataDefault() ) {
674+
sb.append(TfUtils.TXMTD_MVPREFIX + _props.getDelim());
675+
for( int j=0; j<blk.getNumColumns(); j++ )
676+
sb.append(blk.getColumnMetadata(j).getMvValue() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
677+
ret.add(sb.toString());
678+
sb.setLength(0); //reset
679+
sb.append(TfUtils.TXMTD_NDPREFIX + _props.getDelim());
680+
for( int j=0; j<blk.getNumColumns(); j++ )
681+
sb.append(blk.getColumnMetadata(j).getNumDistinct() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
682+
ret.add(sb.toString());
683+
sb.setLength(0); //reset
684+
}
682685
}
683686

684687
//handle Frame block data

0 commit comments

Comments
 (0)