Skip to content

Commit 84e0877

Browse files
authored
Merge branch 'main' into fix/merge_non_empty_results
2 parents aa9556e + 0360db2 commit 84e0877

File tree

13 files changed

+1028
-60
lines changed

13 files changed

+1028
-60
lines changed

benchmarks/README.md

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,21 @@ To get realistic results, you should exercise care when running benchmarks. Here
8282
NOTE: Linux only. Sorry Mac and Windows.
8383

8484
Disassembling is fun! Maybe not always useful, but always fun! Generally, you'll want to install `perf` and the JDK's `hsdis`.
85-
`perf` is generally available via `apg-get install perf` or `pacman -S perf`. `hsdis` you'll want to compile from source. is a little more involved. This worked
85+
`perf` is generally available via `apt-get install perf` or `pacman -S perf linux-tools`. `hsdis` is a little more involved: you'll want to compile it from source. This worked
8686
on 2020-08-01:
8787

8888
```
8989
git clone [email protected]:openjdk/jdk.git
9090
cd jdk
91-
git checkout jdk-17-ga
92-
cd src/utils/hsdis
91+
git checkout jdk-24-ga
9392
# Get a known good binutils
9493
wget https://ftp.gnu.org/gnu/binutils/binutils-2.35.tar.gz
9594
tar xf binutils-2.35.tar.gz
96-
make BINUTILS=binutils-2.35 ARCH=amd64
97-
sudo cp build/linux-amd64/hsdis-amd64.so /usr/lib/jvm/java-17-openjdk/lib/server/
95+
bash configure --with-hsdis=binutils --with-binutils-src=binutils-2.35 \
96+
--with-boot-jdk=~/.gradle/jdks/oracle_corporation-24-amd64-linux.2
97+
make build-hsdis
98+
cp ./build/linux-x86_64-server-release/jdk/lib/hsdis-amd64.so \
99+
~/.gradle/jdks/oracle_corporation-24-amd64-linux.2/lib/hsdis.so
98100
```
99101

100102
If you want to disassemble a single method do something like this:
@@ -105,6 +107,30 @@ gradlew -p benchmarks run --args ' MemoryStatsBenchmark -jvmArgs "-XX:+UnlockDia
105107

106108
If you want `perf` to find the hot methods for you, then add `-prof perfasm`.
107109

