Skip to content

Commit fbee615

Browse files
committed
parallel putinto
1 parent 1a150d3 commit fbee615

File tree

1 file changed

+45
-14
lines changed

1 file changed

+45
-14
lines changed

src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ private <T> void estimateRCDMapSize(ColumnEncoderComposite c) {
606606
c._estNumDistincts = estDistCount;
607607
}
608608

609-
private void combineUncompressed(CompressedMatrixBlock mb) {
609+
private void combineUncompressed(CompressedMatrixBlock mb) throws InterruptedException, ExecutionException {
610610

611611
List<ColGroupUncompressedArray> ucg = new ArrayList<>();
612612
List<AColGroup> ret = new ArrayList<>();
@@ -615,34 +615,65 @@ private void combineUncompressed(CompressedMatrixBlock mb) {
615615
ucg.add((ColGroupUncompressedArray) g);
616616
else
617617
ret.add(g);
618-
}
619-
if(ucg.size() > 0){
618+
}
619+
if(ucg.size() > 0) {
620620
ret.add(combine(ucg));
621-
nnz.addAndGet(ret.get(ret.size()-1).getNumberNonZeros(in.getNumRows()));
621+
nnz.addAndGet(ret.get(ret.size() - 1).getNumberNonZeros(in.getNumRows()));
622622
}
623623
mb.allocateColGroupList(ret);
624624
}
625625

626-
private AColGroup combine(List<ColGroupUncompressedArray> ucg) {
626+
private AColGroup combine(List<ColGroupUncompressedArray> ucg) throws InterruptedException, ExecutionException {
627627
IColIndex combinedCols = ColIndexFactory.combine(ucg);
628628

629-
ucg.sort((a,b) -> Integer.compare(a.id,b.id));
629+
ucg.sort((a, b) -> Integer.compare(a.id, b.id));
630630
MatrixBlock ret = new MatrixBlock(in.getNumRows(), combinedCols.size(), false);
631631
ret.allocateDenseBlock();
632-
DenseBlock db = ret.getDenseBlock();
633-
for(int i =0; i < in.getNumRows(); i++){
634-
double[] rval = db.values(i);
635-
int off = db.pos(i);
636-
for(int j = 0; j < combinedCols.size(); j++){
637-
rval[off + j] = ucg.get(j).array.getAsDouble(i);
638-
}
639-
}
632+
final DenseBlock db = ret.getDenseBlock();
633+
final int nrow = in.getNumRows();
634+
final int ncol = combinedCols.size();
635+
if(isParallel() && (long) nrow * ncol > 10000 && nrow > 512)
636+
parallelPutInto(ucg, db, nrow, ncol);
637+
else
638+
putInto(ucg, db, 0, nrow, 0, ncol);
640639

641640
ret.recomputeNonZeros(k);
642641

643642
return ColGroupUncompressed.create(ret, combinedCols);
644643
}
645644

645+
private void parallelPutInto(List<ColGroupUncompressedArray> ucg, DenseBlock db, int nrow, int ncol)
646+
throws InterruptedException, ExecutionException {
647+
List<Future<?>> tasks = new ArrayList<>();
648+
649+
final int iblk = Math.max(512, nrow / k);
650+
final int jblk = Math.min(128, ncol);
651+
for(int i = 0; i < nrow; i += iblk) {
652+
int si = i;
653+
int ei = Math.min(nrow, iblk + i);
654+
for(int j = 0; j < ncol; j += jblk) {
655+
int sj = j;
656+
int ej = Math.min(ncol, jblk + j);
657+
tasks.add(pool.submit(() -> {
658+
putInto(ucg, db, si, ei, sj, ej);
659+
}));
660+
}
661+
}
662+
663+
for(Future<?> t : tasks)
664+
t.get();
665+
}
666+
667+
private void putInto(List<ColGroupUncompressedArray> ucg, DenseBlock db, int il, int iu, int jl, int ju) {
668+
for(int i = il; i < iu; i++) {
669+
final double[] rval = db.values(i);
670+
final int off = db.pos(i);
671+
for(int j = jl; j < ju; j++) {
672+
rval[off + j] = ucg.get(j).array.getAsDouble(i);
673+
}
674+
}
675+
}
676+
646677
private void logging(MatrixBlock mb) {
647678
if(LOG.isDebugEnabled()) {
648679
LOG.debug(String.format("Uncompressed transform encode Dense size: %16d", mb.estimateSizeDenseInMemory()));

0 commit comments

Comments
 (0)