|
28 | 28 | import java.util.ArrayList; |
29 | 29 | import java.util.Iterator; |
30 | 30 | import java.util.List; |
| 31 | +import java.util.concurrent.ExecutorService; |
31 | 32 | import java.util.concurrent.Future; |
32 | 33 |
|
33 | 34 | import org.apache.commons.lang3.NotImplementedException; |
|
88 | 89 | import org.apache.sysds.runtime.matrix.operators.ScalarOperator; |
89 | 90 | import org.apache.sysds.runtime.matrix.operators.TernaryOperator; |
90 | 91 | import org.apache.sysds.runtime.matrix.operators.UnaryOperator; |
| 92 | +import org.apache.sysds.runtime.util.CommonThreadPool; |
91 | 93 | import org.apache.sysds.runtime.util.IndexRange; |
92 | 94 | import org.apache.sysds.utils.DMLCompressionStatistics; |
93 | 95 | import org.apache.sysds.utils.stats.InfrastructureAnalyzer; |
@@ -319,6 +321,35 @@ public long recomputeNonZeros() { |
319 | 321 | return nonZeros; |
320 | 322 | } |
321 | 323 |
|
| 324 | + @Override |
| 325 | + public long recomputeNonZeros(int k) { |
| 326 | + if(k <= 1 || isOverlapping() || _colGroups.size() <= 1) |
| 327 | + return recomputeNonZeros(); |
| 328 | + |
| 329 | + final ExecutorService pool = CommonThreadPool.get(k); |
| 330 | + try { |
| 331 | + List<Future<Long>> tasks = new ArrayList<>(); |
| 332 | + for(AColGroup g : _colGroups) |
| 333 | + tasks.add(pool.submit(() -> g.getNumberNonZeros(rlen))); |
| 334 | + |
| 335 | + long nnz = 0; |
| 336 | + for(Future<Long> t : tasks) |
| 337 | + nnz += t.get(); |
| 338 | + nonZeros = nnz; |
| 339 | + } |
| 340 | + catch(Exception e) { |
| 341 | + throw new DMLRuntimeException("Failed to count non zeros", e); |
| 342 | + } |
| 343 | + finally { |
| 344 | + pool.shutdown(); |
| 345 | + } |
| 346 | + |
| 347 | + if(nonZeros == 0) // If there is no nonzeros then reallocate into single empty column group. |
| 348 | + allocateColGroup(ColGroupEmpty.create(getNumColumns())); |
| 349 | + |
| 350 | + return nonZeros; |
| 351 | + } |
| 352 | + |
322 | 353 | @Override |
323 | 354 | public long recomputeNonZeros(int rl, int ru) { |
324 | 355 | throw new NotImplementedException(); |
|
0 commit comments