110+
NOTE: `perfasm` will need more access:
111+
```
112+
sudo bash
113+
echo -1 > /proc/sys/kernel/perf_event_paranoid
114+
exit
115+
```
116+
117+
If you get warnings like:
118+
```
119+
The perf event count is suspiciously low (0).
120+
```
121+
then check if you are bumping into [this](https://man.archlinux.org/man/perf-stat.1.en#INTEL_HYBRID_SUPPORT)
122+
by running:
123+
```
124+
perf stat -B dd if=/dev/zero of=/dev/null count=1000000
125+
```
126+
127+
If you see lines like:
128+
```
129+
765019980 cpu_atom/cycles/ # 1.728 GHz (0.60%)
130+
2258845959 cpu_core/cycles/ # 5.103 GHz (99.18%)
131+
```
132+
then `perf` is just not going to work for you.
133+
108134
## Async Profiler
109135

110136
Note: Linux and Mac only. Sorry Windows.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.compute.operator;
11+
12+
import org.apache.lucene.document.InetAddressPoint;
13+
import org.apache.lucene.util.BytesRef;
14+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
15+
import org.elasticsearch.common.network.InetAddresses;
16+
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
17+
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ParseIp;
18+
import org.openjdk.jmh.annotations.Benchmark;
19+
import org.openjdk.jmh.annotations.BenchmarkMode;
20+
import org.openjdk.jmh.annotations.Fork;
21+
import org.openjdk.jmh.annotations.Measurement;
22+
import org.openjdk.jmh.annotations.Mode;
23+
import org.openjdk.jmh.annotations.OutputTimeUnit;
24+
import org.openjdk.jmh.annotations.Scope;
25+
import org.openjdk.jmh.annotations.State;
26+
import org.openjdk.jmh.annotations.Warmup;
27+
28+
import java.net.InetAddress;
29+
import java.util.concurrent.TimeUnit;
30+
31+
/**
 * JMH benchmark comparing the three {@code ParseIp} parsing entry points
 * ({@code leadingZerosRejected}, {@code leadingZerosAreDecimal},
 * {@code leadingZerosAreOctal}) against the "original" approach of
 * {@code InetAddresses.forString} followed by {@code InetAddressPoint.encode}.
 *
 * <p>Every benchmark parses the same fixed input, {@code "192.168.0.1"}.
 * Configuration: 5 warmup iterations, 7 measurement iterations, a single fork,
 * thread-scoped state, and results reported as average time in nanoseconds.
 */
@Warmup(iterations = 5)
32+
@Measurement(iterations = 7)
33+
@BenchmarkMode(Mode.AverageTime)
34+
@OutputTimeUnit(TimeUnit.NANOSECONDS)
35+
@State(Scope.Thread)
36+
@Fork(1)
37+
public class ParseIpBenchmark {
38+
// The single input parsed by every benchmark method.
private final BytesRef ip = new BytesRef("192.168.0.1");
39+
// Scratch builder handed to each ParseIp call; created once per state instance
// on top of a no-op circuit breaker named "request".
private final BreakingBytesRefBuilder scratch = ParseIp.buildScratch(new NoopCircuitBreaker("request"));
40+
41+
// Measures ParseIp.leadingZerosRejected on the fixed input.
@Benchmark
42+
public BytesRef leadingZerosRejected() {
43+
return ParseIp.leadingZerosRejected(ip, scratch);
44+
}
45+
46+
// Measures ParseIp.leadingZerosAreDecimal on the fixed input.
@Benchmark
47+
public BytesRef leadingZerosAreDecimal() {
48+
return ParseIp.leadingZerosAreDecimal(ip, scratch);
49+
}
50+
51+
// Measures ParseIp.leadingZerosAreOctal on the fixed input.
@Benchmark
52+
public BytesRef leadingZerosAreOctal() {
53+
return ParseIp.leadingZerosAreOctal(ip, scratch);
54+
}
55+
56+
// Baseline: decode the bytes to a String, parse it with InetAddresses.forString
// and re-encode the address with InetAddressPoint.encode into a new BytesRef.
@Benchmark
57+
public BytesRef original() {
58+
InetAddress inetAddress = InetAddresses.forString(ip.utf8ToString());
59+
return new BytesRef(InetAddressPoint.encode(inetAddress));
60+
}
61+
}

docs/changelog/126338.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126338
2+
summary: Speed up TO_IP
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

Lines changed: 105 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ActionRunnable;
14+
import org.elasticsearch.action.support.SubscribableListener;
15+
import org.elasticsearch.common.Strings;
1416
import org.elasticsearch.common.settings.Setting;
1517
import org.elasticsearch.common.settings.Settings;
1618
import org.elasticsearch.common.unit.Processors;
@@ -25,9 +27,12 @@
2527
import java.util.concurrent.CyclicBarrier;
2628
import java.util.concurrent.Executor;
2729
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.Semaphore;
2832
import java.util.concurrent.ThreadFactory;
2933
import java.util.concurrent.ThreadPoolExecutor;
3034
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3136
import java.util.concurrent.atomic.AtomicBoolean;
3237

3338
import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT;
@@ -295,6 +300,7 @@ public void run() {
295300
}
296301
try {
297302
executor.execute(new Runnable() {
303+
298304
@Override
299305
public void run() {
300306
// Doesn't matter is going to be rejected
@@ -757,7 +763,7 @@ public void onRejection(Exception e) {
757763
}
758764

759765
public void testScalingWithEmptyCore() {
760-
testScalingWithEmptyCore(
766+
testScalingWithEmptyCoreAndMaxSingleThread(
761767
EsExecutors.newScaling(
762768
getTestName(),
763769
0,
@@ -772,7 +778,7 @@ public void testScalingWithEmptyCore() {
772778
}
773779

774780
public void testScalingWithEmptyCoreAndKeepAlive() {
775-
testScalingWithEmptyCore(
781+
testScalingWithEmptyCoreAndMaxSingleThread(
776782
EsExecutors.newScaling(
777783
getTestName(),
778784
0,
@@ -787,9 +793,7 @@ public void testScalingWithEmptyCoreAndKeepAlive() {
787793
}
788794

789795
public void testScalingWithEmptyCoreAndLargerMaxSize() {
790-
// TODO currently the reproduction of the starvation bug does not work if max pool size > 1
791-
// https://github.com/elastic/elasticsearch/issues/124867
792-
testScalingWithEmptyCore(
796+
testScalingWithEmptyCoreAndMaxMultipleThreads(
793797
EsExecutors.newScaling(
794798
getTestName(),
795799
0,
@@ -804,9 +808,7 @@ public void testScalingWithEmptyCoreAndLargerMaxSize() {
804808
}
805809

806810
public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
807-
// TODO currently the reproduction of the starvation bug does not work if max pool size > 1
808-
// https://github.com/elastic/elasticsearch/issues/124867
809-
testScalingWithEmptyCore(
811+
testScalingWithEmptyCoreAndMaxMultipleThreads(
810812
EsExecutors.newScaling(
811813
getTestName(),
812814
0,
@@ -821,10 +823,8 @@ public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
821823
}
822824

823825
public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
824-
// https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
825-
// if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
826826
// the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
827-
testScalingWithEmptyCore(
827+
testScalingWithEmptyCoreAndMaxSingleThread(
828828
new EsThreadPoolExecutor(
829829
getTestName(),
830830
0,
@@ -840,10 +840,8 @@ public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
840840
}
841841

842842
public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
843-
// https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
844-
// if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
845843
// the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
846-
testScalingWithEmptyCore(
844+
testScalingWithEmptyCoreAndMaxSingleThread(
847845
new EsThreadPoolExecutor(
848846
getTestName(),
849847
0,
@@ -858,11 +856,13 @@ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
858856
);
859857
}
860858

861-
private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) {
859+
private void testScalingWithEmptyCoreAndMaxSingleThread(EsThreadPoolExecutor testSubject) {
862860
try {
861+
final var keepAliveNanos = testSubject.getKeepAliveTime(TimeUnit.NANOSECONDS);
862+
863863
class Task extends AbstractRunnable {
864-
private int remaining;
865864
private final CountDownLatch doneLatch;
865+
private int remaining;
866866

867867
Task(int iterations, CountDownLatch doneLatch) {
868868
this.remaining = iterations;
@@ -879,29 +879,108 @@ protected void doRun() {
879879
if (--remaining == 0) {
880880
doneLatch.countDown();
881881
} else {
882-
logger.trace("--> remaining [{}]", remaining);
883-
final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS);
884882
new Thread(() -> {
885883
if (keepAliveNanos > 0) {
886-
final var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-10_000, 10_000);
887-
while (System.nanoTime() < targetNanoTime) {
888-
Thread.yield();
889-
}
884+
waitUntilKeepAliveTime(keepAliveNanos);
890885
}
891-
executor.execute(Task.this);
886+
testSubject.execute(Task.this);
892887
}).start();
893888
}
894889
}
895890
}
896891

897892
for (int i = 0; i < 20; i++) {
898-
logger.trace("--> attempt [{}]", i);
899893
final var doneLatch = new CountDownLatch(1);
900-
executor.execute(new Task(between(1, 500), doneLatch));
894+
testSubject.execute(new Task(between(1, 500), doneLatch));
901895
safeAwait(doneLatch, TimeValue.ONE_MINUTE);
902896
}
903897
} finally {
904-
ThreadPool.terminate(executor, 1, TimeUnit.SECONDS);
898+
ThreadPool.terminate(testSubject, 1, TimeUnit.SECONDS);
899+
}
900+
}
901+
902+
/**
 * Exercises {@code testSubject} with several concurrent submitters to check that
 * submitted tasks do not get stuck without a worker to run them.
 *
 * <p>The test runs 100 rounds. Each round creates a {@code TaskScheduler} with
 * {@code between(10, 200)} iterations, starts it, and waits for its {@code result}
 * via {@code safeAwait}. A round uses {@code schedulerTasks} scheduler threads:
 * the executor's maximum pool size, plus one when a keep-alive time is set.
 *
 * <p>Within one iteration all scheduler threads meet on a {@link CyclicBarrier};
 * the barrier action decrements {@code remaining} once per trip. Each thread then
 * submits one task to {@code testSubject} that releases a semaphore permit. The
 * thread that arrived first at the barrier additionally waits for the keep-alive
 * time before submitting (if one is set) and afterwards waits for
 * {@code schedulerTasks} permits, failing the round with a
 * {@link TimeoutException} if they are not released in time.
 *
 * <p>The executor under test is always terminated in the {@code finally} block.
 *
 * @param testSubject the executor under test; it is terminated when this method returns
 */
private void testScalingWithEmptyCoreAndMaxMultipleThreads(EsThreadPoolExecutor testSubject) {
903+
final var keepAliveNanos = testSubject.getKeepAliveTime(TimeUnit.NANOSECONDS);
904+
// Use max pool size with one additional scheduler task if a keep alive time is set.
905+
final var schedulerTasks = testSubject.getMaximumPoolSize() + (keepAliveNanos > 0 ? 1 : 0);
906+
907+
// Drives one round: `schedulerTasks` threads repeatedly submit work to `testSubject` in lock-step.
class TaskScheduler {
908+
// Completed with null on success, or with the first reported failure.
final SubscribableListener<Void> result = new SubscribableListener<>();
909+
final ExecutorService scheduler;
910+
final CyclicBarrier cyclicBarrier;
911+
// One permit is released by every task that `testSubject` actually runs.
final Semaphore taskCompletions;
912+
// Iterations left; only written by the barrier action (once per barrier trip).
private int remaining;
913+
914+
TaskScheduler(ExecutorService scheduler, int iterations) {
915+
this.scheduler = scheduler;
916+
this.taskCompletions = new Semaphore(0);
917+
this.cyclicBarrier = new CyclicBarrier(schedulerTasks, () -> remaining--);
918+
this.remaining = iterations;
919+
}
920+
921+
public void start() {
922+
// The scheduler tasks are running on the dedicated scheduler thread pool. Each task submits
923+
// a test task on the EsThreadPoolExecutor (`testSubject`) releasing one `taskCompletions` permit.
924+
final Runnable schedulerTask = () -> {
925+
try {
926+
while (remaining > 0) {
927+
// Wait for all scheduler threads to be ready for the next attempt.
// CyclicBarrier#await returns the arrival index; `parties - 1` identifies the first thread to arrive.
928+
var first = cyclicBarrier.await(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS) == schedulerTasks - 1;
929+
if (first && keepAliveNanos > 0) {
930+
// The task submitted by the first scheduler task (after reaching the keep alive time) is the task
931+
// that might starve without any worker available unless an additional worker probe is submitted.
932+
waitUntilKeepAliveTime(keepAliveNanos);
933+
}
934+
// Test EsThreadPoolExecutor by submitting a task that releases one permit.
935+
testSubject.execute(taskCompletions::release);
936+
if (first) {
937+
// Let the first scheduler task (by arrival on the barrier) wait for all permits.
938+
var success = taskCompletions.tryAcquire(
939+
schedulerTasks,
940+
SAFE_AWAIT_TIMEOUT.millis(),
941+
TimeUnit.MILLISECONDS
942+
);
943+
if (success == false) {
944+
// Report how many tasks are still outstanding plus the executor's queue and pool size.
var msg = Strings.format(
945+
"timed out waiting for [%s] of [%s] tasks to complete [queue size: %s, workers: %s] ",
946+
schedulerTasks - taskCompletions.availablePermits(),
947+
schedulerTasks,
948+
testSubject.getQueue().size(),
949+
testSubject.getPoolSize()
950+
);
951+
result.onFailure(new TimeoutException(msg));
952+
return;
953+
}
954+
}
955+
}
956+
} catch (Exception e) {
957+
result.onFailure(e);
958+
return;
959+
}
// NOTE(review): every scheduler task reaches this line on normal exit, not only the `first` one, so
// `result` may be completed while the first task is still waiting for the final round's permits.
// Presumably only the first completion of the listener takes effect - confirm this is intended.
960+
result.onResponse(null);
961+
};
962+
// Run scheduler tasks on the dedicated scheduler thread pool.
963+
for (int i = 0; i < schedulerTasks; i++) {
964+
scheduler.execute(schedulerTask);
965+
}
966+
}
967+
}
968+
969+
// The scheduler pool has exactly one thread per scheduler task so that all of them can meet on the barrier.
try (var scheduler = Executors.newFixedThreadPool(schedulerTasks)) {
970+
for (int i = 0; i < 100; i++) {
971+
TaskScheduler taskScheduler = new TaskScheduler(scheduler, between(10, 200));
972+
taskScheduler.start();
973+
safeAwait(taskScheduler.result);
974+
}
975+
} finally {
976+
ThreadPool.terminate(testSubject, 1, TimeUnit.SECONDS);
977+
}
978+
}
979+
980+
/**
 * Spins, yielding the current thread, until roughly {@code keepAliveNanos}
 * nanoseconds have elapsed according to {@link System#nanoTime()}.
 *
 * <p>The target time is offset by {@code between(-1_000, 1_000)} nanoseconds.
 * {@code between} is defined outside this view; presumably it returns a random
 * value in that range so the wait ends slightly before or after the keep-alive
 * time - verify against the test base class.
 *
 * @param keepAliveNanos the keep-alive time to wait for, in nanoseconds
 */
private void waitUntilKeepAliveTime(long keepAliveNanos) {
981+
var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-1_000, 1_000);
982+
// Busy-wait with Thread.yield() rather than sleeping.
while (System.nanoTime() < targetNanoTime) {
983+
Thread.yield();
905984
}
906985
}
907986
}

0 commit comments

Comments
 (0)