Skip to content

Commit e8ac2a5

Browse files
committed
Improved usability of the mock APM server for local testing.
To verify specific metrics or transactions, you can now filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions. - `-apm-metrics` - `-apm-transactions` - `-apm-transactions-excludes` Relates to ES-10969
1 parent bc02964 commit e8ac2a5

File tree

3 files changed

+156
-69
lines changed

3 files changed

+156
-69
lines changed

build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java

Lines changed: 90 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,30 @@
99

1010
package org.elasticsearch.gradle.testclusters;
1111

12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import com.fasterxml.jackson.databind.node.ObjectNode;
1214
import com.sun.net.httpserver.HttpExchange;
1315
import com.sun.net.httpserver.HttpHandler;
1416
import com.sun.net.httpserver.HttpServer;
1517

18+
import org.apache.commons.io.IOUtils;
19+
import org.apache.commons.lang3.stream.Streams;
1620
import org.gradle.api.logging.Logger;
1721
import org.gradle.api.logging.Logging;
22+
import org.slf4j.LoggerFactory;
1823

24+
import java.io.BufferedReader;
1925
import java.io.ByteArrayOutputStream;
2026
import java.io.IOException;
2127
import java.io.InputStream;
28+
import java.io.InputStreamReader;
2229
import java.io.OutputStream;
2330
import java.net.InetSocketAddress;
31+
import java.util.Arrays;
32+
import java.util.regex.Pattern;
33+
import java.util.stream.Collectors;
34+
35+
import javax.annotation.concurrent.NotThreadSafe;
2436

2537
/**
2638
* This is a server which just accepts lines of JSON code and if the JSON
@@ -32,102 +44,127 @@
3244
* <p>
3345
* The HTTP server used is the JDK embedded com.sun.net.httpserver
3446
*/
47+
@NotThreadSafe
3548
public class MockApmServer {
3649
private static final Logger logger = Logging.getLogger(MockApmServer.class);
37-
private int port;
50+
private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class);
51+
private final Pattern metricFilter;
52+
private final Pattern transactionFilter;
53+
private final Pattern transactionExcludesFilter;
3854

39-
public MockApmServer(int port) {
40-
this.port = port;
41-
}
55+
private HttpServer instance;
4256

43-
/**
44-
* Simple main that starts a mock APM server and prints the port it is
45-
* running on. This is not needed
46-
* for testing, it is just a convenient template for trying things out
47-
* if you want play around.
48-
*/
49-
public static void main(String[] args) throws IOException, InterruptedException {
50-
MockApmServer server = new MockApmServer(9999);
51-
server.start();
57+
public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) {
58+
this.metricFilter = createWildcardPattern(metricFilter);
59+
this.transactionFilter = createWildcardPattern(transactionFilter);
60+
this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter);
5261
}
5362

54-
private static volatile HttpServer instance;
63+
private Pattern createWildcardPattern(String filter) {
64+
if (filter == null || filter.isEmpty()) {
65+
return null;
66+
}
67+
var pattern = Arrays.stream(filter.split(",\\s*"))
68+
.map(Pattern::quote)
69+
.map(s -> s.replace("*", "\\E.*\\Q"))
70+
.collect(Collectors.joining(")|(", "(", ")"));
71+
return Pattern.compile(pattern);
72+
}
5573

5674
/**
5775
* Start the Mock APM server. Just returns empty JSON structures for every incoming message
5876
*
59-
* @return - the port the Mock APM server started on
6077
* @throws IOException
6178
*/
62-
public synchronized int start() throws IOException {
79+
public void start() throws IOException {
6380
if (instance != null) {
64-
String hostname = instance.getAddress().getHostName();
65-
int port = instance.getAddress().getPort();
66-
logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port);
67-
return port;
81+
throw new IllegalStateException("MockApmServer already started");
6882
}
69-
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port);
83+
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0);
7084
HttpServer server = HttpServer.create(addr, 10);
71-
server.createContext("/exit", new ExitHandler());
7285
server.createContext("/", new RootHandler());
73-
7486
server.start();
7587
instance = server;
7688
logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort());
77-
return server.getAddress().getPort();
7889
}
7990

8091
public int getPort() {
81-
return port;
92+
if (instance == null) {
93+
throw new IllegalStateException("MockApmServer not started");
94+
}
95+
return instance.getAddress().getPort();
8296
}
8397

