|
45 | 45 | import java.util.Collection;
|
46 | 46 | import java.util.Collections;
|
47 | 47 | import java.util.List;
|
| 48 | +import java.util.Set; |
| 49 | +import java.util.WeakHashMap; |
48 | 50 | import java.util.concurrent.BlockingQueue;
|
49 | 51 | import java.util.concurrent.Callable;
|
50 | 52 | import java.util.concurrent.ExecutorService;
|
@@ -145,15 +147,9 @@ private ExecutorService getExecutorService(OptimizedCallTarget callTarget) {
|
145 | 147 | long keepAliveTime = compilerIdleDelay >= 0 ? compilerIdleDelay : 0;
|
146 | 148 |
|
147 | 149 | this.compilationQueue = createQueue(callTarget, threads);
|
148 |
| - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threads, threads, |
| 150 | + ThreadPoolExecutor threadPoolExecutor = new TruffleThreadPoolExecutor(threads, threads, |
149 | 151 | keepAliveTime, TimeUnit.MILLISECONDS,
|
150 |
| - compilationQueue, factory) { |
151 |
| - @Override |
152 |
| - @SuppressWarnings({"unchecked"}) |
153 |
| - protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { |
154 |
| - return (RunnableFuture<T>) new CompilationTask.ExecutorServiceWrapper((CompilationTask) callable); |
155 |
| - } |
156 |
| - }; |
| 152 | + compilationQueue, factory); |
157 | 153 |
|
158 | 154 | if (compilerIdleDelay > 0) {
|
159 | 155 | // There are two mechanisms to signal idleness: if core threads can timeout, then
|
@@ -247,7 +243,6 @@ public void shutdownAndAwaitTermination(long timeout) {
|
247 | 243 | return;
|
248 | 244 | }
|
249 | 245 | }
|
250 |
| - |
251 | 246 | threadPool.shutdownNow();
|
252 | 247 | try {
|
253 | 248 | threadPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
|
@@ -282,9 +277,40 @@ public enum Tier {
|
282 | 277 |
|
283 | 278 | }
|
284 | 279 |
|
285 |
| - private final class TruffleCompilerThreadFactory implements ThreadFactory { |
| 280 | + private final class TruffleThreadPoolExecutor extends ThreadPoolExecutor { |
| 281 | + private TruffleThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { |
| 282 | + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); |
| 283 | + } |
| 284 | + |
| 285 | + @Override |
| 286 | + @SuppressWarnings({"unchecked"}) |
| 287 | + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { |
| 288 | + return (RunnableFuture<T>) new CompilationTask.ExecutorServiceWrapper((CompilationTask) callable); |
| 289 | + } |
| 290 | + |
| 291 | + @Override |
| 292 | + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { |
| 293 | + ThreadFactory threadFactory = getThreadFactory(); |
| 294 | + if (threadFactory instanceof JoinableThreadFactory) { |
| 295 | + return ((JoinableThreadFactory) threadFactory).joinOtherThreads(timeout, unit); |
| 296 | + } else { |
| 297 | + return super.awaitTermination(timeout, unit); |
| 298 | + } |
| 299 | + } |
| 300 | + } |
| 301 | + |
| 302 | + public interface JoinableThreadFactory extends ThreadFactory { |
| 303 | + /** |
| 304 | + * Join all but the current thread. If the current thread belongs to this thread factory, |
| 305 | + * its interrupted status is just cleared instead of joining it. |
| 306 | + */ |
| 307 | + boolean joinOtherThreads(long timeout, TimeUnit unit) throws InterruptedException; |
| 308 | + } |
| 309 | + |
| 310 | + private final class TruffleCompilerThreadFactory implements JoinableThreadFactory { |
286 | 311 | private final String namePrefix;
|
287 | 312 | private final OptimizedTruffleRuntime runtime;
|
| 313 | + private final Set<Thread> threads = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>())); |
288 | 314 |
|
289 | 315 | TruffleCompilerThreadFactory(final String namePrefix, OptimizedTruffleRuntime runtime) {
|
290 | 316 | this.namePrefix = namePrefix;
|
@@ -315,8 +341,33 @@ public void run() {
|
315 | 341 | t.setName(namePrefix + "-" + t.getId());
|
316 | 342 | t.setPriority(Thread.MAX_PRIORITY);
|
317 | 343 | t.setDaemon(true);
|
| 344 | + threads.add(t); |
318 | 345 | return t;
|
319 | 346 | }
|
| 347 | + |
| 348 | + @Override |
| 349 | + public boolean joinOtherThreads(long timeout, TimeUnit unit) throws InterruptedException { |
| 350 | + long timeoutNanos = unit.toNanos(timeout); |
| 351 | + synchronized (threads) { |
| 352 | + if (threads.contains(Thread.currentThread())) { |
| 353 | + // clear interrupt status |
| 354 | + Thread.interrupted(); |
| 355 | + } |
| 356 | + for (Thread thread : threads) { |
| 357 | + if (thread == Thread.currentThread()) { |
| 358 | + continue; |
| 359 | + } |
| 360 | + long joinStart = System.nanoTime(); |
| 361 | + TimeUnit.NANOSECONDS.timedJoin(thread, timeoutNanos); |
| 362 | + long joinEnd = System.nanoTime(); |
| 363 | + timeoutNanos -= (joinEnd - joinStart); |
| 364 | + if (timeoutNanos <= 0) { |
| 365 | + return false; |
| 366 | + } |
| 367 | + } |
| 368 | + return true; |
| 369 | + } |
| 370 | + } |
320 | 371 | }
|
321 | 372 |
|
322 | 373 | /**
|
|
0 commit comments