Skip to content

Commit d838bb4

Browse files
authored
Drop non-string labels if connected to older APM Server (#746)
1 parent 808e002 commit d838bb4

File tree

18 files changed

+245
-104
lines changed

18 files changed

+245
-104
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
## Bug Fixes
99
* Some JMS Consumers and Producers are filtered due to class name filtering in instrumentation matching
1010
* Jetty: When no display name is set and context path is "/" transaction service names will now correctly fall back to configured values
11+
* Drops non-String labels when connected to APM Server < 6.7 to avoid validation errors (#687)
1112

1213
# 1.7.0
1314

apm-agent-core/src/main/java/co/elastic/apm/agent/bci/bytebuddy/CustomElementMatchers.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package co.elastic.apm.agent.bci.bytebuddy;
2626

2727
import co.elastic.apm.agent.matcher.WildcardMatcher;
28+
import co.elastic.apm.agent.util.Version;
2829
import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap;
2930
import net.bytebuddy.description.NamedElement;
3031
import net.bytebuddy.description.method.MethodDescription;
@@ -199,35 +200,6 @@ private static Version readImplementationVersionFromManifest(@Nullable Protectio
199200
return version;
200201
}
201202

202-
/**
203-
* Based on <a href="https://gist.github.com/brianguertin/ada4b65c6d1c4f6d3eee3c12b6ce021b">https://gist.github.com/brianguertin</a>.
204-
* This code was released into the public domain by Brian Guertin on July 8, 2016 citing, verbatim the unlicense.
205-
*/
206-
private static class Version implements Comparable<Version> {
207-
private final int[] numbers;
208-
209-
Version(String version) {
210-
final String[] parts = version.split("\\-")[0].split("\\.");
211-
numbers = new int[parts.length];
212-
for (int i = 0; i < parts.length; i++) {
213-
numbers[i] = Integer.valueOf(parts[i]);
214-
}
215-
}
216-
217-
@Override
218-
public int compareTo(Version another) {
219-
final int maxLength = Math.max(numbers.length, another.numbers.length);
220-
for (int i = 0; i < maxLength; i++) {
221-
final int left = i < numbers.length ? numbers[i] : 0;
222-
final int right = i < another.numbers.length ? another.numbers[i] : 0;
223-
if (left != right) {
224-
return left < right ? -1 : 1;
225-
}
226-
}
227-
return 0;
228-
}
229-
}
230-
231203
/**
232204
* Matches overridden methods of a super class or implemented methods of an interface.
233205
* Recursively traverses the superclasses and interfaces.

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracerBuilder.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,27 @@ public ElasticApmTracerBuilder withConfig(String key, String value) {
101101
}
102102

103103
public ElasticApmTracer build() {
104+
boolean addApmServerConfigSource = false;
104105
if (configurationRegistry == null) {
106+
addApmServerConfigSource = true;
105107
final List<ConfigurationSource> configSources = getConfigSources(agentArguments);
106108
configurationRegistry = getDefaultConfigurationRegistry(configSources);
107109
}
108110
final ApmServerClient apmServerClient = new ApmServerClient(configurationRegistry.getConfig(ReporterConfiguration.class));
109-
final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class));
111+
final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class), apmServerClient);
110112
final MetaData metaData = MetaData.create(configurationRegistry, null, null);
111-
ApmServerConfigurationSource configurationSource = new ApmServerConfigurationSource(payloadSerializer, metaData, apmServerClient);
112-
configurationRegistry.addConfigurationSource(configurationSource);
113+
ApmServerConfigurationSource configurationSource = null;
114+
if (addApmServerConfigSource) {
115+
configurationSource = new ApmServerConfigurationSource(payloadSerializer, metaData, apmServerClient);
116+
configurationRegistry.addConfigurationSource(configurationSource);
117+
}
113118
if (reporter == null) {
114119
reporter = new ReporterFactory().createReporter(configurationRegistry, apmServerClient, metaData);
115120
}
116121
if (lifecycleListeners.isEmpty()) {
117-
lifecycleListeners.add(new ApmServerHealthChecker(apmServerClient));
118-
lifecycleListeners.add(configurationSource);
122+
if (configurationSource != null) {
123+
lifecycleListeners.add(configurationSource);
124+
}
119125
lifecycleListeners.addAll(DependencyInjectingServiceLoader.load(LifecycleListener.class));
120126
}
121127
return new ElasticApmTracer(configurationRegistry, reporter, lifecycleListeners);

apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerClient.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
*/
2525
package co.elastic.apm.agent.report;
2626

27+
import co.elastic.apm.agent.util.Version;
2728
import co.elastic.apm.agent.util.VersionUtils;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@
4243
import java.util.Collections;
4344
import java.util.List;
4445
import java.util.concurrent.Callable;
46+
import java.util.concurrent.Future;
4547
import java.util.concurrent.atomic.AtomicInteger;
4648

4749
/**
@@ -64,34 +66,44 @@ public class ApmServerClient {
6466

6567
private static final Logger logger = LoggerFactory.getLogger(ApmServerClient.class);
6668
private static final String USER_AGENT = "elasticapm-java/" + VersionUtils.getAgentVersion();
69+
private static final Version APM_SERVER_NON_STRING_LABEL_SUPPORT = new Version("6.7.0");
6770
private final ReporterConfiguration reporterConfiguration;
6871
private volatile List<URL> serverUrls;
72+
private volatile Future<Version> apmServerVersion;
6973
private final AtomicInteger errorCount = new AtomicInteger();
74+
private final ApmServerHealthChecker healthChecker;
7075

7176
public ApmServerClient(ReporterConfiguration reporterConfiguration) {
7277
this(reporterConfiguration, shuffleUrls(reporterConfiguration.getServerUrls()));
7378
}
7479

7580
public ApmServerClient(ReporterConfiguration reporterConfiguration, List<URL> shuffledUrls) {
7681
this.reporterConfiguration = reporterConfiguration;
82+
this.healthChecker = new ApmServerHealthChecker(this);
7783
this.reporterConfiguration.getServerUrlsOption().addChangeListener(new ConfigurationOption.ChangeListener<List<URL>>() {
7884
@Override
7985
public void onChange(ConfigurationOption<?> configurationOption, List<URL> oldValue, List<URL> newValue) {
8086
logger.debug("server_urls override with value = ({}).", newValue);
8187
if (newValue != null && !newValue.isEmpty()) {
82-
serverUrls = shuffleUrls(newValue);
83-
errorCount.set(0);
88+
setServerUrls(shuffleUrls(newValue));
8489
}
8590
}
8691
});
87-
this.serverUrls = Collections.unmodifiableList(shuffledUrls);
92+
setServerUrls(Collections.unmodifiableList(shuffledUrls));
93+
}
94+
95+
private void setServerUrls(List<URL> serverUrls) {
96+
this.serverUrls = serverUrls;
97+
this.apmServerVersion = healthChecker.checkHealthAndGetMinVersion();
98+
this.errorCount.set(0);
8899
}
89100

90101
private static List<URL> shuffleUrls(List<URL> serverUrls) {
91102
// shuffling the URL list helps to distribute the load across the apm servers
92103
// when there are multiple agents, they should not all start connecting to the same apm server
93-
Collections.shuffle(serverUrls);
94-
return serverUrls;
104+
List<URL> copy = new ArrayList<>(serverUrls);
105+
Collections.shuffle(copy);
106+
return copy;
95107
}
96108

97109
private static void trustAll(HttpsURLConnection connection) {
@@ -212,12 +224,13 @@ public <V> V execute(String path, ConnectionHandler<V> connectionHandler) throws
212224
throw previousException;
213225
}
214226

215-
public void executeForAllUrls(String path, ConnectionHandler<Void> connectionHandler) {
227+
public <T> List<T> executeForAllUrls(String path, ConnectionHandler<T> connectionHandler) {
228+
List<T> results = new ArrayList<>(serverUrls.size());
216229
for (URL serverUrl : serverUrls) {
217230
HttpURLConnection connection = null;
218231
try {
219232
connection = startRequestToUrl(appendPath(serverUrl, path));
220-
connectionHandler.withConnection(connection);
233+
results.add(connectionHandler.withConnection(connection));
221234
} catch (Exception e) {
222235
logger.debug("Exception while interacting with APM Server", e);
223236
} finally {
@@ -226,6 +239,7 @@ public void executeForAllUrls(String path, ConnectionHandler<Void> connectionHan
226239
}
227240
}
228241
}
242+
return results;
229243
}
230244

231245
URL getCurrentUrl() {
@@ -255,6 +269,23 @@ List<URL> getServerUrls() {
255269
return this.serverUrls;
256270
}
257271

272+
public boolean supportsNonStringLabels() {
273+
return isAtLeast(APM_SERVER_NON_STRING_LABEL_SUPPORT);
274+
}
275+
276+
public boolean isAtLeast(Version apmServerVersion) {
277+
try {
278+
Version localApmServerVersion = this.apmServerVersion.get();
279+
if (localApmServerVersion == null) {
280+
return false;
281+
}
282+
return localApmServerVersion.compareTo(apmServerVersion) >= 0;
283+
} catch (Exception e) {
284+
logger.debug(e.getMessage(), e);
285+
return false;
286+
}
287+
}
288+
258289
public interface ConnectionHandler<T> {
259290
@Nullable
260291
T withConnection(HttpURLConnection connection) throws IOException;

apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerHealthChecker.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,49 @@
2424
*/
2525
package co.elastic.apm.agent.report;
2626

27-
import co.elastic.apm.agent.context.LifecycleListener;
28-
import co.elastic.apm.agent.impl.ElasticApmTracer;
2927
import co.elastic.apm.agent.util.ExecutorUtils;
28+
import co.elastic.apm.agent.util.Version;
29+
import com.dslplatform.json.DslJson;
30+
import com.dslplatform.json.JsonReader;
31+
import com.dslplatform.json.MapConverter;
32+
import com.dslplatform.json.Nullable;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
3235

3336
import java.net.HttpURLConnection;
37+
import java.util.Collections;
38+
import java.util.List;
39+
import java.util.concurrent.Callable;
40+
import java.util.concurrent.Future;
3441
import java.util.concurrent.ThreadPoolExecutor;
3542

36-
public class ApmServerHealthChecker implements Runnable, LifecycleListener {
43+
import static java.nio.charset.StandardCharsets.UTF_8;
44+
45+
public class ApmServerHealthChecker implements Callable<Version> {
3746
private static final Logger logger = LoggerFactory.getLogger(ApmServerHealthChecker.class);
3847

3948
private final ApmServerClient apmServerClient;
49+
private final DslJson<Object> dslJson = new DslJson<>();
4050

4151
public ApmServerHealthChecker(ApmServerClient apmServerClient) {
4252
this.apmServerClient = apmServerClient;
4353
}
4454

45-
@Override
46-
public void start(ElasticApmTracer tracer) {
55+
public Future<Version> checkHealthAndGetMinVersion() {
4756
ThreadPoolExecutor pool = ExecutorUtils.createSingleThreadDeamonPool("apm-server-healthcheck", 1);
48-
pool.execute(this);
49-
pool.shutdown();
57+
try {
58+
return pool.submit(this);
59+
} finally {
60+
pool.shutdown();
61+
}
5062
}
5163

64+
@Nullable
5265
@Override
53-
public void run() {
54-
apmServerClient.executeForAllUrls("/", new ApmServerClient.ConnectionHandler<Void>() {
66+
public Version call() {
67+
List<Version> versions = apmServerClient.executeForAllUrls("/", new ApmServerClient.ConnectionHandler<Version>() {
5568
@Override
56-
public Void withConnection(HttpURLConnection connection) {
69+
public Version withConnection(HttpURLConnection connection) {
5770
try {
5871
if (logger.isDebugEnabled()) {
5972
logger.debug("Starting healthcheck to {}", connection.getURL());
@@ -69,17 +82,23 @@ public Void withConnection(HttpURLConnection connection) {
6982
}
7083
} else {
7184
// prints out the version info of the APM Server
72-
logger.info("Elastic APM server is available: {}", HttpUtils.getBody(connection));
85+
String body = HttpUtils.getBody(connection);
86+
logger.info("Elastic APM server is available: {}", body);
87+
final JsonReader<Object> reader = dslJson.newReader(body.getBytes(UTF_8));
88+
reader.startObject();
89+
String versionString = MapConverter.deserialize(reader).get("version");
90+
return new Version(versionString);
7391
}
7492
} catch (Exception e) {
7593
logger.warn("Elastic APM server {} is not available ({})", connection.getURL(), e.getMessage());
7694
}
7795
return null;
7896
}
7997
});
80-
}
81-
82-
@Override
83-
public void stop() {
98+
versions.remove(null);
99+
if (!versions.isEmpty()) {
100+
return Collections.min(versions);
101+
}
102+
return null;
84103
}
85104
}

apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReporterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Reporter createReporter(ConfigurationRegistry configurationRegistry, ApmS
4747
private ReportingEventHandler getReportingEventHandler(ConfigurationRegistry configurationRegistry,
4848
ReporterConfiguration reporterConfiguration, MetaData metaData, ApmServerClient apmServerClient) {
4949

50-
final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class));
50+
final DslJsonSerializer payloadSerializer = new DslJsonSerializer(configurationRegistry.getConfig(StacktraceConfiguration.class), apmServerClient);
5151
final ProcessorEventHandler processorEventHandler = ProcessorEventHandler.loadProcessors(configurationRegistry);
5252
return new IntakeV2ReportingEventHandler(reporterConfiguration, processorEventHandler, payloadSerializer, metaData, apmServerClient);
5353
}

apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import co.elastic.apm.agent.metrics.Labels;
5656
import co.elastic.apm.agent.metrics.MetricRegistry;
5757
import co.elastic.apm.agent.metrics.MetricSet;
58+
import co.elastic.apm.agent.report.ApmServerClient;
5859
import co.elastic.apm.agent.util.PotentiallyMultiValuedMap;
5960
import com.dslplatform.json.BoolConverter;
6061
import com.dslplatform.json.DslJson;
@@ -101,11 +102,13 @@ public class DslJsonSerializer implements PayloadSerializer, MetricRegistry.Metr
101102
private final Collection<String> excludedStackFrames = Arrays.asList("java.lang.reflect", "com.sun", "sun.", "jdk.internal.");
102103
private final StringBuilder replaceBuilder = new StringBuilder(MAX_LONG_STRING_VALUE_LENGTH + 1);
103104
private final StacktraceConfiguration stacktraceConfiguration;
105+
private final ApmServerClient apmServerClient;
104106
@Nullable
105107
private OutputStream os;
106108

107-
public DslJsonSerializer(StacktraceConfiguration stacktraceConfiguration) {
109+
public DslJsonSerializer(StacktraceConfiguration stacktraceConfiguration, ApmServerClient apmServerClient) {
108110
this.stacktraceConfiguration = stacktraceConfiguration;
111+
this.apmServerClient = apmServerClient;
109112
jw = new DslJson<>(new DslJson.Settings<>()).newWriter(BUFFER_SIZE);
110113
}
111114

@@ -774,7 +777,7 @@ private void serializeContext(final TransactionContext context, TraceContext tra
774777
serializeResponse(context.getResponse());
775778
if (context.hasCustom()) {
776779
writeFieldName("custom");
777-
serializeStringKeyScalarValueMap(context.getCustomIterator(), replaceBuilder, jw, true);
780+
serializeStringKeyScalarValueMap(context.getCustomIterator(), replaceBuilder, jw, true, true);
778781
jw.writeByte(COMMA);
779782
}
780783
writeFieldName("tags");
@@ -786,31 +789,28 @@ private void serializeContext(final TransactionContext context, TraceContext tra
786789
// visible for testing
787790
void serializeLabels(AbstractContext context) {
788791
if (context.hasLabels()) {
789-
serializeStringKeyScalarValueMap(context.getLabelIterator(), replaceBuilder, jw, false);
792+
serializeStringKeyScalarValueMap(context.getLabelIterator(), replaceBuilder, jw, false, apmServerClient.supportsNonStringLabels());
790793
} else {
791794
jw.writeByte(OBJECT_START);
792795
jw.writeByte(OBJECT_END);
793796
}
794797
}
795798

796-
static void serializeStringLabels(Iterator<? extends Map.Entry<String, String>> iterator, StringBuilder replaceBuilder, JsonWriter jw) {
797-
serializeStringKeyScalarValueMap(iterator, replaceBuilder, jw, false);
798-
}
799-
800799
private static void serializeStringKeyScalarValueMap(Iterator<? extends Map.Entry<String, ? /* String|Number|Boolean */>> it,
801-
StringBuilder replaceBuilder, JsonWriter jw, boolean extendedStringLimit) {
800+
StringBuilder replaceBuilder, JsonWriter jw, boolean extendedStringLimit,
801+
boolean supportsNonStringValues) {
802802
jw.writeByte(OBJECT_START);
803803
if (it.hasNext()) {
804804
Map.Entry<String, ?> kv = it.next();
805805
writeStringValue(sanitizeLabelKey(kv.getKey(), replaceBuilder), replaceBuilder, jw);
806806
jw.writeByte(JsonWriter.SEMI);
807-
serializeScalarValue(replaceBuilder, jw, kv.getValue(), extendedStringLimit);
807+
serializeScalarValue(replaceBuilder, jw, kv.getValue(), extendedStringLimit, supportsNonStringValues);
808808
while (it.hasNext()) {
809809
jw.writeByte(COMMA);
810810
kv = it.next();
811811
writeStringValue(sanitizeLabelKey(kv.getKey(), replaceBuilder), replaceBuilder, jw);
812812
jw.writeByte(JsonWriter.SEMI);
813-
serializeScalarValue(replaceBuilder, jw, kv.getValue(), extendedStringLimit);
813+
serializeScalarValue(replaceBuilder, jw, kv.getValue(), extendedStringLimit, supportsNonStringValues);
814814
}
815815
}
816816
jw.writeByte(OBJECT_END);
@@ -851,21 +851,29 @@ private static void serialize(Labels labels, StringBuilder replaceBuilder, JsonW
851851
}
852852
writeStringValue(sanitizeLabelKey(labels.getKey(i), replaceBuilder), replaceBuilder, jw);
853853
jw.writeByte(JsonWriter.SEMI);
854-
serializeScalarValue(replaceBuilder, jw, labels.getValue(i), false);
854+
serializeScalarValue(replaceBuilder, jw, labels.getValue(i), false, false);
855855
}
856856
}
857857

858-
private static void serializeScalarValue(StringBuilder replaceBuilder, JsonWriter jw, Object value, boolean extendedStringLimit) {
858+
private static void serializeScalarValue(StringBuilder replaceBuilder, JsonWriter jw, Object value, boolean extendedStringLimit, boolean supportsNonStringValues) {
859859
if (value instanceof String) {
860860
if (extendedStringLimit) {
861861
writeLongStringValue((String) value, replaceBuilder, jw);
862862
} else {
863863
writeStringValue((String) value, replaceBuilder, jw);
864864
}
865865
} else if (value instanceof Number) {
866-
NumberConverter.serialize(((Number) value).doubleValue(), jw);
866+
if (supportsNonStringValues) {
867+
NumberConverter.serialize(((Number) value).doubleValue(), jw);
868+
} else {
869+
jw.writeNull();
870+
}
867871
} else if (value instanceof Boolean) {
868-
BoolConverter.serialize((Boolean) value, jw);
872+
if (supportsNonStringValues) {
873+
BoolConverter.serialize((Boolean) value, jw);
874+
} else {
875+
jw.writeNull();
876+
}
869877
} else {
870878
// can't happen, as AbstractContext enforces the values to be either String, Number or boolean
871879
jw.writeString("invalid value");

0 commit comments

Comments
 (0)