Skip to content

Commit 6f6f609

Browse files
authored
Make ES query recording allocation free (#283)
1 parent 32a3642 commit 6f6f609

File tree

22 files changed

+358
-190
lines changed

22 files changed

+358
-190
lines changed

apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ public static void main(String[] args) throws RunnerException {
6060
@Setup
6161
public void setUp() {
6262
tracer = new ElasticApmTracerBuilder().build();
63-
blockingQueueObjectPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer));
64-
jctoolsQueueObjectPool = new QueueBasedObjectPool<>(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer));
65-
jctoolsAtomicQueueObjectPool = new QueueBasedObjectPool<>(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer));
66-
agronaQueueObjectPool = new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer));
63+
blockingQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer));
64+
jctoolsQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer));
65+
jctoolsAtomicQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer));
66+
agronaQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer));
6767
mixedObjectPool = new MixedObjectPool<>(() -> new Transaction(tracer),
6868
new ThreadLocalObjectPool<>(256, true, () -> new Transaction(tracer)),
69-
new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)));
69+
QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)));
7070
threadLocalObjectPool = new ThreadLocalObjectPool<>(64, true, () -> new Transaction(tracer));
7171
}
7272

apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import co.elastic.apm.impl.transaction.TraceContext;
3131
import co.elastic.apm.impl.transaction.Transaction;
3232
import co.elastic.apm.objectpool.ObjectPool;
33-
import co.elastic.apm.objectpool.RecyclableObjectFactory;
33+
import co.elastic.apm.objectpool.Allocator;
3434
import co.elastic.apm.objectpool.impl.QueueBasedObjectPool;
3535
import co.elastic.apm.report.Reporter;
3636
import co.elastic.apm.report.ReporterConfiguration;
@@ -85,23 +85,23 @@ protected Deque<Object> initialValue() {
8585
this.spanListeners = spanListeners;
8686
int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2;
8787
coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class);
88-
transactionPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<Transaction>newQueue(createBoundedMpmc(maxPooledElements)), false,
89-
new RecyclableObjectFactory<Transaction>() {
88+
transactionPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<Transaction>newQueue(createBoundedMpmc(maxPooledElements)), false,
89+
new Allocator<Transaction>() {
9090
@Override
9191
public Transaction createInstance() {
9292
return new Transaction(ElasticApmTracer.this);
9393
}
9494
});
95-
spanPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<Span>newQueue(createBoundedMpmc(maxPooledElements)), false,
96-
new RecyclableObjectFactory<Span>() {
95+
spanPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<Span>newQueue(createBoundedMpmc(maxPooledElements)), false,
96+
new Allocator<Span>() {
9797
@Override
9898
public Span createInstance() {
9999
return new Span(ElasticApmTracer.this);
100100
}
101101
});
102102
// we are assuming that we don't need as many errors as spans or transactions
103-
errorPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<ErrorCapture>newQueue(createBoundedMpmc(maxPooledElements / 2)), false,
104-
new RecyclableObjectFactory<ErrorCapture>() {
103+
errorPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<ErrorCapture>newQueue(createBoundedMpmc(maxPooledElements / 2)), false,
104+
new Allocator<ErrorCapture>() {
105105
@Override
106106
public ErrorCapture createInstance() {
107107
return new ErrorCapture(ElasticApmTracer.this);

apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,37 @@
2020

2121
package co.elastic.apm.impl.transaction;
2222

23+
import co.elastic.apm.objectpool.Allocator;
24+
import co.elastic.apm.objectpool.ObjectPool;
2325
import co.elastic.apm.objectpool.Recyclable;
26+
import co.elastic.apm.objectpool.impl.QueueBasedObjectPool;
27+
import co.elastic.apm.objectpool.impl.Resetter;
28+
import co.elastic.apm.report.serialize.DslJsonSerializer;
29+
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
2430

2531
import javax.annotation.Nullable;
32+
import java.nio.CharBuffer;
2633

2734

2835
/**
2936
* An object containing contextual data for database spans
3037
*/
3138
public class Db implements Recyclable {
3239

40+
private final ObjectPool<CharBuffer> charBufferPool = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue<CharBuffer>(128), false,
41+
new Allocator<CharBuffer>() {
42+
@Override
43+
public CharBuffer createInstance() {
44+
return CharBuffer.allocate(DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH);
45+
}
46+
},
47+
new Resetter<CharBuffer>() {
48+
@Override
49+
public void recycle(CharBuffer object) {
50+
object.clear();
51+
}
52+
});
53+
3354
/**
3455
* Database instance name
3556
*/
@@ -40,6 +61,10 @@ public class Db implements Recyclable {
4061
*/
4162
@Nullable
4263
private String statement;
64+
65+
@Nullable
66+
private CharBuffer statementBuffer;
67+
4368
/**
4469
* Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis"
4570
*/
@@ -83,6 +108,37 @@ public Db withStatement(@Nullable String statement) {
83108
return this;
84109
}
85110

111+
/**
112+
* Gets a pooled {@link CharBuffer} to record the DB statement and associates it with this instance.
113+
* <p>
114+
* Note: you may not hold a reference to the returned {@link CharBuffer} as it will be reused.
115+
* </p>
116+
* <p>
117+
* Note: This method is not thread safe
118+
* </p>
119+
*
120+
* @return a {@link CharBuffer} to record the DB statement
121+
*/
122+
public CharBuffer withStatementBuffer() {
123+
if (this.statementBuffer == null) {
124+
this.statementBuffer = charBufferPool.createInstance();
125+
}
126+
return this.statementBuffer;
127+
}
128+
129+
/**
130+
* Returns the associated pooled {@link CharBuffer} to record the DB statement.
131+
* <p>
132+
* Note: returns {@code null} unless {@link #withStatementBuffer()} has previously been called
133+
* </p>
134+
*
135+
* @return a {@link CharBuffer} to record the DB statement, or {@code null}
136+
*/
137+
@Nullable
138+
public CharBuffer getStatementBuffer() {
139+
return statementBuffer;
140+
}
141+
86142
/**
87143
* Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis"
88144
*/
@@ -121,13 +177,18 @@ public void resetState() {
121177
statement = null;
122178
type = null;
123179
user = null;
180+
if (statementBuffer != null) {
181+
charBufferPool.recycle(statementBuffer);
182+
}
183+
statementBuffer = null;
124184
}
125185

126186
public boolean hasContent() {
127187
return instance != null ||
128188
statement != null ||
129189
type != null ||
130-
user != null;
190+
user != null ||
191+
statementBuffer != null;
131192
}
132193

133194
public void copyFrom(Db other) {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
*/
2020
package co.elastic.apm.objectpool;
2121

22-
public interface RecyclableObjectFactory<T extends Recyclable> {
22+
public interface Allocator<T> {
2323

2424
T createInstance();
2525
}

apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,21 @@
2424

2525
public class NoopObjectPool<T extends Recyclable> implements ObjectPool<T> {
2626

27-
private final RecyclableObjectFactory<T> recyclableObjectFactory;
27+
private final Allocator<T> allocator;
2828

29-
public NoopObjectPool(RecyclableObjectFactory<T> recyclableObjectFactory) {
30-
this.recyclableObjectFactory = recyclableObjectFactory;
29+
public NoopObjectPool(Allocator<T> allocator) {
30+
this.allocator = allocator;
3131
}
3232

3333
@Nullable
3434
@Override
3535
public T tryCreateInstance() {
36-
return recyclableObjectFactory.createInstance();
36+
return allocator.createInstance();
3737
}
3838

3939
@Override
4040
public T createInstance() {
41-
return recyclableObjectFactory.createInstance();
41+
return allocator.createInstance();
4242
}
4343

4444
@Override

apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import javax.annotation.Nullable;
2323
import java.io.Closeable;
2424

25-
public interface ObjectPool<T extends Recyclable> extends Closeable {
25+
public interface ObjectPool<T> extends Closeable {
2626
@Nullable
2727
T tryCreateInstance();
2828

apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,17 @@
2020
package co.elastic.apm.objectpool.impl;
2121

2222
import co.elastic.apm.objectpool.ObjectPool;
23-
import co.elastic.apm.objectpool.Recyclable;
24-
import co.elastic.apm.objectpool.RecyclableObjectFactory;
23+
import co.elastic.apm.objectpool.Allocator;
2524

2625
import java.util.concurrent.atomic.AtomicInteger;
2726

28-
public abstract class AbstractObjectPool<T extends Recyclable> implements ObjectPool<T> {
27+
public abstract class AbstractObjectPool<T> implements ObjectPool<T> {
2928

30-
protected final RecyclableObjectFactory<T> recyclableObjectFactory;
29+
protected final Allocator<T> allocator;
3130
private final AtomicInteger garbageCreated = new AtomicInteger();
3231

33-
protected AbstractObjectPool(RecyclableObjectFactory<T> recyclableObjectFactory) {
34-
this.recyclableObjectFactory = recyclableObjectFactory;
32+
protected AbstractObjectPool(Allocator<T> allocator) {
33+
this.allocator = allocator;
3534
}
3635

3736
@Override
@@ -40,7 +39,7 @@ public T createInstance() {
4039
if (recyclable == null) {
4140
// queue is empty, falling back to creating a new instance
4241
garbageCreated.incrementAndGet();
43-
return recyclableObjectFactory.createInstance();
42+
return allocator.createInstance();
4443
} else {
4544
return recyclable;
4645
}

apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,18 @@
2020
package co.elastic.apm.objectpool.impl;
2121

2222
import co.elastic.apm.objectpool.ObjectPool;
23-
import co.elastic.apm.objectpool.Recyclable;
24-
import co.elastic.apm.objectpool.RecyclableObjectFactory;
23+
import co.elastic.apm.objectpool.Allocator;
2524

2625
import javax.annotation.Nullable;
2726
import java.io.IOException;
2827

29-
public class MixedObjectPool<T extends Recyclable> extends AbstractObjectPool<T> {
28+
public class MixedObjectPool<T> extends AbstractObjectPool<T> {
3029

3130
private final ObjectPool<T> primaryPool;
3231
private final ObjectPool<T> secondaryPool;
3332

34-
public MixedObjectPool(final RecyclableObjectFactory<T> recyclableObjectFactory, ObjectPool<T> primaryPool, ObjectPool<T> secondaryPool) {
35-
super(recyclableObjectFactory);
33+
public MixedObjectPool(final Allocator<T> allocator, ObjectPool<T> primaryPool, ObjectPool<T> secondaryPool) {
34+
super(allocator);
3635
this.primaryPool = primaryPool;
3736
this.secondaryPool = secondaryPool;
3837
}

apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,42 @@
1919
*/
2020
package co.elastic.apm.objectpool.impl;
2121

22+
import co.elastic.apm.objectpool.Allocator;
2223
import co.elastic.apm.objectpool.Recyclable;
23-
import co.elastic.apm.objectpool.RecyclableObjectFactory;
2424
import com.lmax.disruptor.EventFactory;
2525

2626
import javax.annotation.Nullable;
2727
import java.util.Collection;
2828
import java.util.Iterator;
2929
import java.util.Queue;
3030

31-
public class QueueBasedObjectPool<T extends Recyclable> extends AbstractObjectPool<T> implements Collection<T> {
31+
public class QueueBasedObjectPool<T> extends AbstractObjectPool<T> implements Collection<T> {
3232

3333
private final Queue<T> queue;
34+
private final Resetter<T> resetter;
3435

3536
/**
3637
* @param queue the underlying queue
37-
* @param preAllocate when set to true, the recyclableObjectFactory will be used to create maxPooledElements objects
38+
* @param preAllocate when set to true, the allocator will be used to create maxPooledElements objects
3839
* which are then stored in the queue
39-
* @param recyclableObjectFactory a factory method which is used to create new instances of the recyclable object. This factory is
40+
* @param allocator a factory method which is used to create new instances of the recyclable object. This factory is
4041
* used when there are no objects in the queue and to preallocate the queue.
4142
*/
42-
public QueueBasedObjectPool(Queue<T> queue, boolean preAllocate, RecyclableObjectFactory<T> recyclableObjectFactory) {
43-
super(recyclableObjectFactory);
43+
public static <T extends Recyclable> QueueBasedObjectPool<T> ofRecyclable(Queue<T> queue, boolean preAllocate, Allocator<T> allocator) {
44+
return new QueueBasedObjectPool<>(queue, preAllocate, allocator, Resetter.ForRecyclable.<T>get());
45+
}
46+
47+
public static <T> QueueBasedObjectPool<T> of(Queue<T> queue, boolean preAllocate, Allocator<T> allocator, Resetter<T> resetter) {
48+
return new QueueBasedObjectPool<>(queue, preAllocate, allocator, resetter);
49+
}
50+
51+
private QueueBasedObjectPool(Queue<T> queue, boolean preAllocate, Allocator<T> allocator, Resetter<T> resetter) {
52+
super(allocator);
4453
this.queue = queue;
54+
this.resetter = resetter;
4555
if (preAllocate) {
4656
for (int i = 0; i < this.queue.size(); i++) {
47-
this.queue.offer(recyclableObjectFactory.createInstance());
57+
this.queue.offer(allocator.createInstance());
4858
}
4959
}
5060
}
@@ -57,7 +67,7 @@ public T tryCreateInstance() {
5767

5868
@Override
5969
public void recycle(T obj) {
60-
obj.resetState();
70+
resetter.recycle(obj);
6171
queue.offer(obj);
6272
}
6373

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*-
2+
* #%L
3+
* Elastic APM Java agent
4+
* %%
5+
* Copyright (C) 2018 Elastic and contributors
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package co.elastic.apm.objectpool.impl;
21+
22+
import co.elastic.apm.objectpool.Recyclable;
23+
24+
public interface Resetter<T> {
25+
26+
void recycle(T object);
27+
28+
class ForRecyclable<T extends Recyclable> implements Resetter<T> {
29+
private static ForRecyclable INSTANCE = new ForRecyclable();
30+
31+
public static <T extends Recyclable> Resetter<T> get() {
32+
return INSTANCE;
33+
}
34+
35+
@Override
36+
public void recycle(Recyclable object) {
37+
object.resetState();
38+
}
39+
}
40+
41+
}

0 commit comments

Comments
 (0)