Skip to content

Commit 6a57fd0

Browse files
committed
[SYSTEMDS-3644] Compressed transform encode
1 parent 5f360ef commit 6a57fd0

30 files changed

+1001
-458
lines changed

src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,8 +1267,8 @@ public void copy(int rl, int ru, int cl, int cu, FrameBlock src) {
12671267
* @param col is the column # from frame data which contains Recode map generated earlier.
12681268
* @return map of token and code for every element in the input column of a frame containing Recode map
12691269
*/
1270-
public Map<Object, Long> getRecodeMap(int col) {
1271-
return _coldata[col].getRecodeMap();
1270+
public Map<Object, Integer> getRecodeMap(int col) {
1271+
return _coldata[col].getRecodeMap(4);
12721272
}
12731273

12741274
@Override

src/main/java/org/apache/sysds/runtime/frame/data/columns/ABooleanArray.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919

2020
package org.apache.sysds.runtime.frame.data.columns;
2121

22+
import java.util.ArrayList;
23+
import java.util.Collections;
2224
import java.util.HashMap;
25+
import java.util.List;
2326
import java.util.Map;
27+
import java.util.concurrent.ExecutorService;
2428

2529
public abstract class ABooleanArray extends Array<Boolean> {
2630

@@ -55,13 +59,25 @@ public boolean possiblyContainsNaN() {
5559
* @param value The string array to set from.
5660
*/
5761
public abstract void setNullsFromString(int rl, int ru, Array<String> value);
58-
62+
63+
@Override
64+
protected void mergeRecodeMaps(Map<Boolean, Integer> target, Map<Boolean, Integer> from) {
65+
final List<Boolean> fromEntriesOrdered = new ArrayList<>(Collections.nCopies(from.size(), null));
66+
for(Map.Entry<Boolean, Integer> e : from.entrySet())
67+
fromEntriesOrdered.set(e.getValue() - 1, e.getKey());
68+
int id = target.size();
69+
for(Boolean e : fromEntriesOrdered) {
70+
if(target.putIfAbsent(e, id) == null)
71+
id++;
72+
}
73+
}
74+
5975
@Override
60-
protected Map<Boolean, Long> createRecodeMap() {
61-
Map<Boolean, Long> map = new HashMap<>();
62-
long id = 1;
76+
protected Map<Boolean, Integer> createRecodeMap(int estimate, ExecutorService pool) {
77+
Map<Boolean, Integer> map = new HashMap<>();
78+
int id = 1;
6379
for(int i = 0; i < size() && id <= 2; i++) {
64-
Long v = map.putIfAbsent(get(i), id);
80+
Integer v = map.putIfAbsent(get(i), id);
6581
if(v == null)
6682
id++;
6783
}

src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919

2020
package org.apache.sysds.runtime.frame.data.columns;
2121

22+
import java.util.Map;
23+
24+
import org.apache.commons.lang3.NotImplementedException;
2225
import org.apache.sysds.common.Types.ValueType;
2326
import org.apache.sysds.runtime.compress.DMLCompressionException;
27+
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
2428
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
2529

2630
/**
@@ -59,11 +63,6 @@ public void setFromOtherType(int rl, int ru, Array<?> value) {
5963
throw new DMLCompressionException("Invalid to set value in CompressedArray");
6064
}
6165

62-
@Override
63-
public void set(int rl, int ru, Array<T> value, int rlSrc) {
64-
throw new DMLCompressionException("Invalid to set value in CompressedArray");
65-
}
66-
6766
@Override
6867
public void setNz(int rl, int ru, Array<T> value) {
6968
throw new DMLCompressionException("Invalid to set value in CompressedArray");
@@ -154,4 +153,15 @@ protected Array<Object> changeTypeHash64(Array<Object> retA, int l, int u) {
154153
protected Array<Object> changeTypeHash32(Array<Object> ret, int l, int u) {
155154
throw new DMLCompressionException("Invalid to change sub compressed array");
156155
}
156+
157+
@Override
158+
public void setM(Map<T, Integer> map, AMapToData m, int i) {
159+
throw new NotImplementedException();
160+
}
161+
162+
@Override
163+
protected void mergeRecodeMaps(Map<T, Integer> target, Map<T, Integer> from) {
164+
throw new DMLCompressionException("Invalid to change sub compressed array");
165+
}
166+
157167
}

src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java

Lines changed: 114 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,26 @@
2020
package org.apache.sysds.runtime.frame.data.columns;
2121

2222
import java.lang.ref.SoftReference;
23+
import java.util.ArrayList;
2324
import java.util.HashMap;
2425
import java.util.Iterator;
26+
import java.util.List;
2527
import java.util.Map;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Future;
2630

2731
import org.apache.commons.logging.Log;
2832
import org.apache.commons.logging.LogFactory;
2933
import org.apache.hadoop.io.Writable;
3034
import org.apache.sysds.common.Types.ValueType;
35+
import org.apache.sysds.hops.OptimizerUtils;
3136
import org.apache.sysds.runtime.DMLRuntimeException;
3237
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
3338
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
3439
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
3540
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
3641
import org.apache.sysds.runtime.matrix.data.Pair;
42+
import org.apache.sysds.utils.stats.Timing;
3743

3844
/**
3945
* Generic, resizable native arrays for the internal representation of the columns in the FrameBlock. We use this custom
@@ -43,7 +49,7 @@ public abstract class Array<T> implements Writable {
4349
protected static final Log LOG = LogFactory.getLog(Array.class.getName());
4450

4551
/** A soft reference to a memorization of this arrays mapping, used in transformEncode */
46-
protected SoftReference<Map<T, Long>> _rcdMapCache = null;
52+
protected SoftReference<Map<T, Integer>> _rcdMapCache = null;
4753

4854
/** The current allocated number of elements in this Array */
4955
protected int _size;
@@ -63,7 +69,7 @@ protected int newSize() {
6369
*
6470
* @return The cached recode map
6571
*/
66-
public final SoftReference<Map<T, Long>> getCache() {
72+
public final SoftReference<Map<T, Integer>> getCache() {
6773
return _rcdMapCache;
6874
}
6975

@@ -72,7 +78,7 @@ public final SoftReference<Map<T, Long>> getCache() {
7278
*
7379
* @param m The element to cache.
7480
*/
75-
public final void setCache(SoftReference<Map<T, Long>> m) {
81+
public final void setCache(SoftReference<Map<T, Integer>> m) {
7682
_rcdMapCache = m;
7783
}
7884

@@ -83,43 +89,109 @@ public final void setCache(SoftReference<Map<T, Long>> m) {
8389
*
8490
* @return A recode map
8591
*/
86-
public synchronized final Map<T, Long> getRecodeMap() {
92+
public synchronized final Map<T, Integer> getRecodeMap() {
93+
return getRecodeMap(4);
94+
}
95+
96+
public synchronized final Map<T, Integer> getRecodeMap(int estimate) {
97+
return getRecodeMap(estimate, null);
98+
}
99+
100+
/**
101+
* Get a recode map that maps each unique value in the array, to a long ID. Null values are ignored, and not included
102+
* in the mapping. The resulting recode map in stored in a soft reference to speed up repeated calls to the same
103+
* column.
104+
*
105+
* @param estimate the estimated number of unique.
106+
* @param pool An executor pool to be used for parallel execution
107+
* @return A recode map
108+
*/
109+
public synchronized final Map<T, Integer> getRecodeMap(int estimate, ExecutorService pool) {
87110
// probe cache for existing map
88-
Map<T, Long> map;
89-
SoftReference<Map<T, Long>> tmp = getCache();
111+
Map<T, Integer> map;
112+
SoftReference<Map<T, Integer>> tmp = getCache();
90113
map = (tmp != null) ? tmp.get() : null;
91114
if(map != null)
92115
return map;
93116

94117
// construct recode map
95-
map = createRecodeMap();
118+
map = createRecodeMap(estimate, pool);
96119

97120
// put created map into cache
98121
setCache(new SoftReference<>(map));
99122

100123
return map;
101124
}
102125

103-
/**
104-
* Recreate the recode map from what is inside array. This is an internal method for arrays, and the result is cached
105-
* in the main class of the arrays.
106-
*
107-
* @return The recode map
108-
*/
109-
protected Map<T, Long> createRecodeMap() {
110-
Map<T, Long> map = new HashMap<>();
111-
long id = 1;
112-
for(int i = 0; i < size(); i++) {
113-
T val = get(i);
114-
if(val != null) {
115-
Long v = map.putIfAbsent(val, id);
116-
if(v == null)
117-
id++;
126+
protected Map<T, Integer> createRecodeMap(int estimate, ExecutorService pool) {
127+
Timing t = new Timing();
128+
final int s = size();
129+
int k = OptimizerUtils.getTransformNumThreads();
130+
Map<T, Integer> ret;
131+
if(pool == null || s < 10000 || estimate < 1024)
132+
ret = createRecodeMap(estimate, 0, s);
133+
else
134+
ret = parallelCreateRecodeMap(estimate, pool, s, k);
135+
136+
if(LOG.isDebugEnabled()) {
137+
String base = "CreateRecodeMap estimate: %10d actual %10d time: %10.5f";
138+
LOG.debug(String.format(base, estimate, ret.size(), t.stop()));
139+
}
140+
return ret;
141+
}
142+
143+
private Map<T, Integer> parallelCreateRecodeMap(int estimate, ExecutorService pool, final int s, int k) {
144+
145+
try {
146+
final int blk = Math.max(10000, (s + k) / k);
147+
final List<Future<Map<T, Integer>>> tasks = new ArrayList<>();
148+
for(int i = blk; i < s; i += blk) { // start at blk for the other threads
149+
final int start = i;
150+
final int end = Math.min(i + blk, s);
151+
tasks.add(pool.submit(() -> createRecodeMap(estimate, start, end)));
118152
}
153+
// make the initial map thread local allocation.
154+
final Map<T, Integer> map = new HashMap<>((int) (estimate * 1.3));
155+
createRecodeMap(map, 0, blk);
156+
for(int i = 0; i < tasks.size(); i++) { // merge with other threads work.
157+
final Map<T, Integer> map2 = tasks.get(i).get();
158+
mergeRecodeMaps(map, map2);
159+
}
160+
return map;
161+
}
162+
catch(Exception e) {
163+
throw new RuntimeException(e);
119164
}
165+
finally {
166+
pool.shutdown();
167+
}
168+
}
169+
170+
protected abstract void mergeRecodeMaps(Map<T, Integer> target, Map<T, Integer> from);
171+
172+
private Map<T, Integer> createRecodeMap(final int estimate, final int s, final int e) {
173+
// * 1.3 because we hashMap has a load factor of 1.75
174+
final Map<T, Integer> map = new HashMap<>((int) (Math.min((long) estimate, (e - s)) * 1.3));
175+
return createRecodeMap(map, s, e);
176+
}
177+
178+
private Map<T, Integer> createRecodeMap(Map<T, Integer> map, final int s, final int e) {
179+
int id = 1;
180+
for(int i = s; i < e; i++)
181+
id = addValRecodeMap(map, id, i);
120182
return map;
121183
}
122184

185+
protected int addValRecodeMap(Map<T, Integer> map, int id, int i) {
186+
T val = getInternal(i);
187+
if(val != null) {
188+
Integer v = map.putIfAbsent(val, id);
189+
if(v == null)
190+
id++;
191+
}
192+
return id;
193+
}
194+
123195
/**
124196
* Get the number of elements in the array, this does not necessarily reflect the current allocated size.
125197
*
@@ -224,15 +296,10 @@ public double getAsNaNDouble(int i) {
224296
*
225297
* @param rl row lower
226298
* @param ru row upper (inclusive)
227-
* @param value value array to take values from (same type)
299+
* @param value value array to take values from (same type) offset by rl.
228300
*/
229301
public abstract void set(int rl, int ru, Array<T> value);
230302

231-
// {
232-
// for(int i = rl; i <= ru; i++)
233-
// set(i, value.get(i));
234-
// }
235-
236303
/**
237304
* Set range to given arrays value with an offset into other array
238305
*
@@ -243,7 +310,7 @@ public double getAsNaNDouble(int i) {
243310
*/
244311
public void set(int rl, int ru, Array<T> value, int rlSrc) {
245312
for(int i = rl, off = rlSrc; i <= ru; i++, off++)
246-
set(i, value.get(off));
313+
set(i, value.getInternal(off));
247314
}
248315

249316
/**
@@ -918,4 +985,22 @@ public double[] minMax(int l, int u) {
918985
}
919986
return new double[] {min, max};
920987
}
988+
989+
public void setM(Map<T, Integer> map, AMapToData m, int i) {
990+
m.set(i, map.get(getInternal(i)).intValue() - 1);
991+
}
992+
993+
public void setM(Map<T, Integer> map, int si, AMapToData m, int i) {
994+
try {
995+
final T v = getInternal(i);
996+
if(v != null)
997+
m.set(i, map.get(v).intValue() - 1);
998+
else
999+
m.set(i, si);
1000+
}
1001+
catch(Exception e) {
1002+
String error = "expected: " + getInternal(i) + " to be in map: " + map;
1003+
throw new RuntimeException(error, e);
1004+
}
1005+
}
9211006
}

src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ else if(target.getFrameArrayType() != FrameArrayType.OPTIONAL //
356356

357357
Array<C> targetC = (Array<C>) (ta != tc ? target.changeType(tc) : target);
358358
Array<C> srcC = (Array<C>) (tb != tc ? src.changeType(tc) : src);
359-
targetC.set(rl, ru, srcC);
359+
targetC.set(rl, ru, srcC, 0);
360360
return targetC;
361361

362362
}

src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
2525
import java.nio.ByteOrder;
26+
import java.util.ArrayList;
2627
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Map;
2731

2832
import org.apache.sysds.common.Types.ValueType;
2933
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -377,6 +381,19 @@ public boolean possiblyContainsNaN() {
377381
return false;
378382
}
379383

384+
385+
@Override
386+
protected void mergeRecodeMaps(Map<Character, Integer> target, Map<Character, Integer> from) {
387+
final List<Character> fromEntriesOrdered = new ArrayList<>(Collections.nCopies(from.size(), null));
388+
for(Map.Entry<Character, Integer> e : from.entrySet())
389+
fromEntriesOrdered.set(e.getValue() - 1, e.getKey());
390+
int id = target.size();
391+
for(Character e : fromEntriesOrdered) {
392+
if(target.putIfAbsent(e, id) == null)
393+
id++;
394+
}
395+
}
396+
380397
@Override
381398
public String toString() {
382399
StringBuilder sb = new StringBuilder(_size * 2 + 15);

0 commit comments

Comments
 (0)