Skip to content

Commit af3f8e5

Browse files
authored
[7.17] Update ExecutorScalingQueue to workaround LinkedTransferQueue JDK bug (#104347) (#104425)
* Update ExecutorScalingQueue to workaround LinkedTransferQueue JDK bug (#104347) This commit adds a few overrides to ExecutorScalingQueue (subclass of LinkedTransferQueue) to workaround a JDK bug in LinkedTransferQueue. * remove usage of var * fix compilation
1 parent b6114f4 commit af3f8e5

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,29 @@ public boolean offer(E e) {
382382
}
383383
}
384384

385+
// Overridden to workaround a JDK bug introduced in JDK 21.0.2
386+
// https://bugs.openjdk.org/browse/JDK-8323659
387+
@Override
388+
public void put(E e) {
389+
// As the queue is unbounded, this method will always add to the queue.
390+
super.offer(e);
391+
}
392+
393+
// Overridden to workaround a JDK bug introduced in JDK 21.0.2
394+
// https://bugs.openjdk.org/browse/JDK-8323659
395+
@Override
396+
public boolean add(E e) {
397+
// As the queue is unbounded, this method will never return false.
398+
return super.offer(e);
399+
}
400+
401+
// Overridden to workaround a JDK bug introduced in JDK 21.0.2
402+
// https://bugs.openjdk.org/browse/JDK-8323659
403+
@Override
404+
public boolean offer(E e, long timeout, TimeUnit unit) {
405+
// As the queue is unbounded, this method will never return false.
406+
return super.offer(e);
407+
}
385408
}
386409

387410
/**

server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.settings.Setting;
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.test.ESTestCase;
15+
import org.elasticsearch.threadpool.ThreadPool;
1516
import org.hamcrest.Matcher;
1617

1718
import java.util.Locale;
@@ -437,4 +438,41 @@ private void runProcessorsBoundTest(final Setting<Integer> processorsSetting) {
437438
assertSettingDeprecationsAndWarnings(deprecatedSettings, new DeprecationWarning(DeprecationLogger.CRITICAL, expectedWarning));
438439
}
439440

441+
// This test must complete to ensure that our basic infrastructure is working as expected.
442+
// Specifically that ExecutorScalingQueue, which subclasses LinkedTransferQueue, correctly
443+
// tracks tasks submitted to the executor.
444+
public void testBasicTaskExecution() {
445+
final ThreadPoolExecutor executorService = EsExecutors.newScaling(
446+
"test",
447+
0,
448+
between(1, 5),
449+
60,
450+
TimeUnit.SECONDS,
451+
randomBoolean(),
452+
EsExecutors.daemonThreadFactory("test"),
453+
new ThreadContext(Settings.EMPTY)
454+
);
455+
try {
456+
final CountDownLatch countDownLatch = new CountDownLatch(between(1, 10));
457+
class TestTask extends AbstractRunnable {
458+
@Override
459+
protected void doRun() {
460+
countDownLatch.countDown();
461+
if (countDownLatch.getCount() > 0) {
462+
executorService.execute(TestTask.this);
463+
}
464+
}
465+
466+
@Override
467+
public void onFailure(Exception e) {
468+
fail(e.getMessage());
469+
}
470+
}
471+
472+
executorService.execute(new TestTask());
473+
safeAwait(countDownLatch);
474+
} finally {
475+
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
476+
}
477+
}
440478
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.common.util.concurrent;
10+
11+
import org.elasticsearch.test.ESTestCase;
12+
13+
import java.util.concurrent.LinkedTransferQueue;
14+
import java.util.concurrent.TimeUnit;
15+
16+
public class ExecutorScalingQueueTests extends ESTestCase {
17+
18+
public void testPut() {
19+
LinkedTransferQueue<Object> queue = new EsExecutors.ExecutorScalingQueue<>();
20+
queue.put(new Object());
21+
assertEquals(queue.size(), 1);
22+
}
23+
24+
public void testAdd() {
25+
LinkedTransferQueue<Object> queue = new EsExecutors.ExecutorScalingQueue<>();
26+
assertTrue(queue.add(new Object()));
27+
assertEquals(queue.size(), 1);
28+
}
29+
30+
public void testTimedOffer() {
31+
LinkedTransferQueue<Object> queue = new EsExecutors.ExecutorScalingQueue<>();
32+
assertTrue(queue.offer(new Object(), 60, TimeUnit.SECONDS));
33+
assertEquals(queue.size(), 1);
34+
}
35+
36+
}

0 commit comments

Comments
 (0)