8498
/**
8599
* Stop the server gracefully if possible
86100
*/
87-
public synchronized void stop() {
88-
logger.lifecycle("stopping apm server");
89-
instance.stop(1);
90-
instance = null;
101+
public void stop() {
102+
if (instance != null) {
103+
logger.lifecycle("stopping apm server");
104+
instance.stop(1);
105+
instance = null;
106+
}
91107
}
92108

93109
class RootHandler implements HttpHandler {
94110
public void handle(HttpExchange t) {
95111
try {
96112
InputStream body = t.getRequestBody();
97-
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
98-
byte[] buffer = new byte[8 * 1024];
99-
int lengthRead;
100-
while ((lengthRead = body.read(buffer)) > 0) {
101-
bytes.write(buffer, 0, lengthRead);
113+
if (metricFilter == null && transactionFilter == null) {
114+
logRequestBody(body);
115+
} else {
116+
logFiltered(body);
102117
}
103-
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
104118

105119
String response = "{}";
106120
t.sendResponseHeaders(200, response.length());
107-
OutputStream os = t.getResponseBody();
108-
os.write(response.getBytes());
109-
os.close();
121+
try (OutputStream os = t.getResponseBody()) {
122+
os.write(response.getBytes());
123+
}
110124
} catch (Exception e) {
111125
e.printStackTrace();
112126
}
113127
}
114-
}
115128

116-
static class ExitHandler implements HttpHandler {
117-
private static final int STOP_TIME = 3;
129+
private void logRequestBody(InputStream body) throws IOException {
130+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
131+
IOUtils.copy(body, bytes);
132+
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
133+
}
118134

119-
public void handle(HttpExchange t) {
120-
try {
121-
InputStream body = t.getRequestBody();
122-
String response = "{}";
123-
t.sendResponseHeaders(200, response.length());
124-
OutputStream os = t.getResponseBody();
125-
os.write(response.getBytes());
126-
os.close();
127-
instance.stop(STOP_TIME);
128-
instance = null;
129-
} catch (Exception e) {
130-
e.printStackTrace();
135+
private void logFiltered(InputStream body) throws IOException {
136+
ObjectMapper mapper = new ObjectMapper();
137+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) {
138+
String line;
139+
String tier = null;
140+
String node = null;
141+
142+
while ((line = reader.readLine()) != null) {
143+
var jsonNode = mapper.readTree(line);
144+
145+
if (jsonNode.has("metadata")) {
146+
node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null);
147+
tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null);
148+
} else if (transactionFilter != null && jsonNode.has("transaction")) {
149+
var transaction = jsonNode.get("transaction");
150+
var name = transaction.get("name").asText();
151+
if (transactionFilter.matcher(name).matches()
152+
&& (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) {
153+
logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction);
154+
}
155+
} else if (metricFilter != null && jsonNode.has("metricset")) {
156+
var metricset = jsonNode.get("metricset");
157+
var samples = (ObjectNode) metricset.get("samples");
158+
for (var name : Streams.of(samples.fieldNames()).toList()) {
159+
if (metricFilter.matcher(name).matches() == false) {
160+
samples.remove(name);
161+
}
162+
}
163+
if (samples.isEmpty() == false) {
164+
logger.lifecycle("Metricset [{}/{}]", node, tier, metricset);
165+
}
166+
}
167+
}
131168
}
132169
}
133170
}

build-tools/src/main/java/org/elasticsearch/gradle/testclusters/RunTask.java

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public abstract class RunTask extends DefaultTestClustersTask {
4646

4747
private Boolean apmServerEnabled = false;
4848

49+
private String apmServerMetrics = null;
50+
51+
private String apmServerTransactions = null;
52+
53+
private String apmServerTransactionsExcludes = null;
54+
4955
private List<String> plugins;
5056

5157
private Boolean preserveData = false;
@@ -99,11 +105,44 @@ public Boolean getApmServerEnabled() {
99105
return apmServerEnabled;
100106
}
101107

108+
@Input
109+
@Optional
110+
public String getApmServerMetrics() {
111+
return apmServerMetrics;
112+
}
113+
114+
@Input
115+
@Optional
116+
public String getApmServerTransactions() {
117+
return apmServerTransactions;
118+
}
119+
120+
@Input
121+
@Optional
122+
public String getApmServerTransactionsExcludes() {
123+
return apmServerTransactionsExcludes;
124+
}
125+
102126
@Option(option = "with-apm-server", description = "Run simple logging http server to accept apm requests")
103127
public void setApmServerEnabled(Boolean apmServerEnabled) {
104128
this.apmServerEnabled = apmServerEnabled;
105129
}
106130

131+
@Option(option = "apm-metrics", description = "Metric wildcard filter for APM server")
132+
public void setApmServerMetrics(String apmServerMetrics) {
133+
this.apmServerMetrics = apmServerMetrics;
134+
}
135+
136+
@Option(option = "apm-transactions", description = "Transaction wildcard filter for APM server")
137+
public void setApmServerTransactions(String apmServerTransactions) {
138+
this.apmServerTransactions = apmServerTransactions;
139+
}
140+
141+
@Option(option = "apm-transactions-excludes", description = "Transaction wildcard filter for APM server")
142+
public void setApmServerTransactionsExcludes(String apmServerTransactionsExcludes) {
143+
this.apmServerTransactionsExcludes = apmServerTransactionsExcludes;
144+
}
145+
107146
@Option(option = "with-plugins", description = "Run distribution with plugins installed")
108147
public void setPlugins(String plugins) {
109148
this.plugins = Arrays.asList(plugins.split(","));
@@ -204,6 +243,15 @@ public void beforeStart() {
204243
getDataPath = n -> dataDir.resolve(n.getName());
205244
}
206245

246+
if (apmServerEnabled) {
247+
try {
248+
mockServer = new MockApmServer(apmServerMetrics, apmServerTransactions, apmServerTransactionsExcludes);
249+
mockServer.start();
250+
} catch (IOException e) {
251+
throw new GradleException("Unable to start APM server: " + e.getMessage(), e);
252+
}
253+
}
254+
207255
for (ElasticsearchCluster cluster : getClusters()) {
208256
cluster.setPreserveDataDir(preserveData);
209257
for (ElasticsearchNode node : cluster.getNodes()) {
@@ -232,19 +280,12 @@ public void beforeStart() {
232280
node.setting("xpack.security.transport.ssl.keystore.path", "transport.keystore");
233281
node.setting("xpack.security.transport.ssl.certificate_authorities", "transport.ca");
234282
}
235-
236-
if (apmServerEnabled) {
237-
mockServer = new MockApmServer(9999);
238-
try {
239-
mockServer.start();
240-
node.setting("telemetry.metrics.enabled", "true");
241-
node.setting("telemetry.tracing.enabled", "true");
242-
node.setting("telemetry.agent.transaction_sample_rate", "0.10");
243-
node.setting("telemetry.agent.metrics_interval", "10s");
244-
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
245-
} catch (IOException e) {
246-
logger.warn("Unable to start APM server", e);
247-
}
283+
if (mockServer != null) {
284+
node.setting("telemetry.metrics.enabled", "true");
285+
node.setting("telemetry.tracing.enabled", "true");
286+
node.setting("telemetry.agent.transaction_sample_rate", "1.0");
287+
node.setting("telemetry.agent.metrics_interval", "10s");
288+
node.setting("telemetry.agent.server_url", "http://127.0.0.1:" + mockServer.getPort());
248289
}
249290
// in serverless metrics are enabled by default
250291
// if metrics were not enabled explicitly for gradlew run we should disable them

modules/apm/METERING.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,20 @@ of value that was reported during the metric event
8787

8888
## Development
8989

90-
### Mock http server
90+
### Local mock APM server
9191

9292
The quickest way to verify that your metrics are working is to run `./gradlew run --with-apm-server`.
93-
This will run ES node (or nodes in serverless) and also start a mock http server that will act
94-
as an apm server. This fake http server will log all the http messages it receives from apm-agent
93+
This will run an ES node (or nodes in serverless) and also start a mock APM server that logs all messages it receives from apm-agent.
94+
95+
To verify specific metrics or transactions, you can filter the output using one or several of the options below. Each of these takes a comma-separated list of wildcard expressions.
96+
- `-apm-metrics`
97+
- `-apm-transactions`
98+
- `-apm-transactions-excludes`
99+
100+
For example:
101+
```
102+
./gradlew run -Dtests.es.logger.level=WARN --with-apm-server --apm-metric="es.rest.requests.total", --apm-transactions="cluster:monitor/*" --apm-transactions-excludes="cluster:monitor/xpack/*"```
103+
```
95104

96105
### With APM server in cloud
97106
You can also run local ES node with an apm server in cloud.

0 commit comments

Comments
 (0)