diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java index b64f372364f22..251a436757dd1 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java @@ -9,18 +9,30 @@ package org.elasticsearch.gradle.testclusters; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.stream.Streams; import org.gradle.api.logging.Logger; import org.gradle.api.logging.Logging; +import org.slf4j.LoggerFactory; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.annotation.concurrent.NotThreadSafe; /** * This is a server which just accepts lines of JSON code and if the JSON @@ -32,102 +44,127 @@ *

* The HTTP server used is the JDK embedded com.sun.net.httpserver */ +@NotThreadSafe public class MockApmServer { private static final Logger logger = Logging.getLogger(MockApmServer.class); - private int port; + private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class); + private final Pattern metricFilter; + private final Pattern transactionFilter; + private final Pattern transactionExcludesFilter; - public MockApmServer(int port) { - this.port = port; - } + private HttpServer instance; - /** - * Simple main that starts a mock APM server and prints the port it is - * running on. This is not needed - * for testing, it is just a convenient template for trying things out - * if you want play around. - */ - public static void main(String[] args) throws IOException, InterruptedException { - MockApmServer server = new MockApmServer(9999); - server.start(); + public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) { + this.metricFilter = createWildcardPattern(metricFilter); + this.transactionFilter = createWildcardPattern(transactionFilter); + this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter); } - private static volatile HttpServer instance; + private Pattern createWildcardPattern(String filter) { + if (filter == null || filter.isEmpty()) { + return null; + } + var pattern = Arrays.stream(filter.split(",\\s*")) + .map(Pattern::quote) + .map(s -> s.replace("*", "\\E.*\\Q")) + .collect(Collectors.joining(")|(", "(", ")")); + return Pattern.compile(pattern); + } /** * Start the Mock APM server. Just returns empty JSON structures for every incoming message * - * @return - the port the Mock APM server started on * @throws IOException */ - public synchronized int start() throws IOException { + public void start() throws IOException { if (instance != null) { - String hostname = instance.getAddress().getHostName(); - int port = instance.getAddress().getPort(); - logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port); - return port; + throw new IllegalStateException("MockApmServer already started"); } - InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port); + InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0); HttpServer server = HttpServer.create(addr, 10); - server.createContext("/exit", new ExitHandler()); server.createContext("/", new RootHandler()); - server.start(); instance = server; logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort()); - return server.getAddress().getPort(); } public int getPort() { - return port; + if (instance == null) { + throw new IllegalStateException("MockApmServer not started"); + } + return instance.getAddress().getPort(); } /** * Stop the server gracefully if possible */ - public synchronized void stop() { - logger.lifecycle("stopping apm server"); - instance.stop(1); - instance = null; + public void stop() { + if (instance != null) { + logger.lifecycle("stopping apm server"); + instance.stop(1); + instance = null; + } } class RootHandler implements HttpHandler { public void handle(HttpExchange t) { try { InputStream body = t.getRequestBody(); - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - byte[] buffer = new byte[8 * 1024]; - int lengthRead; - while ((lengthRead = body.read(buffer)) > 0) { - bytes.write(buffer, 0, lengthRead); + if (metricFilter == null && transactionFilter == null) { + logRequestBody(body); + } else { + logFiltered(body); } - logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString())); String response = "{}"; t.sendResponseHeaders(200, response.length()); - OutputStream os = t.getResponseBody(); - os.write(response.getBytes()); - os.close(); + try (OutputStream os = t.getResponseBody()) { + os.write(response.getBytes()); + } } catch (Exception e) { e.printStackTrace(); } } - } - static class ExitHandler implements HttpHandler { - private static final int STOP_TIME = 3; + private void logRequestBody(InputStream body) throws IOException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + IOUtils.copy(body, bytes); + logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString())); + } - public void handle(HttpExchange t) { - try { - InputStream body = t.getRequestBody(); - String response = "{}"; - t.sendResponseHeaders(200, response.length()); - OutputStream os = t.getResponseBody(); - os.write(response.getBytes()); - os.close(); - instance.stop(STOP_TIME); - instance = null; - } catch (Exception e) { - e.printStackTrace(); + private void logFiltered(InputStream body) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) { + String line; + String tier = null; + String node = null; + + while ((line = reader.readLine()) != null) { + var jsonNode = mapper.readTree(line); + + if (jsonNode.has("metadata")) { + node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null); + tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null); + } else if (transactionFilter != null && jsonNode.has("transaction")) { + var transaction = jsonNode.get("transaction"); + var name = transaction.get("name").asText(); + if (transactionFilter.matcher(name).matches() + && (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) { + logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction); + } + } else if (metricFilter != null && jsonNode.has("metricset")) { + var metricset = jsonNode.get("metricset"); + var samples = (ObjectNode) metricset.get("samples"); + for (var name : Streams.of(samples.fieldNames()).toList()) { + if (metricFilter.matcher(name).matches() == false) { + samples.remove(name); + } + } + if (samples.isEmpty() == false) { + logger.lifecycle("Metricset [{}/{}]", node, tier, metricset); + } + } + } } } } diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java index 0d981e8e5d17c..725de1ac9448c 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java @@ -46,6 +46,12 @@ public abstract class RunTask extends DefaultTestClustersTask { private Boolean apmServerEnabled = false; + private String apmServerMetrics = null; + + private String apmServerTransactions = null; + + private String apmServerTransactionsExcludes = null; + private List plugins; private Boolean preserveData = false; @@ -99,11 +105,44 @@ public Boolean getApmServerEnabled() { return apmServerEnabled; } + @Input + @Optional + public String getApmServerMetrics() { + return apmServerMetrics; + } + + @Input + @Optional + public String getApmServerTransactions() { + return apmServerTransactions; + } + + @Input + @Optional + public String getApmServerTransactionsExcludes() { + return apmServerTransactionsExcludes; + } + @Option(option = "with-apm-server", description = "Run simple logging http server to accept apm requests") public void setApmServerEnabled(Boolean apmServerEnabled) { this.apmServerEnabled = apmServerEnabled; } + @Option(option = "apm-metrics", description = "Metric wildcard filter for APM server") + public void setApmServerMetrics(String apmServerMetrics) { + this.apmServerMetrics = apmServerMetrics; + } + + @Option(option = "apm-transactions", description = "Transaction wildcard filter for APM server") + public void setApmServerTransactions(String apmServerTransactions) { + this.apmServerTransactions = apmServerTransactions; + } + + @Option(option = "apm-transactions-excludes", description = "Transaction wildcard filter for APM server") + public void setApmServerTransactionsExcludes(String apmServerTransactionsExcludes) { + this.apmServerTransactionsExcludes = apmServerTransactionsExcludes; + } + @Option(option = "with-plugins", description = "Run distribution with plugins installed") public void setPlugins(String plugins) { this.plugins = Arrays.asList(plugins.split(",")); @@ -204,6 +243,15 @@ public void beforeStart() { getDataPath = n -> dataDir.resolve(n.getName()); } + if (apmServerEnabled) { + try { + mockServer = new MockApmServer(apmServerMetrics, apmServerTransactions, apmServerTransactionsExcludes); + mockServer.start(); + } catch (IOException e) { + throw new GradleException("Unable to start APM server: " + e.getMessage(), e); + } + } + for (ElasticsearchCluster cluster : getClusters()) { cluster.setPreserveDataDir(preserveData); for (ElasticsearchNode node : cluster.getNodes()) { @@ -232,19 +280,12 @@ public void beforeStart() { node.setting("xpack.security.transport.ssl.keystore.path", "transport.keystore"); node.setting("xpack.security.transport.ssl.certificate_authorities", "transport.ca"); } - - if (apmServerEnabled) { - mockServer = new MockApmServer(9999); - try { - mockServer.start(); - node.setting("telemetry.metrics.enabled", "true"); - node.setting("telemetry.tracing.enabled", "true"); - node.setting("telemetry.agent.transaction_sample_rate", "0.10"); - node.setting("telemetry.agent.metrics_interval", "10s"); - node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort()); - } catch (IOException e) { - logger.warn("Unable to start APM server", e); - } + if (mockServer != null) { + node.setting("telemetry.metrics.enabled", "true"); + node.setting("telemetry.tracing.enabled", "true"); + node.setting("telemetry.agent.transaction_sample_rate", "1.0"); + node.setting("telemetry.agent.metrics_interval", "10s"); + node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort()); } // in serverless metrics are enabled by default // if metrics were not enabled explicitly for gradlew run we should disable them diff --git a/modules/apm/METERING.md b/modules/apm/METERING.md index 427168e6b183e..36081081f4f5d 100644 --- a/modules/apm/METERING.md +++ b/modules/apm/METERING.md @@ -87,11 +87,20 @@ of value that was reported during the metric event ## Development -### Mock http server +### Local mock APM server The quickest way to verify that your metrics are working is to run `./gradlew run --with-apm-server`. -This will run ES node (or nodes in serverless) and also start a mock http server that will act -as an apm server. This fake http server will log all the http messages it receives from apm-agent +This will run an ES node (or nodes in serverless) and also start a mock APM server that logs all messages it receives from apm-agent. + +To verify specific metrics or transactions, you can filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions. +- `-apm-metrics` +- `-apm-transactions` +- `-apm-transactions-excludes` + +For example: +``` +./gradlew run -Dtests.es.logger.level=WARN --with-apm-server --apm-metric="es.rest.requests.total", --apm-transactions="cluster:monitor/*" --apm-transactions-excludes="cluster:monitor/xpack/*"``` +``` ### With APM server in cloud You can also run local ES node with an apm server in cloud.