Skip to content

Commit b570380

Browse files
committed
SYSTEMDS-3539 Implement delta encoding (Parts 1, 2, and 3)
1 parent 3779d50 commit b570380

24 files changed

+1674
-130
lines changed

src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,14 @@ public class CompressionSettings {
133133

134134
public final double[] scaleFactors;
135135

136+
public final boolean preferDeltaEncoding;
137+
136138
protected CompressionSettings(double samplingRatio, double samplePower, boolean allowSharedDictionary,
137139
String transposeInput, int seed, boolean lossy, EnumSet<CompressionType> validCompressions,
138140
boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage,
139141
int minimumSampleSize, int maxSampleSize, EstimationType estimationType, CostType costComputationType,
140-
double minimumCompressionRatio, boolean isInSparkInstruction, SORT_TYPE sdcSortType, double[] scaleFactors) {
142+
double minimumCompressionRatio, boolean isInSparkInstruction, SORT_TYPE sdcSortType, double[] scaleFactors,
143+
boolean preferDeltaEncoding) {
141144
this.samplingRatio = samplingRatio;
142145
this.samplePower = samplePower;
143146
this.allowSharedDictionary = allowSharedDictionary;
@@ -157,6 +160,7 @@ protected CompressionSettings(double samplingRatio, double samplePower, boolean
157160
this.isInSparkInstruction = isInSparkInstruction;
158161
this.sdcSortType = sdcSortType;
159162
this.scaleFactors = scaleFactors;
163+
this.preferDeltaEncoding = preferDeltaEncoding;
160164

161165
if(!printedStatus && LOG.isDebugEnabled()) {
162166
printedStatus = true;

src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class CompressionSettingsBuilder {
5353
private boolean isInSparkInstruction = false;
5454
private SORT_TYPE sdcSortType = SORT_TYPE.MATERIALIZE;
5555
private double[] scaleFactors = null;
56+
private boolean preferDeltaEncoding = false;
5657

5758
public CompressionSettingsBuilder() {
5859

@@ -101,6 +102,7 @@ public CompressionSettingsBuilder copySettings(CompressionSettings that) {
101102
this.maxColGroupCoCode = that.maxColGroupCoCode;
102103
this.coCodePercentage = that.coCodePercentage;
103104
this.minimumSampleSize = that.minimumSampleSize;
105+
this.preferDeltaEncoding = that.preferDeltaEncoding;
104106
return this;
105107
}
106108

@@ -336,6 +338,19 @@ public CompressionSettingsBuilder setSDCSortType(SORT_TYPE sdcSortType) {
336338
return this;
337339
}
338340

341+
/**
342+
* Set whether to prefer delta encoding during compression estimation.
343+
* When enabled, the compression estimator will use delta encoding statistics
344+
* instead of regular encoding statistics.
* The builder starts out with this flag set to {@code false}.
345+
*
346+
* @param preferDeltaEncoding Whether to prefer delta encoding
347+
* @return This CompressionSettingsBuilder instance, so that calls can be chained
348+
*/
349+
public CompressionSettingsBuilder setPreferDeltaEncoding(boolean preferDeltaEncoding) {
350+
this.preferDeltaEncoding = preferDeltaEncoding;
351+
return this;
352+
}
353+
339354
/**
340355
* Create the CompressionSettings object to use in the compression.
341356
*
@@ -345,6 +360,6 @@ public CompressionSettings create() {
345360
return new CompressionSettings(samplingRatio, samplePower, allowSharedDictionary, transposeInput, seed, lossy,
346361
validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage,
347362
minimumSampleSize, maxSampleSize, estimationType, costType, minimumCompressionRatio, isInSparkInstruction,
348-
sdcSortType, scaleFactors);
363+
sdcSortType, scaleFactors, preferDeltaEncoding);
349364
}
350365
}

src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ protected static enum ColGroupType {
9393
/** The ColGroup indexes contained in the ColGroup */
9494
protected final IColIndex _colIndexes;
9595

96+
/**
 * No-argument constructor that leaves the column indexes unset ({@code null}).
 *
 * NOTE(review): presumably only meant for subclasses that need a serialization constructor; confirm that no
 * instance built this way is used before its column indexes are otherwise provided.
 */
protected AColGroup() {
97+
_colIndexes = null;
98+
}
99+
96100
/**
97101
* Main constructor.
98102
*

src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public abstract class AColGroupCompressed extends AColGroup {
4646

4747
private static final long serialVersionUID = 6219835795420081223L;
4848

49+
/**
 * No-argument constructor that only delegates to the no-argument constructor of the super class.
 *
 * NOTE(review): presumably part of the serialization constructor chain of the sub classes; confirm.
 */
protected AColGroupCompressed() {
50+
super();
51+
}
52+
4953
protected AColGroupCompressed(IColIndex colIndices) {
5054
super(colIndices);
5155
}

src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public abstract class AColGroupValue extends ADictBasedColGroup {
3535
/** The count of each distinct value contained in the dictionary */
3636
private SoftReference<int[]> counts = null;
3737

38+
/**
 * No-argument constructor that only delegates to the no-argument constructor of the super class.
 *
 * NOTE(review): presumably part of the serialization constructor chain of the sub classes; confirm.
 */
protected AColGroupValue() {
39+
super();
40+
}
41+
3842
/**
3943
* An abstract class for column groups that contain IDictionary for values.
4044
*

src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed implements
4343
/** Distinct value tuples associated with individual bitmaps. */
4444
protected final IDictionary _dict;
4545

46+
/**
 * No-argument constructor that delegates to the super class and leaves the dictionary unset ({@code null}).
 *
 * NOTE(review): presumably part of the serialization constructor chain of the sub classes; confirm that the
 * dictionary is never dereferenced on an instance created this way.
 */
protected ADictBasedColGroup() {
47+
super();
48+
_dict = null;
49+
}
50+
4651
/**
4752
* An abstract class for column groups that contain IDictionary for values.
4853
*

src/main/java/org/apache/sysds/runtime/compress/colgroup/APreAgg.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public abstract class APreAgg extends AColGroupValue {
4141

4242
private static boolean loggedWarningForDirect = false;
4343

44+
/**
 * No-argument constructor that only delegates to the no-argument constructor of the super class.
 *
 * NOTE(review): presumably part of the serialization constructor chain of the sub classes; confirm.
 */
protected APreAgg() {
45+
super();
46+
}
47+
4448
/**
4549
* An abstract class for column groups that contain IDictionary for values.
4650
*

src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,12 @@ public class ColGroupDDC extends APreAgg implements IMapToDataGroup {
7777

7878
static final VectorSpecies<Double> SPECIES = DoubleVector.SPECIES_PREFERRED;
7979

80-
private ColGroupDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
80+
/**
 * No-argument constructor that delegates to the super class and leaves the row-to-dictionary mapping unset
 * ({@code null}).
 *
 * NOTE(review): presumably only meant as the super constructor for serialization constructors of sub classes;
 * confirm that the mapping is never dereferenced on an instance created this way.
 */
protected ColGroupDDC() {
81+
super();
82+
_data = null;
83+
}
84+
85+
protected ColGroupDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
8186
super(colIndexes, dict, cachedCounts);
8287
_data = data;
8388

src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java

Lines changed: 91 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,62 +19,107 @@
1919

2020
package org.apache.sysds.runtime.compress.colgroup;
2121

22+
import org.apache.commons.lang3.NotImplementedException;
23+
import org.apache.sysds.runtime.DMLRuntimeException;
24+
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
25+
import org.apache.sysds.runtime.compress.DMLCompressionException;
26+
import org.apache.sysds.runtime.compress.colgroup.dictionary.DeltaDictionary;
27+
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
28+
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
29+
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
30+
import org.apache.sysds.runtime.data.DenseBlock;
31+
import org.apache.sysds.runtime.data.SparseBlock;
32+
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
33+
2234
/**
2335
* Class to encapsulate information about a column group that is first delta encoded then encoded with dense dictionary
2436
* encoding (DeltaDDC).
2537
*/
26-
public class ColGroupDeltaDDC { // extends ColGroupDDC
38+
public class ColGroupDeltaDDC extends ColGroupDDC {
39+
private static final long serialVersionUID = -1045556313148564147L;
2740

28-
// private static final long serialVersionUID = -1045556313148564147L;
41+
/**
 * Constructor for serialization.
 *
 * Only delegates to the no-argument constructor of the super class and sets no state of its own.
 */
42+
protected ColGroupDeltaDDC() {
43+
super();
44+
}
2945

30-
// /** Constructor for serialization */
31-
// protected ColGroupDeltaDDC() {
32-
// }
46+
private ColGroupDeltaDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
47+
super(colIndexes, dict, data, cachedCounts);
48+
if(CompressedMatrixBlock.debug) {
49+
if(!(dict instanceof DeltaDictionary))
50+
throw new DMLCompressionException("DeltaDDC must use DeltaDictionary");
51+
}
52+
}
3353

34-
// private ColGroupDeltaDDC(int[] colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
35-
// super();
36-
// LOG.info("Carefully use of DeltaDDC since implementation is not finished.");
37-
// _colIndexes = colIndexes;
38-
// _dict = dict;
39-
// _data = data;
40-
// }
54+
public static AColGroup create(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
55+
if(data.getUnique() == 1)
56+
return ColGroupConst.create(colIndexes, dict);
57+
else if(dict == null)
58+
return new ColGroupEmpty(colIndexes);
59+
else
60+
return new ColGroupDeltaDDC(colIndexes, dict, data, cachedCounts);
61+
}
4162

42-
// public static AColGroup create(int[] colIndices, ADictionary dict, AMapToData data, int[] cachedCounts) {
43-
// if(dict == null)
44-
// throw new NotImplementedException("Not implemented constant delta group");
45-
// else
46-
// return new ColGroupDeltaDDC(colIndices, dict, data, cachedCounts);
47-
// }
63+
/**
 * Get the compression type of this column group.
 *
 * @return Always {@code CompressionType.DeltaDDC}
 */
@Override
64+
public CompressionType getCompType() {
65+
return CompressionType.DeltaDDC;
66+
}
4867

49-
// public CompressionType getCompType() {
50-
// return CompressionType.DeltaDDC;
51-
// }
68+
@Override
69+
protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
70+
double[] values) {
71+
final int nCol = _colIndexes.size();
72+
final double[] prevRow = new double[nCol];
73+
74+
if(rl > 0) {
75+
final double[] prevRowData = db.values(rl - 1 + offR);
76+
final int prevOff = db.pos(rl - 1 + offR) + offC;
77+
for(int j = 0; j < nCol; j++) {
78+
prevRow[j] = prevRowData[prevOff + _colIndexes.get(j)];
79+
}
80+
}
5281

53-
// @Override
54-
// protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
55-
// double[] values) {
56-
// final int nCol = _colIndexes.length;
57-
// for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
58-
// final double[] c = db.values(offT);
59-
// final int off = db.pos(offT) + offC;
60-
// final int rowIndex = _data.getIndex(i) * nCol;
61-
// final int prevOff = (off == 0) ? off : off - nCol;
62-
// for(int j = 0; j < nCol; j++) {
63-
// // Here we use the values in the previous row to compute current values along with the delta
64-
// double newValue = c[prevOff + j] + values[rowIndex + j];
65-
// c[off + _colIndexes[j]] += newValue;
66-
// }
67-
// }
68-
// }
82+
for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
83+
final double[] c = db.values(offT);
84+
final int off = db.pos(offT) + offC;
85+
final int dictIdx = _data.getIndex(i);
86+
final int rowIndex = dictIdx * nCol;
87+
88+
if(i == 0 && rl == 0) {
89+
for(int j = 0; j < nCol; j++) {
90+
final double value = values[rowIndex + j];
91+
final int colIdx = _colIndexes.get(j);
92+
c[off + colIdx] = value;
93+
prevRow[j] = value;
94+
}
95+
}
96+
else {
97+
for(int j = 0; j < nCol; j++) {
98+
final double delta = values[rowIndex + j];
99+
final double newValue = prevRow[j] + delta;
100+
final int colIdx = _colIndexes.get(j);
101+
c[off + colIdx] = newValue;
102+
prevRow[j] = newValue;
103+
}
104+
}
105+
}
106+
}
69107

70-
// @Override
71-
// protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
72-
// double[] values) {
73-
// throw new NotImplementedException();
74-
// }
108+
/**
 * Decompression into a sparse block is not supported for DeltaDDC; this method always throws.
 *
 * @param ret    The sparse block to write into (unused)
 * @param rl     The first row to decompress (unused)
 * @param ru     The last row to decompress (unused)
 * @param offR   The row offset into the output block (unused)
 * @param offC   The column offset into the output block (unused)
 * @param values The dense dictionary values (unused)
 * @throws NotImplementedException Always
 */
@Override
109+
protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
110+
double[] values) {
111+
throw new NotImplementedException("Sparse block decompression for DeltaDDC not yet implemented");
112+
}
75113

76-
// @Override
77-
// public AColGroup scalarOperation(ScalarOperator op) {
78-
// return new ColGroupDeltaDDC(_colIndexes, _dict.applyScalarOp(op), _data, getCachedCounts());
79-
// }
114+
@Override
115+
public AColGroup scalarOperation(ScalarOperator op) {
116+
if(_dict instanceof DeltaDictionary) {
117+
DeltaDictionary deltaDict = (DeltaDictionary) _dict;
118+
IDictionary newDict = deltaDict.applyScalarOp(op);
119+
return new ColGroupDeltaDDC(_colIndexes, newDict, _data, getCachedCounts());
120+
}
121+
else {
122+
throw new DMLRuntimeException("DeltaDDC must use DeltaDictionary");
123+
}
124+
}
80125
}

0 commit comments

Comments
 (0)