Skip to content

Commit 0bfdde3

Browse files
Hanhoun02 authored and mboehm7 committed
[SYSTEMDS-3539] New delta-encoding compressed column group
Closes #2361.
1 parent 20a7b67 commit 0bfdde3

30 files changed

+4108
-232
lines changed

src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -133,11 +133,14 @@ public class CompressionSettings {
133133

134134
public final double[] scaleFactors;
135135

136+
public final boolean preferDeltaEncoding;
137+
136138
protected CompressionSettings(double samplingRatio, double samplePower, boolean allowSharedDictionary,
137139
String transposeInput, int seed, boolean lossy, EnumSet<CompressionType> validCompressions,
138140
boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage,
139141
int minimumSampleSize, int maxSampleSize, EstimationType estimationType, CostType costComputationType,
140-
double minimumCompressionRatio, boolean isInSparkInstruction, SORT_TYPE sdcSortType, double[] scaleFactors) {
142+
double minimumCompressionRatio, boolean isInSparkInstruction, SORT_TYPE sdcSortType, double[] scaleFactors,
143+
boolean preferDeltaEncoding) {
141144
this.samplingRatio = samplingRatio;
142145
this.samplePower = samplePower;
143146
this.allowSharedDictionary = allowSharedDictionary;
@@ -157,6 +160,7 @@ protected CompressionSettings(double samplingRatio, double samplePower, boolean
157160
this.isInSparkInstruction = isInSparkInstruction;
158161
this.sdcSortType = sdcSortType;
159162
this.scaleFactors = scaleFactors;
163+
this.preferDeltaEncoding = preferDeltaEncoding;
160164

161165
if(!printedStatus && LOG.isDebugEnabled()) {
162166
printedStatus = true;

src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ public class CompressionSettingsBuilder {
5353
private boolean isInSparkInstruction = false;
5454
private SORT_TYPE sdcSortType = SORT_TYPE.MATERIALIZE;
5555
private double[] scaleFactors = null;
56+
private boolean preferDeltaEncoding = false;
5657

5758
public CompressionSettingsBuilder() {
5859

@@ -101,6 +102,7 @@ public CompressionSettingsBuilder copySettings(CompressionSettings that) {
101102
this.maxColGroupCoCode = that.maxColGroupCoCode;
102103
this.coCodePercentage = that.coCodePercentage;
103104
this.minimumSampleSize = that.minimumSampleSize;
105+
this.preferDeltaEncoding = that.preferDeltaEncoding;
104106
return this;
105107
}
106108

@@ -336,6 +338,19 @@ public CompressionSettingsBuilder setSDCSortType(SORT_TYPE sdcSortType) {
336338
return this;
337339
}
338340

341+
/**
342+
* Set whether to prefer delta encoding during compression estimation.
343+
* When enabled, the compression estimator will use delta encoding statistics
344+
* instead of regular encoding statistics.
345+
*
346+
* @param preferDeltaEncoding Whether to prefer delta encoding
347+
* @return The CompressionSettingsBuilder
348+
*/
349+
public CompressionSettingsBuilder setPreferDeltaEncoding(boolean preferDeltaEncoding) {
350+
this.preferDeltaEncoding = preferDeltaEncoding;
351+
return this;
352+
}
353+
339354
/**
340355
* Create the CompressionSettings object to use in the compression.
341356
*
@@ -345,6 +360,6 @@ public CompressionSettings create() {
345360
return new CompressionSettings(samplingRatio, samplePower, allowSharedDictionary, transposeInput, seed, lossy,
346361
validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage,
347362
minimumSampleSize, maxSampleSize, estimationType, costType, minimumCompressionRatio, isInSparkInstruction,
348-
sdcSortType, scaleFactors);
363+
sdcSortType, scaleFactors, preferDeltaEncoding);
349364
}
350365
}

src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java

Lines changed: 58 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
3434
import org.apache.sysds.runtime.compress.DMLCompressionException;
3535
import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P;
36+
import org.apache.sysds.runtime.compress.colgroup.dictionary.DeltaDictionary;
3637
import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
3738
import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
3839
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
@@ -43,6 +44,9 @@
4344
import org.apache.sysds.runtime.compress.colgroup.indexes.RangeIndex;
4445
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
4546
import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
47+
import org.apache.sysds.runtime.compress.utils.ACount;
48+
import org.apache.sysds.runtime.compress.utils.DblArray;
49+
import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
4650
import org.apache.sysds.runtime.compress.colgroup.offset.AOffsetIterator;
4751
import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
4852
import org.apache.sysds.runtime.compress.colgroup.scheme.DDCScheme;
@@ -77,7 +81,7 @@ public class ColGroupDDC extends APreAgg implements IMapToDataGroup {
7781

7882
static final VectorSpecies<Double> SPECIES = DoubleVector.SPECIES_PREFERRED;
7983

80-
private ColGroupDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
84+
protected ColGroupDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) {
8185
super(colIndexes, dict, cachedCounts);
8286
_data = data;
8387

@@ -1105,4 +1109,57 @@ protected boolean allowShallowIdentityRightMult() {
11051109
return true;
11061110
}
11071111

1112+
public AColGroup convertToDeltaDDC() {
1113+
int numCols = _colIndexes.size();
1114+
int numRows = _data.size();
1115+
1116+
DblArrayCountHashMap map = new DblArrayCountHashMap(Math.max(numRows, 64));
1117+
double[] rowDelta = new double[numCols];
1118+
double[] prevRow = new double[numCols];
1119+
DblArray dblArray = new DblArray(rowDelta);
1120+
int[] rowToDictId = new int[numRows];
1121+
1122+
double[] dictVals = _dict.getValues();
1123+
1124+
for(int i = 0; i < numRows; i++) {
1125+
int dictIdx = _data.getIndex(i);
1126+
int off = dictIdx * numCols;
1127+
for(int j = 0; j < numCols; j++) {
1128+
double val = dictVals[off + j];
1129+
if(i == 0) {
1130+
rowDelta[j] = val;
1131+
prevRow[j] = val;
1132+
} else {
1133+
rowDelta[j] = val - prevRow[j];
1134+
prevRow[j] = val;
1135+
}
1136+
}
1137+
1138+
rowToDictId[i] = map.increment(dblArray);
1139+
}
1140+
1141+
if(map.size() == 0)
1142+
return new ColGroupEmpty(_colIndexes);
1143+
1144+
ACount<DblArray>[] vals = map.extractValues();
1145+
final int nVals = vals.length;
1146+
final double[] dictValues = new double[nVals * numCols];
1147+
final int[] oldIdToNewId = new int[map.size()];
1148+
int idx = 0;
1149+
for(int i = 0; i < nVals; i++) {
1150+
final ACount<DblArray> dac = vals[i];
1151+
final double[] arrData = dac.key().getData();
1152+
System.arraycopy(arrData, 0, dictValues, idx, numCols);
1153+
oldIdToNewId[dac.id] = i;
1154+
idx += numCols;
1155+
}
1156+
1157+
DeltaDictionary deltaDict = new DeltaDictionary(dictValues, numCols);
1158+
AMapToData newData = MapToFactory.create(numRows, nVals);
1159+
for(int i = 0; i < numRows; i++) {
1160+
newData.set(i, oldIdToNewId[rowToDictId[i]]);
1161+
}
1162+
return ColGroupDeltaDDC.create(_colIndexes, deltaDict, newData, null);
1163+
}
1164+
11081165
}

0 commit comments

Comments
 (0)