Skip to content

Commit 751b55f

Browse files
committed
[SYSTEMDS-3644] Compressed transform encode
This commit primarily contains optimizations for compressed and uncompressed transform encode. We observe a 3x speedup across various uncompressed and compressed transformations. The primary source of the speedup is a custom HashMap implementation that leverages primitive int values for the recode map. The commit also contains minor optimizations to reading FrameBlock CSV files and a cleanup of the logic in CommonThreadPool so that no new threads are allocated if only 1 thread is requested. Closes #2171
1 parent 99fd1f5 commit 751b55f

File tree

83 files changed

+3384
-925
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+3384
-925
lines changed

src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ private void classifyPhase() {
348348
LOG.info("Threshold was set to : " + threshold + " but it was above original " + _stats.originalCost);
349349
LOG.info("Original size : " + _stats.originalSize);
350350
LOG.info("single col size : " + _stats.estimatedSizeCols);
351+
LOG.debug(String.format("--compressed size: %16d", _stats.originalSize));
351352
if(!(costEstimator instanceof MemoryCostEstimator)) {
352353
LOG.info("original cost : " + _stats.originalCost);
353354
LOG.info("single col cost : " + _stats.estimatedCostCols);

src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ public Object call() throws Exception {
224224
final int maxCombined = c1i.getNumVals() * c2i.getNumVals();
225225

226226
if(maxCombined < 0 // int overflow
227-
|| maxCombined > c1i.getNumRows()) // higher combined than number of rows.
227+
|| maxCombined > c1i.getNumRows() // higher than number of rows
228+
|| maxCombined > 100000) // higher than 100k ... then lets not precalculate it.
228229
return null;
229230

230231
final IColIndex c = _c1._indexes.combine(_c2._indexes);

src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ public static long getExactSizeOnDisk(List<AColGroup> colGroups) {
9494
}
9595
ret += grp.getExactSizeOnDisk();
9696
}
97-
LOG.error(" duplicate dicts on exact Size on Disk : " + (colGroups.size() - dicts.size()) );
97+
if(LOG.isWarnEnabled())
98+
LOG.warn(" duplicate dicts on exact Size on Disk : " + (colGroups.size() - dicts.size()) );
99+
98100
return ret;
99101
}
100102

