Skip to content

Commit aea8162

Browse files
committed
ThreadPool?
1 parent 45c248d commit aea8162

File tree

18 files changed

+64
-177
lines changed

18 files changed

+64
-177
lines changed

src/main/java/org/apache/sysds/runtime/frame/data/columns/ABooleanArray.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919

2020
package org.apache.sysds.runtime.frame.data.columns;
2121

22-
import java.util.ArrayList;
23-
import java.util.Collections;
2422
import java.util.HashMap;
25-
import java.util.List;
2623
import java.util.Map;
2724
import java.util.concurrent.ExecutorService;
2825

@@ -60,17 +57,6 @@ public boolean possiblyContainsNaN() {
6057
*/
6158
public abstract void setNullsFromString(int rl, int ru, Array<String> value);
6259

63-
@Override
64-
protected void mergeRecodeMaps(Map<Boolean, Integer> target, Map<Boolean, Integer> from) {
65-
final List<Boolean> fromEntriesOrdered = new ArrayList<>(Collections.nCopies(from.size(), null));
66-
for(Map.Entry<Boolean, Integer> e : from.entrySet())
67-
fromEntriesOrdered.set(e.getValue() - 1, e.getKey());
68-
int id = target.size();
69-
for(Boolean e : fromEntriesOrdered) {
70-
if(target.putIfAbsent(e, id) == null)
71-
id++;
72-
}
73-
}
7460

7561
@Override
7662
protected Map<Boolean, Integer> createRecodeMap(int estimate, ExecutorService pool) {

src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,4 @@ public void setM(Map<T, Integer> map, AMapToData m, int i) {
163163
public void setM(Map<T, Integer> map, int si, AMapToData m, int i) {
164164
throw new NotImplementedException();
165165
}
166-
167-
@Override
168-
protected void mergeRecodeMaps(Map<T, Integer> target, Map<T, Integer> from) {
169-
throw new DMLCompressionException("Invalid to change sub compressed array");
170-
}
171-
172166
}

src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121

2222
import java.lang.ref.SoftReference;
2323
import java.util.ArrayList;
24+
import java.util.Collections;
2425
import java.util.HashMap;
2526
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.concurrent.ExecutionException;
2830
import java.util.concurrent.ExecutorService;
2931
import java.util.concurrent.Future;
3032

@@ -102,7 +104,12 @@ public synchronized final Map<T, Integer> getRecodeMap() {
102104
* @return A recode map
103105
*/
104106
public synchronized final Map<T, Integer> getRecodeMap(int estimate) {
105-
return getRecodeMap(estimate, null);
107+
try {
108+
return getRecodeMap(estimate, null);
109+
}
110+
catch(Exception e) {
111+
throw new RuntimeException(e);
112+
}
106113
}
107114

108115
/**
@@ -111,10 +118,13 @@ public synchronized final Map<T, Integer> getRecodeMap(int estimate) {
111118
* column.
112119
*
113120
* @param estimate the estimated number of unique values in this array.
114-
* @param pool An executor pool to be used for parallel execution
121+
* @param pool An executor pool to be used for parallel execution (Note this method does not shutdown the pool)
115122
* @return A recode map
123+
* @throws ExecutionException if the parallel execution fails
124+
* @throws InterruptedException if the parallel execution fails
116125
*/
117-
public synchronized final Map<T, Integer> getRecodeMap(int estimate, ExecutorService pool) {
126+
public synchronized final Map<T, Integer> getRecodeMap(int estimate, ExecutorService pool)
127+
throws InterruptedException, ExecutionException {
118128
// probe cache for existing map
119129
Map<T, Integer> map;
120130
SoftReference<Map<T, Integer>> tmp = getCache();
@@ -137,10 +147,14 @@ public synchronized final Map<T, Integer> getRecodeMap(int estimate, ExecutorSer
137147
* column.
138148
*
139149
* @param estimate The estimate number of unique values inside this array.
140-
* @param pool The thread pool to use for parallel creation of recode map (can be null).
150+
* @param pool The thread pool to use for parallel creation of recode map (can be null). (Note this method does
151+
* not shutdown the pool)
141152
* @return The recode map created.
153+
* @throws ExecutionException if the parallel execution fails
154+
* @throws InterruptedException if the parallel execution fails
142155
*/
143-
protected Map<T, Integer> createRecodeMap(int estimate, ExecutorService pool) {
156+
protected Map<T, Integer> createRecodeMap(int estimate, ExecutorService pool)
157+
throws InterruptedException, ExecutionException {
144158
Timing t = new Timing();
145159
final int s = size();
146160
int k = OptimizerUtils.getTransformNumThreads();
@@ -157,31 +171,25 @@ protected Map<T, Integer> createRecodeMap(int estimate, ExecutorService pool) {
157171
return ret;
158172
}
159173

160-
private Map<T, Integer> parallelCreateRecodeMap(int estimate, ExecutorService pool, final int s, int k) {
174+
private Map<T, Integer> parallelCreateRecodeMap(int estimate, ExecutorService pool, final int s, int k)
175+
throws InterruptedException, ExecutionException {
161176

162-
try {
163-
final int blk = Math.max(10000, (s + k) / k);
164-
final List<Future<Map<T, Integer>>> tasks = new ArrayList<>();
165-
for(int i = blk; i < s; i += blk) { // start at blk for the other threads
166-
final int start = i;
167-
final int end = Math.min(i + blk, s);
168-
tasks.add(pool.submit(() -> createRecodeMap(estimate, start, end)));
169-
}
170-
// make the initial map thread local allocation.
171-
final Map<T, Integer> map = new HashMap<>((int) (estimate * 1.3));
172-
createRecodeMap(map, 0, blk);
173-
for(int i = 0; i < tasks.size(); i++) { // merge with other threads work.
174-
final Map<T, Integer> map2 = tasks.get(i).get();
175-
mergeRecodeMaps(map, map2);
176-
}
177-
return map;
177+
final int blk = Math.max(10000, (s + k) / k);
178+
final List<Future<Map<T, Integer>>> tasks = new ArrayList<>();
179+
for(int i = blk; i < s; i += blk) { // start at blk for the other threads
180+
final int start = i;
181+
final int end = Math.min(i + blk, s);
182+
tasks.add(pool.submit(() -> createRecodeMap(estimate, start, end)));
178183
}
179-
catch(Exception e) {
180-
throw new RuntimeException(e);
181-
}
182-
finally {
183-
pool.shutdown();
184+
// make the initial map thread local allocation.
185+
final Map<T, Integer> map = new HashMap<>((int) (estimate * 1.3));
186+
createRecodeMap(map, 0, blk);
187+
for(int i = 0; i < tasks.size(); i++) { // merge with other threads work.
188+
final Map<T, Integer> map2 = tasks.get(i).get();
189+
mergeRecodeMaps(map, map2);
184190
}
191+
return map;
192+
185193
}
186194

187195
/**
@@ -193,7 +201,16 @@ private Map<T, Integer> parallelCreateRecodeMap(int estimate, ExecutorService po
193201
* @param target The target object to merge the two maps into
194202
* @param from The Map to take entries from.
195203
*/
196-
protected abstract void mergeRecodeMaps(Map<T, Integer> target, Map<T, Integer> from);
204+
protected static <T> void mergeRecodeMaps(Map<T, Integer> target, Map<T, Integer> from) {
205+
final List<T> fromEntriesOrdered = new ArrayList<>(Collections.nCopies(from.size(), null));
206+
for(Map.Entry<T, Integer> e : from.entrySet())
207+
fromEntriesOrdered.set(e.getValue() - 1, e.getKey());
208+
int id = target.size();
209+
for(T e : fromEntriesOrdered) {
210+
if(target.putIfAbsent(e, id) == null)
211+
id++;
212+
}
213+
}
197214

198215
private Map<T, Integer> createRecodeMap(final int estimate, final int s, final int e) {
199216
// * 1.3 because we hashMap has a load factor of 1.75

src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
2525
import java.nio.ByteOrder;
26-
import java.util.ArrayList;
2726
import java.util.Arrays;
28-
import java.util.Collections;
29-
import java.util.List;
30-
import java.util.Map;
3127

3228
import org.apache.sysds.common.Types.ValueType;
3329
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -381,18 +377,6 @@ public boolean possiblyContainsNaN() {
381377
return false;
382378
}
383379

384-
@Override
385-
protected void mergeRecodeMaps(Map<Character, Integer> target, Map<Character, Integer> from) {
386-
final List<Character> fromEntriesOrdered = new ArrayList<>(Collections.nCopies(from.size(), null));
387-
for(Map.Entry<Character, Integer> e : from.entrySet())
388-
fromEntriesOrdered.set(e.getValue() - 1, e.getKey());
389-
int id = target.size();
390-
for(Character e : fromEntriesOrdered) {
391-
if(target.putIfAbsent(e, id) == null)
392-
id++;
393-
}
394-
}
395-
396380
@Override
397381
public String toString() {
398382
StringBuilder sb = new StringBuilder(_size * 2 + 15);

src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashMap;
2626
import java.util.Map;
2727
import java.util.Map.Entry;
28+
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.ExecutorService;
2930

3031
import org.apache.sysds.common.Types.ValueType;
@@ -176,7 +177,7 @@ public static <T> Array<T> compressToDDC(Array<T> arr, int estimateUnique) {
176177
}
177178

178179
@Override
179-
protected Map<T, Integer> createRecodeMap(int estimate, ExecutorService pool) {
180+
protected Map<T, Integer> createRecodeMap(int estimate, ExecutorService pool) throws InterruptedException, ExecutionException {
180181
return dict.createRecodeMap(estimate, pool);
181182
}
182183

src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.nio.ByteBuffer;
2626
import java.nio.ByteOrder;
2727
import java.util.Arrays;
28-
import java.util.Map;
2928

3029
import org.apache.sysds.common.Types.ValueType;
3130
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -463,18 +462,6 @@ public double[] minMax(int l, int u) {
463462
return new double[] {min, max};
464463
}
465464

466-
@Override
467-
protected void mergeRecodeMaps(Map<Double, Integer> target, Map<Double, Integer> from) {
468-
final double[] fromEntriesOrdered = new double[from.size()];
469-
for(Map.Entry<Double, Integer> e : from.entrySet())
470-
fromEntriesOrdered[e.getValue() - 1] = e.getKey();
471-
int id = target.size();
472-
for(double e : fromEntriesOrdered) {
473-
if(target.putIfAbsent(e, id) == null)
474-
id++;
475-
}
476-
}
477-
478465
@Override
479466
public String toString() {
480467
StringBuilder sb = new StringBuilder(_size * 5 + 2);

src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -412,18 +412,6 @@ protected int setAndAddToDict(Map<Float, Integer> rcd, AMapToData m, int i, Inte
412412
return id;
413413
}
414414

415-
@Override
416-
protected void mergeRecodeMaps(Map<Float, Integer> target, Map<Float, Integer> from) {
417-
final float[] fromEntriesOrdered = new float[from.size()];
418-
for(Map.Entry<Float, Integer> e : from.entrySet())
419-
fromEntriesOrdered[e.getValue() - 1] = e.getKey();
420-
int id = target.size();
421-
for(float e : fromEntriesOrdered) {
422-
if(target.putIfAbsent(e, id) == null)
423-
id++;
424-
}
425-
}
426-
427415
@Override
428416
public String toString() {
429417
StringBuilder sb = new StringBuilder(_size * 5 + 2);

src/main/java/org/apache/sysds/runtime/frame/data/columns/HashIntegerArray.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -458,18 +458,6 @@ public void setM(Map<Object, Integer> map, int si, AMapToData m, int i) {
458458
m.set(i, map.get(v).intValue() - 1);
459459
}
460460

461-
@Override
462-
protected void mergeRecodeMaps(Map<Object, Integer> target, Map<Object, Integer> from) {
463-
final int[] fromEntriesOrdered = new int[from.size()];
464-
for(Map.Entry<Object, Integer> e : from.entrySet())
465-
fromEntriesOrdered[e.getValue() - 1] = (Integer) e.getKey();
466-
int id = target.size();
467-
for(int e : fromEntriesOrdered) {
468-
if(target.putIfAbsent(e, id) == null)
469-
id++;
470-
}
471-
}
472-
473461
@Override
474462
public String toString() {
475463
StringBuilder sb = new StringBuilder(_size * 5 + 2);

src/main/java/org/apache/sysds/runtime/frame/data/columns/HashLongArray.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -454,18 +454,6 @@ public void setM(Map<Object, Integer> map, int si, AMapToData m, int i) {
454454
m.set(i, map.get(Long.valueOf(getLong(i))) - 1);
455455
}
456456

457-
@Override
458-
protected void mergeRecodeMaps(Map<Object, Integer> target, Map<Object, Integer> from) {
459-
final long[] fromEntriesOrdered = new long[from.size()];
460-
for(Map.Entry<Object, Integer> e : from.entrySet())
461-
fromEntriesOrdered[e.getValue() - 1] = (Long) e.getKey();
462-
int id = target.size();
463-
for(long e : fromEntriesOrdered) {
464-
if(target.putIfAbsent(e, id) == null)
465-
id++;
466-
}
467-
}
468-
469457
@Override
470458
public String toString() {
471459
StringBuilder sb = new StringBuilder(_size * 5 + 2);

src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.nio.ByteBuffer;
2626
import java.nio.ByteOrder;
2727
import java.util.Arrays;
28-
import java.util.Map;
2928

3029
import org.apache.sysds.common.Types.ValueType;
3130
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -384,18 +383,6 @@ public boolean possiblyContainsNaN() {
384383
return false;
385384
}
386385

387-
@Override
388-
protected void mergeRecodeMaps(Map<Integer, Integer> target, Map<Integer, Integer> from) {
389-
final int[] fromEntriesOrdered = new int[from.size()];
390-
for(Map.Entry<Integer, Integer> e : from.entrySet())
391-
fromEntriesOrdered[e.getValue() - 1] = e.getKey();
392-
int id = target.size();
393-
for(int e : fromEntriesOrdered) {
394-
if(target.putIfAbsent(e, id) == null)
395-
id++;
396-
}
397-
}
398-
399386
@Override
400387
public String toString() {
401388
StringBuilder sb = new StringBuilder(_size * 5 + 2);

0 commit comments

Comments
 (0)