3737import org .apache .sysds .runtime .compress .colgroup .ColGroupFactory ;
3838import org .apache .sysds .runtime .compress .colgroup .ColGroupUncompressed ;
3939import org .apache .sysds .runtime .compress .cost .ACostEstimate ;
40+ import org .apache .sysds .runtime .compress .cost .ComputationCostEstimator ;
4041import org .apache .sysds .runtime .compress .cost .CostEstimatorBuilder ;
4142import org .apache .sysds .runtime .compress .cost .CostEstimatorFactory ;
4243import org .apache .sysds .runtime .compress .cost .InstructionTypeCounter ;
@@ -159,7 +160,7 @@ public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
159160 return compress (mb , k , compSettings , (WTreeRoot ) null );
160161 }
161162
/**
 * Convenience overload: starts an asynchronous compression of the named variable by delegating to the
 * three-argument compressAsync, passing null as the third argument.
 *
 * @param ec      The execution context handed through to the three-argument overload
 * @param varName The variable name handed through to the three-argument overload
 * @return The Future returned by the three-argument overload
 */
162- public static Future <Void > compressAsync (ExecutionContext ec , String varName ) {
163+ public static Future <Void > compressAsync (ExecutionContext ec , String varName ) {
163164 return compressAsync (ec , varName , null );
164165 }
165166
@@ -168,7 +169,7 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName, In
168169 final ExecutorService pool = CommonThreadPool .get (); // We have to guarantee that a thread pool is allocated.
169170 return CompletableFuture .runAsync (() -> {
170171 // method call or code to be async
171- try {
172+ try {
172173 CacheableData <?> data = ec .getCacheableData (varName );
173174 if (data instanceof MatrixObject ) {
174175 MatrixObject mo = (MatrixObject ) data ;
@@ -178,10 +179,11 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName, In
178179 ExecutionContext .createCacheableData (mb );
179180 mo .acquireModify (mbc );
180181 mo .release ();
182+ mbc .sum (); // calculate sum to forcefully materialize counts
181183 }
182184 }
183185 }
184- finally {
186+ finally {
185187 pool .shutdown ();
186188 }
187189 }, pool );
@@ -288,11 +290,16 @@ else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOv
288290 _stats .originalSize = mb .getInMemorySize ();
289291 _stats .originalCost = costEstimator .getCost (mb );
290292
293+ final double orgSum ;
294+ if (CompressedMatrixBlock .debug )
295+ orgSum = mb .sum (k ).getDouble (0 , 0 );
296+ else
297+ orgSum = 0 ;
291298 if (mb .isEmpty ()) // empty input return empty compression
292299 return createEmpty ();
293300
294301 res = new CompressedMatrixBlock (mb ); // copy metadata and allocate soft reference
295-
302+ logInit ();
296303 classifyPhase ();
297304 if (compressionGroups == null )
298305 return abortCompression ();
@@ -308,6 +315,12 @@ else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOv
308315 if (res == null )
309316 return abortCompression ();
310317
318+ if (CompressedMatrixBlock .debug ) {
319+ final double afterComp = mb .sum (k ).getDouble (0 , 0 );
320+ final double deltaSum = Math .abs (orgSum - afterComp );
321+ LOG .debug ("compression Sum: Before:" + orgSum + " after: " + afterComp + " |delta|: " + deltaSum );
322+ }
323+
311324 return new ImmutablePair <>(res , _stats );
312325 }
313326
@@ -334,7 +347,8 @@ private void classifyPhase() {
334347 final double scale = Math .sqrt (nCols );
335348 final double threshold = _stats .estimatedCostCols / scale ;
336349
337- if (threshold < _stats .originalCost ) {
350+ if (threshold < _stats .originalCost *
351+ ((costEstimator instanceof ComputationCostEstimator ) && !(mb instanceof CompressedMatrixBlock ) ? 15 : 0.8 )) {
338352 if (nCols > 1 )
339353 coCodePhase ();
340354 else // LOG a short cocode phase (since there is one column we don't cocode)
@@ -406,7 +420,7 @@ private void transposeHeuristics() {
406420 compSettings .transposed = false ;
407421 break ;
408422 default :
409- compSettings .transposed = transposeHeuristics (compressionGroups .getNumberColGroups () , mb );
423+ compSettings .transposed = transposeHeuristics (compressionGroups .getNumberColGroups (), mb );
410424 }
411425 }
412426
@@ -442,20 +456,20 @@ private void finalizePhase() {
442456
443457 _stats .compressedSize = res .getInMemorySize ();
444458 _stats .compressedCost = costEstimator .getCost (res .getColGroups (), res .getNumRows ());
445-
446- final double ratio = _stats .getRatio ();
447- final double denseRatio = _stats .getDenseRatio ();
448-
449459 _stats .setColGroupsCounts (res .getColGroups ());
450- if (ratio < 1 && denseRatio < 100.0 ) {
460+
461+ if (_stats .compressedCost > _stats .originalCost ) {
451462 LOG .info ("--dense size: " + _stats .denseSize );
452463 LOG .info ("--original size: " + _stats .originalSize );
453464 LOG .info ("--compressed size: " + _stats .compressedSize );
454- LOG .info ("--compression ratio: " + ratio );
465+ LOG .info ("--compression ratio: " + _stats .getRatio ());
466+ LOG .info ("--original Cost: " + _stats .originalCost );
467+ LOG .info ("--Compressed Cost: " + _stats .compressedCost );
468+ LOG .info ("--Cost Ratio: " + _stats .getCostRatio ());
455469 LOG .debug ("--col groups types " + _stats .getGroupsTypesString ());
456470 LOG .debug ("--col groups sizes " + _stats .getGroupsSizesString ());
457471 logLengths ();
458- LOG .info ("Abort block compression because compression ratio is less than 1." );
472+ LOG .info ("Abort block compression because cost ratio is less than 1. " );
459473 res = null ;
460474 setNextTimePhase (time .stop ());
461475 DMLCompressionStatistics .addCompressionTime (getLastTimePhase (), phase );
@@ -472,9 +486,23 @@ private void finalizePhase() {
472486
473487 private Pair <MatrixBlock , CompressionStatistics > abortCompression () {
474488 LOG .warn ("Compression aborted at phase: " + phase );
489+ if (mb instanceof CompressedMatrixBlock && mb .getInMemorySize () > _stats .denseSize ) {
490+ MatrixBlock ucmb = ((CompressedMatrixBlock ) mb ).getUncompressed ("Decompressing for abort: " , k );
491+ return new ImmutablePair <>(ucmb , _stats );
492+ }
475493 return new ImmutablePair <>(mb , _stats );
476494 }
477495
496+ private void logInit () {
497+ if (LOG .isDebugEnabled ()) {
498+ LOG .debug ("--Seed used for comp : " + compSettings .seed );
499+ LOG .debug (String .format ("--number columns to compress: %10d" , mb .getNumColumns ()));
500+ LOG .debug (String .format ("--number rows to compress : %10d" , mb .getNumRows ()));
501+ LOG .debug (String .format ("--sparsity : %10.5f" , mb .getSparsity ()));
502+ LOG .debug (String .format ("--nonZeros : %10d" , mb .getNonZeros ()));
503+ }
504+ }
505+
478506 private void logPhase () {
479507 setNextTimePhase (time .stop ());
480508 DMLCompressionStatistics .addCompressionTime (getLastTimePhase (), phase );
@@ -486,7 +514,6 @@ private void logPhase() {
486514 else {
487515 switch (phase ) {
488516 case 0 :
489- LOG .debug ("--Seed used for comp : " + compSettings .seed );
490517 LOG .debug ("--compression phase " + phase + " Classify : " + getLastTimePhase ());
491518 LOG .debug ("--Individual Columns Estimated Compression: " + _stats .estimatedSizeCols );
492519 if (mb instanceof CompressedMatrixBlock ) {
0 commit comments