src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
3535
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
3636
import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
37+
import org.apache.sysds.runtime.compress.colgroup.dictionary.PlaceHolderDict;
3738
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
3839
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
3940
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
@@ -93,7 +94,7 @@ public static AColGroup create(IColIndex colIndices, int numRows, IDictionary di
9394
int[] cachedCounts) {
9495
if(dict == null)
9596
return new ColGroupEmpty(colIndices);
96-
else if(data.getUnique() == 1) {
97+
else if(data.getUnique() == 1 && !(dict instanceof PlaceHolderDict)) {
9798
MatrixBlock mb = dict.getMBDict(colIndices.size()).getMatrixBlock().slice(0, 0);
9899
return ColGroupSDCSingleZeros.create(colIndices, numRows, MatrixBlockDictionary.create(mb), offsets, null);
99100
}

src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,12 @@ public class ColGroupUncompressed extends AColGroup {
8080
*/
8181
private final MatrixBlock _data;
8282

83-
private ColGroupUncompressed(MatrixBlock mb, IColIndex colIndexes) {
83+
/**
84+
* Do not use this constructor of column group uncompressed; instead, use the create constructor.
85+
* @param mb The contained data.
86+
* @param colIndexes Column indexes for this column group
87+
*/
88+
protected ColGroupUncompressed(MatrixBlock mb, IColIndex colIndexes) {
8489
super(colIndexes);
8590
_data = mb;
8691
}
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.sysds.runtime.compress.colgroup;
21+
22+
import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P;
23+
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
24+
import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
25+
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
26+
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
27+
import org.apache.sysds.runtime.data.DenseBlock;
28+
import org.apache.sysds.runtime.data.SparseBlock;
29+
import org.apache.sysds.runtime.data.SparseBlockMCSR;
30+
import org.apache.sysds.runtime.frame.data.columns.Array;
31+
import org.apache.sysds.runtime.instructions.cp.CM_COV_Object;
32+
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
33+
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
34+
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
35+
import org.apache.sysds.runtime.matrix.operators.CMOperator;
36+
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
37+
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
38+
39+
/**
40+
* Special sideways compressed column group not supposed to be used outside of the compressed transform encode.
41+
*/
42+
public class ColGroupUncompressedArray extends AColGroup {
43+
private static final long serialVersionUID = -825423333043292199L;
44+
45+
public final Array<?> array;
46+
public final int id; // columnID
47+
48+
public ColGroupUncompressedArray(Array<?> data, int id, IColIndex colIndexes) {
49+
super(colIndexes);
50+
this.array = data;
51+
this.id = id;
52+
}
53+
54+
@Override
55+
public int getNumValues() {
56+
return array.size();
57+
}
58+
59+
@Override
60+
public long estimateInMemorySize() {
61+
// Not an accurate estimate, but guaranteed to be larger.
62+
return MatrixBlock.estimateSizeInMemory(array.size(), 1, array.size()) + 80;
63+
}
64+
65+
@Override
66+
public String toString() {
67+
return "UncompressedArrayGroup: " + id + " " + _colIndexes;
68+
}
69+
70+
@Override
71+
public AColGroup copyAndSet(IColIndex colIndexes) {
72+
return new ColGroupUncompressedArray(array, id, colIndexes);
73+
}
74+
75+
@Override
76+
public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru) {
77+
throw new UnsupportedOperationException("Unimplemented method 'decompressToDenseBlockTransposed'");
78+
}
79+
80+
@Override
81+
public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut) {
82+
throw new UnsupportedOperationException("Unimplemented method 'decompressToSparseBlockTransposed'");
83+
}
84+
85+
@Override
86+
public double getIdx(int r, int colIdx) {
87+
throw new UnsupportedOperationException("Unimplemented method 'getIdx'");
88+
}
89+
90+
@Override
91+
public CompressionType getCompType() {
92+
throw new UnsupportedOperationException("Unimplemented method 'getCompType'");
93+
}
94+
95+
@Override
96+
protected ColGroupType getColGroupType() {
97+
throw new UnsupportedOperationException("Unimplemented method 'getColGroupType'");
98+
}
99+
100+
@Override
101+
public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) {
102+
throw new UnsupportedOperationException("Unimplemented method 'decompressToDenseBlock'");
103+
}
104+
105+
@Override
106+
public void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int offR, int offC) {
107+
throw new UnsupportedOperationException("Unimplemented method 'decompressToSparseBlock'");
108+
}
109+
110+
@Override
111+
public AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols, int k) {
112+
throw new UnsupportedOperationException("Unimplemented method 'rightMultByMatrix'");
113+
}
114+
115+
@Override
116+
public void tsmm(MatrixBlock ret, int nRows) {
117+
throw new UnsupportedOperationException("Unimplemented method 'tsmm'");
118+
}
119+
120+
@Override
121+
public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) {
122+
throw new UnsupportedOperationException("Unimplemented method 'leftMultByMatrixNoPreAgg'");
123+
}
124+
125+
@Override
126+
public void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int nRows) {
127+
throw new UnsupportedOperationException("Unimplemented method 'leftMultByAColGroup'");
128+
}
129+
130+
@Override
131+
public void tsmmAColGroup(AColGroup other, MatrixBlock result) {
132+
throw new UnsupportedOperationException("Unimplemented method 'tsmmAColGroup'");
133+
}
134+
135+
@Override
136+
public AColGroup scalarOperation(ScalarOperator op) {
137+
throw new UnsupportedOperationException("Unimplemented method 'scalarOperation'");
138+
}
139+
140+
@Override
141+
public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) {
142+
throw new UnsupportedOperationException("Unimplemented method 'binaryRowOpLeft'");
143+
}
144+
145+
@Override
146+
public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) {
147+
throw new UnsupportedOperationException("Unimplemented method 'binaryRowOpRight'");
148+
}
149+
150+
@Override
151+
public void unaryAggregateOperations(AggregateUnaryOperator op, double[] c, int nRows, int rl, int ru) {
152+
throw new UnsupportedOperationException("Unimplemented method 'unaryAggregateOperations'");
153+
}
154+
155+
@Override
156+
protected AColGroup sliceSingleColumn(int idx) {
157+
throw new UnsupportedOperationException("Unimplemented method 'sliceSingleColumn'");
158+
}
159+
160+
@Override
161+
protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex outputCols) {
162+
throw new UnsupportedOperationException("Unimplemented method 'sliceMultiColumns'");
163+
}
164+
165+
@Override
166+
public AColGroup sliceRows(int rl, int ru) {
167+
throw new UnsupportedOperationException("Unimplemented method 'sliceRows'");
168+
}
169+
170+
@Override
171+
public double getMin() {
172+
throw new UnsupportedOperationException("Unimplemented method 'getMin'");
173+
}
174+
175+
@Override
176+
public double getMax() {
177+
throw new UnsupportedOperationException("Unimplemented method 'getMax'");
178+
}
179+
180+
@Override
181+
public double getSum(int nRows) {
182+
throw new UnsupportedOperationException("Unimplemented method 'getSum'");
183+
}
184+
185+
@Override
186+
public boolean containsValue(double pattern) {
187+
throw new UnsupportedOperationException("Unimplemented method 'containsValue'");
188+
}
189+
190+
@Override
191+
public long getNumberNonZeros(int nRows) {
192+
throw new UnsupportedOperationException("Unimplemented method 'getNumberNonZeros'");
193+
}
194+
195+
@Override
196+
public AColGroup replace(double pattern, double replace) {
197+
throw new UnsupportedOperationException("Unimplemented method 'replace'");
198+
}
199+
200+
@Override
201+
public void computeColSums(double[] c, int nRows) {
202+
throw new UnsupportedOperationException("Unimplemented method 'computeColSums'");
203+
}
204+
205+
@Override
206+
public CM_COV_Object centralMoment(CMOperator op, int nRows) {
207+
throw new UnsupportedOperationException("Unimplemented method 'centralMoment'");
208+
}
209+
210+
@Override
211+
public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows) {
212+
throw new UnsupportedOperationException("Unimplemented method 'rexpandCols'");
213+
}
214+
215+
@Override
216+
public double getCost(ComputationCostEstimator e, int nRows) {
217+
throw new UnsupportedOperationException("Unimplemented method 'getCost'");
218+
}
219+
220+
@Override
221+
public AColGroup unaryOperation(UnaryOperator op) {
222+
throw new UnsupportedOperationException("Unimplemented method 'unaryOperation'");
223+
}
224+
225+
@Override
226+
public boolean isEmpty() {
227+
throw new UnsupportedOperationException("Unimplemented method 'isEmpty'");
228+
}
229+
230+
@Override
231+
public AColGroup append(AColGroup g) {
232+
throw new UnsupportedOperationException("Unimplemented method 'append'");
233+
}
234+
235+
@Override
236+
protected AColGroup appendNInternal(AColGroup[] groups, int blen, int rlen) {
237+
throw new UnsupportedOperationException("Unimplemented method 'appendNInternal'");
238+
}
239+
240+
@Override
241+
public ICLAScheme getCompressionScheme() {
242+
throw new UnsupportedOperationException("Unimplemented method 'getCompressionScheme'");
243+
}
244+
245+
@Override
246+
public AColGroup recompress() {
247+
throw new UnsupportedOperationException("Unimplemented method 'recompress'");
248+
}
249+
250+
@Override
251+
public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
252+
throw new UnsupportedOperationException("Unimplemented method 'getCompressionInfo'");
253+
}
254+
255+
@Override
256+
protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) {
257+
throw new UnsupportedOperationException("Unimplemented method 'fixColIndexes'");
258+
}
259+
260+
@Override
261+
public AColGroup reduceCols() {
262+
throw new UnsupportedOperationException("Unimplemented method 'reduceCols'");
263+
}
264+
265+
@Override
266+
public double getSparsity() {
267+
throw new UnsupportedOperationException("Unimplemented method 'getSparsity'");
268+
}
269+
270+
@Override
271+
protected void sparseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
272+
throw new UnsupportedOperationException("Unimplemented method 'sparseSelection'");
273+
}
274+
275+
@Override
276+
protected void denseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
277+
throw new UnsupportedOperationException("Unimplemented method 'denseSelection'");
278+
}
279+
280+
@Override
281+
public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) {
282+
throw new UnsupportedOperationException("Unimplemented method 'splitReshape'");
283+
}
284+
285+
}

