Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,30 @@

package org.elasticsearch.gradle.testclusters;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.stream.Streams;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.annotation.concurrent.NotThreadSafe;

/**
* This is a server which just accepts lines of JSON code and if the JSON
Expand All @@ -32,102 +44,127 @@
* <p>
* The HTTP server used is the JDK embedded com.sun.net.httpserver
*/
@NotThreadSafe
public class MockApmServer {
private static final Logger logger = Logging.getLogger(MockApmServer.class);
private int port;
private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class);
private final Pattern metricFilter;
private final Pattern transactionFilter;
private final Pattern transactionExcludesFilter;

public MockApmServer(int port) {
this.port = port;
}
private HttpServer instance;

/**
* Simple main that starts a mock APM server and prints the port it is
* running on. This is not needed
* for testing, it is just a convenient template for trying things out
* if you want play around.
*/
public static void main(String[] args) throws IOException, InterruptedException {
MockApmServer server = new MockApmServer(9999);
server.start();
public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) {
this.metricFilter = createWildcardPattern(metricFilter);
this.transactionFilter = createWildcardPattern(transactionFilter);
this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter);
}

private static volatile HttpServer instance;
private Pattern createWildcardPattern(String filter) {
if (filter == null || filter.isEmpty()) {
return null;
}
var pattern = Arrays.stream(filter.split(",\\s*"))
.map(Pattern::quote)
.map(s -> s.replace("*", "\\E.*\\Q"))
.collect(Collectors.joining(")|(", "(", ")"));
return Pattern.compile(pattern);
}

/**
* Start the Mock APM server. Just returns empty JSON structures for every incoming message
*
* @return - the port the Mock APM server started on
* @throws IOException
*/
public synchronized int start() throws IOException {
public void start() throws IOException {
if (instance != null) {
String hostname = instance.getAddress().getHostName();
int port = instance.getAddress().getPort();
logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port);
return port;
throw new IllegalStateException("MockApmServer already started");
}
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port);
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0);
HttpServer server = HttpServer.create(addr, 10);
server.createContext("/exit", new ExitHandler());
server.createContext("/", new RootHandler());

server.start();
instance = server;
logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort());
return server.getAddress().getPort();
}

public int getPort() {
return port;
if (instance == null) {
throw new IllegalStateException("MockApmServer not started");
}
return instance.getAddress().getPort();
}

