|
1 | 1 | package com.github.bordertech.taskmaster.impl; |
2 | 2 |
|
3 | | -import com.github.bordertech.config.Config; |
4 | 3 | import com.github.bordertech.taskmaster.exception.RejectedTaskException; |
5 | 4 | import com.github.bordertech.taskmaster.TaskFuture; |
6 | | -import java.util.HashMap; |
7 | | -import java.util.Map; |
8 | | -import java.util.concurrent.ArrayBlockingQueue; |
9 | | -import java.util.concurrent.BlockingQueue; |
10 | 5 | import java.util.concurrent.ExecutorService; |
11 | | -import java.util.concurrent.Executors; |
12 | 6 | import java.util.concurrent.Future; |
13 | | -import java.util.concurrent.LinkedBlockingQueue; |
14 | 7 | import java.util.concurrent.RejectedExecutionException; |
15 | | -import java.util.concurrent.SynchronousQueue; |
16 | | -import java.util.concurrent.ThreadPoolExecutor; |
17 | | -import java.util.concurrent.TimeUnit; |
18 | 8 | import javax.inject.Singleton; |
19 | 9 | import com.github.bordertech.taskmaster.TaskMasterProvider; |
20 | 10 |
|
|
27 | 17 | @Singleton |
28 | 18 | public class TaskMasterProviderExecutorService implements TaskMasterProvider { |
29 | 19 |
|
30 | | - private static final Map<String, ExecutorService> THREAD_POOLS = new HashMap<>(); |
31 | | - |
32 | | - private static final String TP_PARAM_PREFIX = "bordertech.taskmaster.pool."; |
33 | | - |
34 | | - private static final String DEFAULT_POOL = Config.getInstance().getString(TP_PARAM_PREFIX + "default", "default"); |
35 | | - |
36 | | - private static final int DEFAULT_MAX_THREADS = 20; |
37 | | - private static final int DEFAULT_QUEUE_LENGTH = 0; |
38 | | - |
39 | | - static { |
40 | | - // Load thread pools |
41 | | - String[] pools = Config.getInstance().getStringArray(TP_PARAM_PREFIX + "names"); |
42 | | - for (String pool : pools) { |
43 | | - THREAD_POOLS.put(pool, buildPool(pool)); |
44 | | - } |
45 | | - // Check if default pool needs to be created |
46 | | - if (!THREAD_POOLS.containsKey(DEFAULT_POOL)) { |
47 | | - THREAD_POOLS.put(DEFAULT_POOL, buildPool(DEFAULT_POOL)); |
48 | | - } |
| 20 | + @Override |
| 21 | + public void shutdown() { |
| 22 | + shutdownNow(); |
49 | 23 | } |
50 | 24 |
|
51 | 25 | /** |
52 | | - * @param pool the pool name to create |
53 | | - * @return the executor service |
| 26 | + * TODO This needs to be put in TaskMaster. |
54 | 27 | */ |
55 | | - private static ExecutorService buildPool(final String pool) { |
56 | | - // TODO Logging and performance parameters |
57 | | - // http://www.nurkiewicz.com/2014/11/executorservice-10-tips-and-tricks.html |
58 | | - |
59 | | - // Get the pool type - defaults to cached |
60 | | - String type = Config.getInstance().getString(TP_PARAM_PREFIX + pool + ".type", "cached"); |
61 | | - switch (type.toLowerCase()) { |
62 | | - case "single": |
63 | | - return Executors.newSingleThreadExecutor(); |
64 | | - case "fixed": |
65 | | - // Number of fixed threads |
66 | | - int max = Config.getInstance().getInt(TP_PARAM_PREFIX + pool + ".max", DEFAULT_MAX_THREADS); |
67 | | - if (max < 1) { |
68 | | - max = DEFAULT_MAX_THREADS; |
69 | | - } |
70 | | - // Length of pending queue |
71 | | - int queue = Config.getInstance().getInt(TP_PARAM_PREFIX + pool + ".queue", DEFAULT_QUEUE_LENGTH); |
72 | | - // Create executable with the appropriate queue type |
73 | | - BlockingQueue<Runnable> blkQueue; |
74 | | - if (queue < 0) { |
75 | | - // Unlimited |
76 | | - blkQueue = new LinkedBlockingQueue<>(); |
77 | | - } else if (queue == 0) { |
78 | | - // No queue |
79 | | - blkQueue = new SynchronousQueue<>(); |
80 | | - } else { |
81 | | - // Fixed queue length |
82 | | - blkQueue = new ArrayBlockingQueue<>(queue); |
83 | | - } |
84 | | - return new ThreadPoolExecutor(max, max, 0L, TimeUnit.MILLISECONDS, blkQueue); |
85 | | - default: |
86 | | - // Default - Unlimited Threads and No Queue |
87 | | - return Executors.newCachedThreadPool(); |
88 | | - } |
89 | | - } |
90 | | - |
91 | | - @Override |
92 | | - public void shutdown() { |
93 | | - for (ExecutorService exec : THREAD_POOLS.values()) { |
94 | | - exec.shutdown(); |
95 | | - } |
| 28 | + public void shutdownNow() { |
| 29 | + // Provide an immediate shutdown of threads without running the waiting threads. |
| 30 | + TaskMasterPoolUtil.shutdown(); |
96 | 31 | } |
97 | 32 |
|
98 | 33 | @Override |
99 | 34 | public <T> TaskFuture<T> submit(final Runnable task, final T result) throws RejectedTaskException { |
100 | | - return submit(task, result, DEFAULT_POOL); |
| 35 | + return submit(task, result, TaskMasterPoolUtil.DEFAULT_POOL); |
101 | 36 | } |
102 | 37 |
|
103 | 38 | @Override |
104 | 39 | public <T> TaskFuture<T> submit(final Runnable task, final T result, final String pool) throws RejectedTaskException { |
105 | | - String name = pool == null ? DEFAULT_POOL : pool; |
106 | | - ExecutorService exec = THREAD_POOLS.get(name); |
107 | | - if (exec == null) { |
108 | | - throw new IllegalStateException("Thread pool [" + name + "] has not been defined."); |
109 | | - } |
| 40 | + ExecutorService exec = TaskMasterPoolUtil.getPool(pool); |
110 | 41 | try { |
111 | 42 | Future<T> future = exec.submit(task, result); |
112 | 43 | return new TaskFutureWrapper<>(future); |
|
0 commit comments