Skip to content

Commit 37e66e0

Browse files
authored
Add span links to messaging system tracing (#2610)
1 parent d36e7dc commit 37e66e0

File tree

41 files changed

+1495
-366
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1495
-366
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ the agent will now include the database name in the dependency, thus `mysql/my-d
3939
* Added support for downloading the latest agent version through the attach CLI by setting `--download-agent-version latest`. In
4040
addition, when using the `apm-agent-attach-cli-slim.jar`, which does not contain a bundled agent, the latest version will be downloaded
4141
from maven at runtime unless configured otherwise through `--download-agent-version` - {pull}2659[#2659]
42+
* Added span-links to messaging systems instrumentation (supported by APM Server 8.3+ only) - {pull}2610[#2610]
4243
4344
[float]
4445
===== Bug fixes

apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/MessagingConfiguration.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@
3030
public class MessagingConfiguration extends ConfigurationOptionProvider {
3131
private static final String MESSAGING_CATEGORY = "Messaging";
3232
private static final String MESSAGE_POLLING_TRANSACTION_STRATEGY = "message_polling_transaction_strategy";
33+
private static final String MESSAGE_BATCH_STRATEGY = "message_batch_strategy";
3334

34-
private ConfigurationOption<Strategy> messagePollingTransactionStrategy = ConfigurationOption.enumOption(Strategy.class)
35+
private ConfigurationOption<JmsStrategy> messagePollingTransactionStrategy = ConfigurationOption.enumOption(JmsStrategy.class)
3536
.key(MESSAGE_POLLING_TRANSACTION_STRATEGY)
3637
.configurationCategory(MESSAGING_CATEGORY)
3738
.tags("internal")
@@ -41,7 +42,19 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
4142
"\n" +
4243
"This option is case-insensitive and is only relevant for JMS.")
4344
.dynamic(true)
44-
.buildWithDefault(Strategy.HANDLING);
45+
.buildWithDefault(JmsStrategy.HANDLING);
46+
47+
private ConfigurationOption<BatchStrategy> messageBatchStrategy = ConfigurationOption.enumOption(BatchStrategy.class)
48+
.key(MESSAGE_BATCH_STRATEGY)
49+
.configurationCategory(MESSAGING_CATEGORY)
50+
.tags("internal")
51+
.description("Determines whether Spring messaging system libraries should create a batch for the processing of the entire \n" +
52+
"message/record batch, or one transaction for each message/record processing, typically by wrapping the message batch data \n" +
53+
"structure. Valid options are `SINGLE_HANDLING` and `BATCH_HANDLING`. \n" +
54+
"\n" +
55+
"This option is case-insensitive and is only relevant for Spring messaging system libraries that support batch processing.")
56+
.dynamic(true)
57+
.buildWithDefault(BatchStrategy.BATCH_HANDLING);
4558

4659
private ConfigurationOption<Boolean> collectQueueAddress = ConfigurationOption.booleanOption()
4760
.key("collect_queue_address")
@@ -79,10 +92,14 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
7992
.dynamic(true)
8093
.buildWithDefault(Boolean.TRUE);
8194

82-
public MessagingConfiguration.Strategy getMessagePollingTransactionStrategy() {
95+
public JmsStrategy getMessagePollingTransactionStrategy() {
8396
return messagePollingTransactionStrategy.get();
8497
}
8598

99+
public BatchStrategy getMessageBatchStrategy() {
100+
return messageBatchStrategy.get();
101+
}
102+
86103
public List<WildcardMatcher> getIgnoreMessageQueues() {
87104
return ignoreMessageQueues.get();
88105
}
@@ -95,9 +112,35 @@ public boolean shouldEndMessagingTransactionOnPoll() {
95112
return endMessagingTransactionOnPoll.get();
96113
}
97114

98-
public enum Strategy {
115+
public enum JmsStrategy {
116+
/**
117+
* Create a transaction capturing JMS {@code receive} invocations
118+
*/
99119
POLLING,
120+
/**
121+
* Use heuristics to create a transaction that captures the JMS message handling execution. This strategy requires heuristics
122+
* when JMS {@code receive} APIs are used (rather than {@code onMessage}), as there is no API representing message handling start
123+
* and end. Even though this is riskier and less deterministic, it is the default JMS tracing strategy otherwise all
124+
* "interesting" subsequent events that follow message receive will be missed because there will be no active transaction.
125+
*/
100126
HANDLING,
127+
/**
128+
* Create a transaction both for the polling ({@code receive}) action AND the subsequent message handling.
129+
*/
101130
BOTH
102131
}
132+
133+
/**
134+
* Only relevant for Spring wrappers around supported messaging clients, such as AMQP.
135+
*/
136+
public enum BatchStrategy {
137+
/**
138+
* Create a transaction for each received message/record, typically by wrapping the message batch data structure
139+
*/
140+
SINGLE_HANDLING,
141+
/**
142+
* Create a single transaction encapsulating the entire message/record batch-processing.
143+
*/
144+
BATCH_HANDLING
145+
}
103146
}

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class ElasticApmTracer implements Tracer {
8484
private final ObjectPool<Transaction> transactionPool;
8585
private final ObjectPool<Span> spanPool;
8686
private final ObjectPool<ErrorCapture> errorPool;
87+
private final ObjectPool<TraceContext> spanLinkPool;
8788
private final Reporter reporter;
8889
private final ObjectPoolFactory objectPoolFactory;
8990
// Maintains a stack of all the activated spans/contexts
@@ -145,6 +146,9 @@ public void onChange(ConfigurationOption<?> configurationOption, Boolean oldValu
145146
// we are assuming that we don't need as many errors as spans or transactions
146147
errorPool = poolFactory.createErrorPool(maxPooledElements / 2, this);
147148

149+
// span links pool allows for 10X the maximum allowed span links per span
150+
spanLinkPool = poolFactory.createSpanLinkPool(AbstractSpan.MAX_ALLOWED_SPAN_LINKS * 10, this);
151+
148152
sampler = ProbabilitySampler.of(coreConfiguration.getSampleRate().get());
149153
coreConfiguration.getSampleRate().addChangeListener(new ConfigurationOption.ChangeListener<Double>() {
150154
@Override
@@ -447,6 +451,10 @@ public void endError(ErrorCapture error) {
447451
reporter.report(error);
448452
}
449453

454+
public TraceContext createSpanLink() {
455+
return spanLinkPool.createInstance();
456+
}
457+
450458
public void recycle(Transaction transaction) {
451459
transactionPool.recycle(transaction);
452460
}
@@ -459,6 +467,10 @@ public void recycle(ErrorCapture error) {
459467
errorPool.recycle(error);
460468
}
461469

470+
public void recycle(TraceContext traceContext) {
471+
spanLinkPool.recycle(traceContext);
472+
}
473+
462474
public synchronized void stop() {
463475
if (tracerState == TracerState.STOPPED) {
464476
// may happen if explicitly stopped in a unit test and executed again within a shutdown hook

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import javax.annotation.Nullable;
3434
import java.util.HashMap;
35+
import java.util.List;
3536
import java.util.Map;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +47,7 @@ public abstract class AbstractSpan<T extends AbstractSpan<T>> implements Recycla
4647
public static final int PRIO_DEFAULT = 0;
4748
private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class);
4849
private static final Logger oneTimeDuplicatedEndLogger = LoggerUtils.logOnce(logger);
50+
private static final Logger oneTimeMaxSpanLinksLogger = LoggerUtils.logOnce(logger);
4951

5052
protected static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1);
5153
protected final TraceContext traceContext;
@@ -117,6 +119,10 @@ public abstract class AbstractSpan<T extends AbstractSpan<T>> implements Recycla
117119

118120
protected final AtomicReference<Span> bufferedSpan = new AtomicReference<>();
119121

122+
// Span links handling
123+
public static final int MAX_ALLOWED_SPAN_LINKS = 1000;
124+
private final List<TraceContext> spanLinks = new UniqueSpanLinkArrayList();
125+
120126
@Nullable
121127
private OTelSpanKind otelKind = null;
122128

@@ -369,6 +375,50 @@ public TraceContext getTraceContext() {
369375
return traceContext;
370376
}
371377

378+
/**
379+
* Adds a span link based on the tracecontext header retrieved from the provided {@code carrier} through the provided {@code
380+
* headerGetter}.
381+
* @param childContextCreator the proper tracecontext inference implementation, corresponding on the header and types
382+
* @param headerGetter the proper header extractor, corresponding the header and carrier types
383+
* @param carrier the object from which the tracecontext header is to be retrieved
384+
* @param <H> the tracecontext header type - either binary ({@code byte[]}) or textual ({@code String})
385+
* @param <C> the tracecontext header carrier type, e.g. Kafka record or JMS message
386+
* @return {@code true} if added, {@code false} otherwise
387+
*/
388+
public <H, C> boolean addSpanLink(TraceContext.ChildContextCreatorTwoArg<C, HeaderGetter<H, C>> childContextCreator,
389+
HeaderGetter<H, C> headerGetter, @Nullable C carrier) {
390+
if (spanLinks.size() == MAX_ALLOWED_SPAN_LINKS) {
391+
oneTimeMaxSpanLinksLogger.warn("Span links for {} has reached the allowed maximum ({}). No more spans will be linked.",
392+
this, MAX_ALLOWED_SPAN_LINKS);
393+
return false;
394+
}
395+
boolean added = false;
396+
try {
397+
TraceContext childTraceContext = tracer.createSpanLink();
398+
if (childContextCreator.asChildOf(childTraceContext, carrier, headerGetter)) {
399+
added = spanLinks.add(childTraceContext);
400+
}
401+
if (!added) {
402+
tracer.recycle(childTraceContext);
403+
}
404+
} catch (Exception e) {
405+
logger.error(String.format("Failed to add span link to %s from header carrier %s and %s", this, carrier,
406+
headerGetter.getClass().getName()), e);
407+
}
408+
return added;
409+
}
410+
411+
/**
412+
* Returns a list of links from this span to other spans in the format of child {@link TraceContext}s, of which parent is the linked
413+
* span. For each entry in the returned list, the linked span's {@code traceId} can be retrieved through
414+
* {@link TraceContext#getTraceId()} and the {@code spanId} can be retrieved through {@link TraceContext#getParentId()}.
415+
*
416+
* @return a list of child {@link TraceContext}s of linked spans
417+
*/
418+
public List<TraceContext> getSpanLinks() {
419+
return spanLinks;
420+
}
421+
372422
@Override
373423
public void resetState() {
374424
finished = true;
@@ -388,10 +438,19 @@ public void resetState() {
388438
userOutcome = null;
389439
hasCapturedExceptions = false;
390440
bufferedSpan.set(null);
441+
recycleSpanLinks();
391442
otelKind = null;
392443
otelAttributes.clear();
393444
}
394445

446+
private void recycleSpanLinks() {
447+
//noinspection ForLoopReplaceableByForEach
448+
for (int i = 0; i < spanLinks.size(); i++) {
449+
tracer.recycle(spanLinks.get(i));
450+
}
451+
spanLinks.clear();
452+
}
453+
395454
public Span createSpan() {
396455
return createSpan(traceContext.getClock().getEpochMicros());
397456
}

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,13 @@ public static TraceContext with128BitId(ElasticApmTracer tracer) {
265265
}
266266

267267
@SuppressWarnings("unchecked")
268-
public static <C> ChildContextCreatorTwoArg<C, TextHeaderGetter<C>> getFromTraceContextTextHeaders() {
269-
return (ChildContextCreatorTwoArg<C, TextHeaderGetter<C>>) FROM_TRACE_CONTEXT_TEXT_HEADERS;
268+
public static <C> ChildContextCreatorTwoArg<C, HeaderGetter<String, C>> getFromTraceContextTextHeaders() {
269+
return (ChildContextCreatorTwoArg<C, HeaderGetter<String, C>>) FROM_TRACE_CONTEXT_TEXT_HEADERS;
270270
}
271271

272272
@SuppressWarnings("unchecked")
273-
public static <C> ChildContextCreatorTwoArg<C, BinaryHeaderGetter<C>> getFromTraceContextBinaryHeaders() {
274-
return (ChildContextCreatorTwoArg<C, BinaryHeaderGetter<C>>) FROM_TRACE_CONTEXT_BINARY_HEADERS;
273+
public static <C> ChildContextCreatorTwoArg<C, HeaderGetter<byte[], C>> getFromTraceContextBinaryHeaders() {
274+
return (ChildContextCreatorTwoArg<C, HeaderGetter<byte[], C>>) FROM_TRACE_CONTEXT_BINARY_HEADERS;
275275
}
276276

277277
public static ChildContextCreator<Tracer> fromActive() {
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package co.elastic.apm.agent.impl.transaction;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.HashSet;
24+
import java.util.Set;
25+
26+
/**
27+
* A {@link TraceContext} list that enforces uniqueness for the purpose of span links storage.
28+
* We cannot naturally use {@link Set} implementations because {@link TraceContext#equals} is comparing own ID with other's ID. This is
29+
* inappropriate for span links, which we consider equal if their <b>parent IDs</b> are equal.
30+
* So instead, we use a limited subclass of {@link ArrayList} that maintains a parent ID cache for equality checks.
31+
* As a side benefit, this gives us the ability to iterate over span links based on index and avoid the related iterator allocation, which
32+
* we wouldn't have if using a {@link Set}.
33+
*/
34+
public class UniqueSpanLinkArrayList extends ArrayList<TraceContext> {
35+
36+
private final Set<Id> parentIdSet = new HashSet<>();
37+
38+
@Override
39+
public boolean retainAll(Collection<?> c) {
40+
throw new UnsupportedOperationException();
41+
}
42+
43+
@Override
44+
public TraceContext set(int index, TraceContext traceContext) {
45+
throw new UnsupportedOperationException();
46+
}
47+
48+
@Override
49+
public boolean add(TraceContext traceContext) {
50+
if (parentIdSet.add(traceContext.getParentId())) {
51+
return super.add(traceContext);
52+
}
53+
return false;
54+
}
55+
56+
@Override
57+
public void add(int index, TraceContext traceContext) {
58+
throw new UnsupportedOperationException();
59+
}
60+
61+
@Override
62+
public TraceContext remove(int index) {
63+
throw new UnsupportedOperationException();
64+
}
65+
66+
@Override
67+
public boolean remove(Object o) {
68+
throw new UnsupportedOperationException();
69+
}
70+
71+
@Override
72+
public void clear() {
73+
parentIdSet.clear();
74+
super.clear();
75+
}
76+
77+
@Override
78+
public boolean addAll(Collection<? extends TraceContext> c) {
79+
throw new UnsupportedOperationException();
80+
}
81+
82+
@Override
83+
public boolean addAll(int index, Collection<? extends TraceContext> c) {
84+
throw new UnsupportedOperationException();
85+
}
86+
87+
@Override
88+
protected void removeRange(int fromIndex, int toIndex) {
89+
throw new UnsupportedOperationException();
90+
}
91+
92+
@Override
93+
public boolean removeAll(Collection<?> c) {
94+
throw new UnsupportedOperationException();
95+
}
96+
}

apm-agent-core/src/main/java/co/elastic/apm/agent/objectpool/ObjectPoolFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import co.elastic.apm.agent.impl.ElasticApmTracer;
2222
import co.elastic.apm.agent.impl.error.ErrorCapture;
2323
import co.elastic.apm.agent.impl.transaction.Span;
24+
import co.elastic.apm.agent.impl.transaction.TraceContext;
2425
import co.elastic.apm.agent.impl.transaction.Transaction;
2526
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
2627
import org.jctools.queues.atomic.AtomicQueueFactory;
@@ -59,4 +60,13 @@ public ErrorCapture createInstance() {
5960
}
6061
});
6162
}
63+
64+
public ObjectPool<TraceContext> createSpanLinkPool(int maxCapacity, final ElasticApmTracer tracer) {
65+
return createRecyclableObjectPool(maxCapacity, new Allocator<TraceContext>() {
66+
@Override
67+
public TraceContext createInstance() {
68+
return TraceContext.with64BitId(tracer);
69+
}
70+
});
71+
}
6272
}

0 commit comments

Comments
 (0)