Skip to content

Commit 31c4288

Browse files
authored
fix buffer sync logic using modern concurrency primitives (#991)
* fix buffer sync logic using modern concurrency primitives Signed-off-by: Gregor Zeitlinger <[email protected]> * format Signed-off-by: Gregor Zeitlinger <[email protected]> * update benchmarks Signed-off-by: Gregor Zeitlinger <[email protected]> * add errorprone Signed-off-by: Gregor Zeitlinger <[email protected]> * update benchmarks Signed-off-by: Gregor Zeitlinger <[email protected]> --------- Signed-off-by: Gregor Zeitlinger <[email protected]>
1 parent 47ad012 commit 31c4288

File tree

5 files changed

+101
-44
lines changed

5 files changed

+101
-44
lines changed

benchmarks/pom.xml

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
34
<modelVersion>4.0.0</modelVersion>
45

56
<parent>
@@ -51,9 +52,9 @@
5152
<version>${simpleclient.version}</version>
5253
</dependency>
5354
<dependency>
54-
<groupId>com.codahale.metrics</groupId>
55-
<artifactId>metrics-core</artifactId>
56-
<version>${codahale.version}</version>
55+
<groupId>com.codahale.metrics</groupId>
56+
<artifactId>metrics-core</artifactId>
57+
<version>${codahale.version}</version>
5758
</dependency>
5859
<dependency>
5960
<groupId>io.opentelemetry</groupId>
@@ -74,6 +75,25 @@
7475
<build>
7576
<finalName>${project.artifactId}</finalName>
7677
<plugins>
78+
<plugin>
79+
<groupId>org.apache.maven.plugins</groupId>
80+
<artifactId>maven-compiler-plugin</artifactId>
81+
<configuration>
82+
<source>1.8</source>
83+
<target>1.8</target>
84+
<compilerArgs>
85+
<!-- need to add parameters to prevent inheritance -->
86+
<arg>-parameters</arg>
87+
</compilerArgs>
88+
<annotationProcessorPaths>
89+
<path>
90+
<groupId>org.openjdk.jmh</groupId>
91+
<artifactId>jmh-generator-annprocess</artifactId>
92+
<version>${jmh.version}</version>
93+
</path>
94+
</annotationProcessorPaths>
95+
</configuration>
96+
</plugin>
7797
<plugin>
7898
<groupId>org.apache.maven.plugins</groupId>
7999
<artifactId>maven-shade-plugin</artifactId>
@@ -86,7 +106,8 @@
86106
<configuration>
87107
<finalName>benchmarks</finalName>
88108
<transformers>
89-
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
109+
<transformer
110+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
90111
<mainClass>io.prometheus.metrics.benchmarks.BenchmarkRunner</mainClass>
91112
</transformer>
92113
</transformers>

benchmarks/src/main/java/io/prometheus/metrics/benchmarks/CounterBenchmark.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,21 @@
1818
import org.openjdk.jmh.annotations.Threads;
1919

2020
/**
21-
* Results on a machine with dedicated 8 vCPU cores:
21+
* Results on a machine with dedicated Core i7 1265U:
2222
*
2323
* <pre>
2424
*
2525
* Benchmark Mode Cnt Score Error Units
26-
* CounterBenchmark.codahaleIncNoLabels thrpt 25 25761.677 ± 122.947 ops/s
27-
* CounterBenchmark.openTelemetryAdd thrpt 25 545.026 ± 33.913 ops/s
28-
* CounterBenchmark.openTelemetryInc thrpt 25 550.577 ± 45.415 ops/s
29-
* CounterBenchmark.openTelemetryIncNoLabels thrpt 25 527.638 ± 32.020 ops/s
30-
* CounterBenchmark.prometheusAdd thrpt 25 20341.474 ± 40.973 ops/s
31-
* CounterBenchmark.prometheusInc thrpt 25 26414.616 ± 96.666 ops/s
32-
* CounterBenchmark.prometheusNoLabelsInc thrpt 25 26177.676 ± 120.342 ops/s
33-
* CounterBenchmark.simpleclientAdd thrpt 25 5503.867 ± 161.313 ops/s
34-
* CounterBenchmark.simpleclientInc thrpt 25 5568.125 ± 53.291 ops/s
35-
* CounterBenchmark.simpleclientNoLabelsInc thrpt 25 5394.692 ± 130.531 ops/s
26+
* CounterBenchmark.codahaleIncNoLabels thrpt 25 32969.795 ± 1547.775 ops/s
27+
* CounterBenchmark.openTelemetryAdd thrpt 25 747.068 ± 93.128 ops/s
28+
* CounterBenchmark.openTelemetryInc thrpt 25 760.784 ± 47.595 ops/s
29+
* CounterBenchmark.openTelemetryIncNoLabels thrpt 25 824.346 ± 45.131 ops/s
30+
* CounterBenchmark.prometheusAdd thrpt 25 28403.000 ± 250.774 ops/s
31+
* CounterBenchmark.prometheusInc thrpt 25 38368.142 ± 361.914 ops/s
32+
* CounterBenchmark.prometheusNoLabelsInc thrpt 25 35558.069 ± 4020.926 ops/s
33+
* CounterBenchmark.simpleclientAdd thrpt 25 4081.152 ± 620.094 ops/s
34+
* CounterBenchmark.simpleclientInc thrpt 25 5735.644 ± 1205.329 ops/s
35+
* CounterBenchmark.simpleclientNoLabelsInc thrpt 25 6852.563 ± 544.481 ops/s
3636
* </pre>
3737
*
3838
* Prometheus counters are faster than counters of other libraries. For example, incrementing a

benchmarks/src/main/java/io/prometheus/metrics/benchmarks/HistogramBenchmark.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717
import org.openjdk.jmh.annotations.Threads;
1818

1919
/**
20-
* Results on a machine with dedicated 8 vCPU cores:
20+
* Results on a machine with dedicated Core i7 1265U:
2121
*
2222
* <pre>
2323
* Benchmark Mode Cnt Score Error Units
24-
* HistogramBenchmark.openTelemetryClassic thrpt 25 258.660 ± 6.736 ops/s
25-
* HistogramBenchmark.openTelemetryExponential thrpt 25 210.963 ± 11.288 ops/s
26-
* HistogramBenchmark.prometheusClassic thrpt 25 1528.871 ± 43.598 ops/s
27-
* HistogramBenchmark.prometheusNative thrpt 25 1282.643 ± 110.210 ops/s
28-
* HistogramBenchmark.simpleclient thrpt 25 3376.016 ± 173.545 ops/s
24+
* HistogramBenchmark.openTelemetryClassic thrpt 25 390.982 ± 16.058 ops/s
25+
* HistogramBenchmark.openTelemetryExponential thrpt 25 320.160 ± 18.056 ops/s
26+
* HistogramBenchmark.prometheusClassic thrpt 25 2385.862 ± 34.766 ops/s
27+
* HistogramBenchmark.prometheusNative thrpt 25 1947.371 ± 48.193 ops/s
28+
* HistogramBenchmark.simpleclient thrpt 25 4324.961 ± 50.938 ops/s
2929
* </pre>
3030
*
3131
* The simpleclient (i.e. client_java version 0.16.0 and older) histograms perform about the same as

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
<name>Gregor Zeitlinger</name>
4949
<email>[email protected]</email>
5050
</developer>
51+
<developer>
52+
<id>dhoard</id>
53+
<name>Doug Hoard</name>
54+
<email>[email protected]</email>
55+
</developer>
5156
</developers>
5257

5358
<modules>

prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import io.prometheus.metrics.model.snapshots.DataPointSnapshot;
44
import java.util.Arrays;
55
import java.util.concurrent.atomic.AtomicLong;
6+
import java.util.concurrent.locks.Condition;
7+
import java.util.concurrent.locks.ReentrantLock;
68
import java.util.function.Consumer;
79
import java.util.function.Function;
810
import java.util.function.Supplier;
@@ -15,17 +17,19 @@
1517
*/
1618
class Buffer {
1719

18-
private static final long signBit = 1L << 63;
20+
private static final long bufferActiveBit = 1L << 63;
1921
private final AtomicLong observationCount = new AtomicLong(0);
2022
private double[] observationBuffer = new double[0];
2123
private int bufferPos = 0;
2224
private boolean reset = false;
23-
private final Object appendLock = new Object();
24-
private final Object runLock = new Object();
25+
26+
ReentrantLock appendLock = new ReentrantLock();
27+
ReentrantLock runLock = new ReentrantLock();
28+
Condition bufferFilled = appendLock.newCondition();
2529

2630
boolean append(double value) {
2731
long count = observationCount.incrementAndGet();
28-
if ((count & signBit) == 0) {
32+
if ((count & bufferActiveBit) == 0) {
2933
return false; // sign bit not set -> buffer not active.
3034
} else {
3135
doAppend(value);
@@ -34,12 +38,17 @@ boolean append(double value) {
3438
}
3539

3640
private void doAppend(double amount) {
37-
synchronized (appendLock) {
41+
appendLock.lock();
42+
try {
3843
if (bufferPos >= observationBuffer.length) {
3944
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
4045
}
4146
observationBuffer[bufferPos] = amount;
4247
bufferPos++;
48+
49+
bufferFilled.signalAll();
50+
} finally {
51+
appendLock.unlock();
4352
}
4453
}
4554

@@ -48,33 +57,55 @@ void reset() {
4857
reset = true;
4958
}
5059

51-
@SuppressWarnings("ThreadPriorityCheck")
5260
<T extends DataPointSnapshot> T run(
53-
Function<Long, Boolean> complete, Supplier<T> runnable, Consumer<Double> observeFunction) {
61+
Function<Long, Boolean> complete,
62+
Supplier<T> createResult,
63+
Consumer<Double> observeFunction) {
5464
double[] buffer;
5565
int bufferSize;
5666
T result;
57-
synchronized (runLock) {
58-
long count = observationCount.getAndAdd(signBit);
59-
while (!complete.apply(count)) {
60-
Thread.yield();
61-
}
62-
result = runnable.get();
63-
int expectedBufferSize;
64-
if (reset) {
65-
expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~signBit) - count);
66-
reset = false;
67-
} else {
68-
expectedBufferSize = (int) (observationCount.addAndGet(signBit) - count);
69-
}
70-
while (bufferPos != expectedBufferSize) {
71-
Thread.yield();
67+
68+
runLock.lock();
69+
try {
70+
// Signal that the buffer is active.
71+
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
72+
73+
appendLock.lock();
74+
try {
75+
while (!complete.apply(expectedCount)) {
76+
// Wait until all in-flight threads have added their observations to the buffer.
77+
bufferFilled.await();
78+
}
79+
result = createResult.get();
80+
81+
// Signal that the buffer is inactive.
82+
int expectedBufferSize;
83+
if (reset) {
84+
expectedBufferSize =
85+
(int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
86+
reset = false;
87+
} else {
88+
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
89+
}
90+
91+
while (bufferPos < expectedBufferSize) {
92+
// Wait until all in-flight threads have added their observations to the buffer.
93+
bufferFilled.await();
94+
}
95+
} finally {
96+
appendLock.unlock();
7297
}
98+
7399
buffer = observationBuffer;
74100
bufferSize = bufferPos;
75101
observationBuffer = new double[0];
76102
bufferPos = 0;
103+
} catch (InterruptedException e) {
104+
throw new RuntimeException(e);
105+
} finally {
106+
runLock.unlock();
77107
}
108+
78109
for (int i = 0; i < bufferSize; i++) {
79110
observeFunction.accept(buffer[i]);
80111
}

0 commit comments

Comments
 (0)