/**
* Stop the server gracefully if possible
*/
public synchronized void stop() {
logger.lifecycle("stopping apm server");
instance.stop(1);
instance = null;
public void stop() {
if (instance != null) {
logger.lifecycle("stopping apm server");
instance.stop(1);
instance = null;
}
}

class RootHandler implements HttpHandler {
public void handle(HttpExchange t) {
try {
InputStream body = t.getRequestBody();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
byte[] buffer = new byte[8 * 1024];
int lengthRead;
while ((lengthRead = body.read(buffer)) > 0) {
bytes.write(buffer, 0, lengthRead);
if (metricFilter == null && transactionFilter == null) {
logRequestBody(body);
} else {
logFiltered(body);
}
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));

String response = "{}";
t.sendResponseHeaders(200, response.length());
OutputStream os = t.getResponseBody();
os.write(response.getBytes());
os.close();
try (OutputStream os = t.getResponseBody()) {
os.write(response.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

static class ExitHandler implements HttpHandler {
private static final int STOP_TIME = 3;
private void logRequestBody(InputStream body) throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
IOUtils.copy(body, bytes);
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
}

public void handle(HttpExchange t) {
try {
InputStream body = t.getRequestBody();
String response = "{}";
t.sendResponseHeaders(200, response.length());
OutputStream os = t.getResponseBody();
os.write(response.getBytes());
os.close();
instance.stop(STOP_TIME);
instance = null;
} catch (Exception e) {
e.printStackTrace();
private void logFiltered(InputStream body) throws IOException {
ObjectMapper mapper = new ObjectMapper();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) {
String line;
String tier = null;
String node = null;

while ((line = reader.readLine()) != null) {
var jsonNode = mapper.readTree(line);

if (jsonNode.has("metadata")) {
node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null);
tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null);
} else if (transactionFilter != null && jsonNode.has("transaction")) {
var transaction = jsonNode.get("transaction");
var name = transaction.get("name").asText();
if (transactionFilter.matcher(name).matches()
&& (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) {
logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction);
}
} else if (metricFilter != null && jsonNode.has("metricset")) {
var metricset = jsonNode.get("metricset");
var samples = (ObjectNode) metricset.get("samples");
for (var name : Streams.of(samples.fieldNames()).toList()) {
if (metricFilter.matcher(name).matches() == false) {
samples.remove(name);
}
}
if (samples.isEmpty() == false) {
logger.lifecycle("Metricset [{}/{}]", node, tier, metricset);
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public abstract class RunTask extends DefaultTestClustersTask {

private Boolean apmServerEnabled = false;

private String apmServerMetrics = null;

private String apmServerTransactions = null;

private String apmServerTransactionsExcludes = null;

private List<String> plugins;

private Boolean preserveData = false;
Expand Down Expand Up @@ -99,11 +105,44 @@ public Boolean getApmServerEnabled() {
return apmServerEnabled;
}

@Input
@Optional
public String getApmServerMetrics() {
return apmServerMetrics;
}

@Input
@Optional
public String getApmServerTransactions() {
return apmServerTransactions;
}

@Input
@Optional
public String getApmServerTransactionsExcludes() {
return apmServerTransactionsExcludes;
}

@Option(option = "with-apm-server", description = "Run simple logging http server to accept apm requests")
public void setApmServerEnabled(Boolean apmServerEnabled) {
this.apmServerEnabled = apmServerEnabled;
}

@Option(option = "apm-metrics", description = "Metric wildcard filter for APM server")
public void setApmServerMetrics(String apmServerMetrics) {
this.apmServerMetrics = apmServerMetrics;
}

@Option(option = "apm-transactions", description = "Transaction wildcard filter for APM server")
public void setApmServerTransactions(String apmServerTransactions) {
this.apmServerTransactions = apmServerTransactions;
}

@Option(option = "apm-transactions-excludes", description = "Transaction wildcard filter for APM server")
public void setApmServerTransactionsExcludes(String apmServerTransactionsExcludes) {
this.apmServerTransactionsExcludes = apmServerTransactionsExcludes;
}

@Option(option = "with-plugins", description = "Run distribution with plugins installed")
public void setPlugins(String plugins) {
this.plugins = Arrays.asList(plugins.split(","));
Expand Down Expand Up @@ -204,6 +243,15 @@ public void beforeStart() {
getDataPath = n -> dataDir.resolve(n.getName());
}

if (apmServerEnabled) {
try {
mockServer = new MockApmServer(apmServerMetrics, apmServerTransactions, apmServerTransactionsExcludes);
mockServer.start();
} catch (IOException e) {
throw new GradleException("Unable to start APM server: " + e.getMessage(), e);
}
}

for (ElasticsearchCluster cluster : getClusters()) {
cluster.setPreserveDataDir(preserveData);
for (ElasticsearchNode node : cluster.getNodes()) {
Expand Down Expand Up @@ -232,19 +280,12 @@ public void beforeStart() {
node.setting("xpack.security.transport.ssl.keystore.path", "transport.keystore");
node.setting("xpack.security.transport.ssl.certificate_authorities", "transport.ca");
}

if (apmServerEnabled) {
mockServer = new MockApmServer(9999);
try {
mockServer.start();
node.setting("telemetry.metrics.enabled", "true");
node.setting("telemetry.tracing.enabled", "true");
node.setting("telemetry.agent.transaction_sample_rate", "0.10");
node.setting("telemetry.agent.metrics_interval", "10s");
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
} catch (IOException e) {
logger.warn("Unable to start APM server", e);
}
if (mockServer != null) {
node.setting("telemetry.metrics.enabled", "true");
node.setting("telemetry.tracing.enabled", "true");
node.setting("telemetry.agent.transaction_sample_rate", "1.0");
node.setting("telemetry.agent.metrics_interval", "10s");
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
}
// in serverless metrics are enabled by default
// if metrics were not enabled explicitly for gradlew run we should disable them
Expand Down
15 changes: 12 additions & 3 deletions modules/apm/METERING.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,20 @@ of value that was reported during the metric event

## Development

### Mock http server
### Local mock APM server

The quickest way to verify that your metrics are working is to run `./gradlew run --with-apm-server`.
This will run ES node (or nodes in serverless) and also start a mock http server that will act
as an apm server. This fake http server will log all the http messages it receives from apm-agent
This will run an ES node (or nodes in serverless) and also start a mock APM server that logs all messages it receives from apm-agent.

To verify specific metrics or transactions, you can filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions.
- `-apm-metrics`
- `-apm-transactions`
- `-apm-transactions-excludes`

For example:
```
./gradlew run -Dtests.es.logger.level=WARN --with-apm-server --apm-metric="es.rest.requests.total", --apm-transactions="cluster:monitor/*" --apm-transactions-excludes="cluster:monitor/xpack/*"```
```

### With APM server in cloud
You can also run local ES node with an apm server in cloud.
Expand Down