src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/ColIndexFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ else if(contiguous)
126126
return ArrayIndex.estimateInMemorySizeStatic(nCol);
127127
}
128128

129-
public static IColIndex combine(List<AColGroup> gs) {
129+
public static IColIndex combine(List<? extends AColGroup> gs) {
130130
int numCols = 0;
131131
for(AColGroup g : gs)
132132
numCols += g.getNumCols();

src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,26 @@ public void copyBit(MapToBit d) {
166166

167167
@Override
168168
public int[] getCounts(int[] ret) {
169-
for(int i = 0; i < _data.length; i++)
169+
final int h = (_data.length) % 8;
170+
for(int i = 0; i < h; i++)
170171
ret[_data[i] & 0xFF]++;
172+
getCountsBy8P(ret, h, _data.length);
171173
return ret;
172174
}
173175

176+
private void getCountsBy8P(int[] ret, int s, int e) {
177+
for(int i = s; i < e; i += 8) {
178+
ret[_data[i] & 0xFF]++;
179+
ret[_data[i + 1] & 0xFF]++;
180+
ret[_data[i + 2] & 0xFF]++;
181+
ret[_data[i + 3] & 0xFF]++;
182+
ret[_data[i + 4] & 0xFF]++;
183+
ret[_data[i + 5] & 0xFF]++;
184+
ret[_data[i + 6] & 0xFF]++;
185+
ret[_data[i + 7] & 0xFF]++;
186+
}
187+
}
188+
174189
@Override
175190
protected void preAggregateDenseToRowBy8(double[] mV, double[] preAV, int cl, int cu, int off) {
176191
final int h = (cu - cl) % 8;

0 commit comments

Comments
 (0)