Skip to content

Commit 75874c5

Browse files
committed
[MINOR] Fix singlethread CLA decompressingAggregate
1 parent ce23151 commit 75874c5

File tree

1 file changed

+32
-2
lines changed

1 file changed

+32
-2
lines changed

src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,8 +418,38 @@ private static void divideByNumberOfCellsForMeanAll(CompressedMatrixBlock m1, Ma
418418

419419
private static void decompressingAggregate(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
420420
MatrixIndexes indexesIn, boolean inCP) throws Exception {
421-
List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
422-
reduceFutures(rtasks, ret, op, true);
421+
if(op.getNumThreads() > 1){
422+
423+
List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
424+
reduceFutures(rtasks, ret, op, true);
425+
}
426+
else{
427+
final int nCol = m1.getNumColumns();
428+
final int nRow = m1.getNumRows();
429+
final List<AColGroup> groups = m1.getColGroups();
430+
final boolean shouldFilter = CLALibUtils.shouldPreFilter(groups);
431+
432+
final UAOverlappingTask t;
433+
if(shouldFilter) {
434+
final double[] constV = new double[nCol];
435+
final List<AColGroup> filteredGroups = CLALibUtils.filterGroups(groups, constV);
436+
final AColGroup cRet = ColGroupConst.create(constV);
437+
filteredGroups.add(cRet);
438+
t = new UAOverlappingTask(filteredGroups, ret, 0, nRow, op, nCol);
439+
}
440+
else {
441+
t = new UAOverlappingTask(groups, ret, 0, nRow, op, nCol);
442+
}
443+
if(op.indexFn instanceof ReduceAll)
444+
ret.set(0,0,t.call().get(0,0));
445+
else if(op.indexFn instanceof ReduceRow) {
446+
final boolean isPlus = op.aggOp.increOp.fn instanceof Mean || op.aggOp.increOp.fn instanceof KahanFunction;
447+
final BinaryOperator bop = isPlus ? new BinaryOperator(Plus.getPlusFnObject()) : op. aggOp.increOp;
448+
LibMatrixBincell.bincellOpInPlace(ret, t.call(), bop);
449+
}
450+
else // reduce cols just get the tasks done.
451+
t.call();
452+
}
423453
}
424454

425455
private static void reduceFutures(List<Future<MatrixBlock>> futures, MatrixBlock ret, AggregateUnaryOperator op,

0 commit comments

Comments
 (0)