From e8ac2a56a082db43c7f55f6ba052b06fdb67011b Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Fri, 4 Jul 2025 09:46:34 +0200 Subject: [PATCH] Improved usability of the mock APM server for local testing. To verify specific metrics or transactions, you can now filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions. - `-apm-metrics` - `-apm-transactions` - `-apm-transactions-excludes` Relates to ES-10969 --- .../gradle/testclusters/MockApmServer.java | 143 +++++++++++------- .../gradle/testclusters/RunTask.java | 67 ++++++-- modules/apm/METERING.md | 15 +- 3 files changed, 156 insertions(+), 69 deletions(-) diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java index b64f372364f22..251a436757dd1 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java @@ -9,18 +9,30 @@ package org.elasticsearch.gradle.testclusters; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.stream.Streams; import org.gradle.api.logging.Logger; import org.gradle.api.logging.Logging; +import org.slf4j.LoggerFactory; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.annotation.concurrent.NotThreadSafe; /** * This is a server which just accepts lines of JSON code and if the JSON @@ -32,102 +44,127 @@ *

* The HTTP server used is the JDK embedded com.sun.net.httpserver */ +@NotThreadSafe public class MockApmServer { private static final Logger logger = Logging.getLogger(MockApmServer.class); - private int port; + private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class); + private final Pattern metricFilter; + private final Pattern transactionFilter; + private final Pattern transactionExcludesFilter; - public MockApmServer(int port) { - this.port = port; - } + private HttpServer instance; - /** - * Simple main that starts a mock APM server and prints the port it is - * running on. This is not needed - * for testing, it is just a convenient template for trying things out - * if you want play around. - */ - public static void main(String[] args) throws IOException, InterruptedException { - MockApmServer server = new MockApmServer(9999); - server.start(); + public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) { + this.metricFilter = createWildcardPattern(metricFilter); + this.transactionFilter = createWildcardPattern(transactionFilter); + this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter); } - private static volatile HttpServer instance; + private Pattern createWildcardPattern(String filter) { + if (filter == null || filter.isEmpty()) { + return null; + } + var pattern = Arrays.stream(filter.split(",\\s*")) + .map(Pattern::quote) + .map(s -> s.replace("*", "\\E.*\\Q")) + .collect(Collectors.joining(")|(", "(", ")")); + return Pattern.compile(pattern); + } /** * Start the Mock APM server. Just returns empty JSON structures for every incoming message * - * @return - the port the Mock APM server started on * @throws IOException */ - public synchronized int start() throws IOException { + public void start() throws IOException { if (instance != null) { - String hostname = instance.getAddress().getHostName(); - int port = instance.getAddress().getPort(); - logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port); - return port; + throw new IllegalStateException("MockApmServer already started"); } - InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port); + InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0); HttpServer server = HttpServer.create(addr, 10); - server.createContext("/exit", new ExitHandler()); server.createContext("/", new RootHandler()); - server.start(); instance = server; logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort()); - return server.getAddress().getPort(); } public int getPort() { - return port; + if (instance == null) { + throw new IllegalStateException("MockApmServer not started"); + } + return instance.getAddress().getPort(); } /** * Stop the server gracefully if possible */ - public synchronized void stop() { - logger.lifecycle("stopping apm server"); - instance.stop(1); - instance = null; + public void stop() { + if (instance != null) { + logger.lifecycle("stopping apm server"); + instance.stop(1); + instance = null; + } } class RootHandler implements HttpHandler { public void handle(HttpExchange t) { try { InputStream body = t.getRequestBody(); - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - byte[] buffer = new byte[8 * 1024]; - int lengthRead; - while ((lengthRead = body.read(buffer)) > 0) { - bytes.write(buffer, 0, lengthRead); + if (metricFilter == null && transactionFilter == null) { + logRequestBody(body); + } else { + logFiltered(body); } - logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString())); String response = "{}"; t.sendResponseHeaders(200, response.length()); - OutputStream os = t.getResponseBody(); - os.write(response.getBytes()); - os.close(); + try (OutputStream os = t.getResponseBody()) { + os.write(response.getBytes()); + } } catch (Exception e) { e.printStackTrace(); } } - } - static class ExitHandler implements HttpHandler { - private static final int STOP_TIME = 3; + private void logRequestBody(InputStream body) throws IOException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + IOUtils.copy(body, bytes); + logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString())); + } - public void handle(HttpExchange t) { - try { - InputStream body = t.getRequestBody(); - String response = "{}"; - t.sendResponseHeaders(200, response.length()); - OutputStream os = t.getResponseBody(); - os.write(response.getBytes()); - os.close(); - instance.stop(STOP_TIME); - instance = null; - } catch (Exception e) { - e.printStackTrace(); + private void logFiltered(InputStream body) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) { + String line; + String tier = null; + String node = null; + + while ((line = reader.readLine()) != null) { + var jsonNode = mapper.readTree(line); + + if (jsonNode.has("metadata")) { + node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null); + tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null); + } else if (transactionFilter != null && jsonNode.has("transaction")) { + var transaction = jsonNode.get("transaction"); + var name = transaction.get("name").asText(); + if (transactionFilter.matcher(name).matches() + && (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) { + logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction); + } + } else if (metricFilter != null && jsonNode.has("metricset")) { + var metricset = jsonNode.get("metricset"); + var samples = (ObjectNode) metricset.get("samples"); + for (var name : Streams.of(samples.fieldNames()).toList()) { + if (metricFilter.matcher(name).matches() == false) { + samples.remove(name); + } + } + if (samples.isEmpty() == false) { + logger.lifecycle("Metricset [{}/{}]", node, tier, metricset); + } + } + } } } } diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java index 0d981e8e5d17c..725de1ac9448c 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java @@ -46,6 +46,12 @@ public abstract class RunTask extends DefaultTestClustersTask { private Boolean apmServerEnabled = false; + private String apmServerMetrics = null; + + private String apmServerTransactions = null; + + private String apmServerTransactionsExcludes = null; + private List plugins; private Boolean preserveData = false; @@ -99,11 +105,44 @@ public Boolean getApmServerEnabled() { return apmServerEnabled; } + @Input + @Optional + public String getApmServerMetrics() { + return apmServerMetrics; + } + + @Input + @Optional + public String getApmServerTransactions() { + return apmServerTransactions; + } + + @Input + @Optional + public String getApmServerTransactionsExcludes() { + return apmServerTransactionsExcludes; + } + @Option(option = "with-apm-server", description = "Run simple logging http server to accept apm requests") public void setApmServerEnabled(Boolean apmServerEnabled) { this.apmServerEnabled = apmServerEnabled; } + @Option(option = "apm-metrics", description = "Metric wildcard filter for APM server") + public void setApmServerMetrics(String apmServerMetrics) { + this.apmServerMetrics = apmServerMetrics; + } + + @Option(option = "apm-transactions", description = "Transaction wildcard filter for APM server") + public void setApmServerTransactions(String apmServerTransactions) { + this.apmServerTransactions = apmServerTransactions; + } + + @Option(option = "apm-transactions-excludes", description = "Transaction wildcard filter for APM server") + public void setApmServerTransactionsExcludes(String apmServerTransactionsExcludes) { + this.apmServerTransactionsExcludes = apmServerTransactionsExcludes; + } + @Option(option = "with-plugins", description = "Run distribution with plugins installed") public void setPlugins(String plugins) { this.plugins = Arrays.asList(plugins.split(",")); @@ -204,6 +243,15 @@ public void beforeStart() { getDataPath = n -> dataDir.resolve(n.getName()); } + if (apmServerEnabled) { + try { + mockServer = new MockApmServer(apmServerMetrics, apmServerTransactions, apmServerTransactionsExcludes); + mockServer.start(); + } catch (IOException e) { + throw new GradleException("Unable to start APM server: " + e.getMessage(), e); + } + } + for (ElasticsearchCluster cluster : getClusters()) { cluster.setPreserveDataDir(preserveData); for (ElasticsearchNode node : cluster.getNodes()) { @@ -232,19 +280,12 @@ public void beforeStart() { node.setting("xpack.security.transport.ssl.keystore.path", "transport.keystore"); node.setting("xpack.security.transport.ssl.certificate_authorities", "transport.ca"); } - - if (apmServerEnabled) { - mockServer = new MockApmServer(9999); - try { - mockServer.start(); - node.setting("telemetry.metrics.enabled", "true"); - node.setting("telemetry.tracing.enabled", "true"); - node.setting("telemetry.agent.transaction_sample_rate", "0.10"); - node.setting("telemetry.agent.metrics_interval", "10s"); - node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort()); - } catch (IOException e) { - logger.warn("Unable to start APM server", e); - } + if (mockServer != null) { + node.setting("telemetry.metrics.enabled", "true"); + node.setting("telemetry.tracing.enabled", "true"); + node.setting("telemetry.agent.transaction_sample_rate", "1.0"); + node.setting("telemetry.agent.metrics_interval", "10s"); + node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort()); } // in serverless metrics are enabled by default // if metrics were not enabled explicitly for gradlew run we should disable them diff --git a/modules/apm/METERING.md b/modules/apm/METERING.md index 427168e6b183e..36081081f4f5d 100644 --- a/modules/apm/METERING.md +++ b/modules/apm/METERING.md @@ -87,11 +87,20 @@ of value that was reported during the metric event ## Development -### Mock http server +### Local mock APM server The quickest way to verify that your metrics are working is to run `./gradlew run --with-apm-server`. -This will run ES node (or nodes in serverless) and also start a mock http server that will act -as an apm server. This fake http server will log all the http messages it receives from apm-agent +This will run an ES node (or nodes in serverless) and also start a mock APM server that logs all messages it receives from apm-agent. + +To verify specific metrics or transactions, you can filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions. +- `-apm-metrics` +- `-apm-transactions` +- `-apm-transactions-excludes` + +For example: +``` +./gradlew run -Dtests.es.logger.level=WARN --with-apm-server --apm-metric="es.rest.requests.total", --apm-transactions="cluster:monitor/*" --apm-transactions-excludes="cluster:monitor/xpack/*"``` +``` ### With APM server in cloud You can also run local ES node with an apm server in cloud.