| 
9 | 9 | 
 
  | 
10 | 10 | package org.elasticsearch.gradle.testclusters;  | 
11 | 11 | 
 
  | 
 | 12 | +import com.fasterxml.jackson.databind.ObjectMapper;  | 
 | 13 | +import com.fasterxml.jackson.databind.node.ObjectNode;  | 
12 | 14 | import com.sun.net.httpserver.HttpExchange;  | 
13 | 15 | import com.sun.net.httpserver.HttpHandler;  | 
14 | 16 | import com.sun.net.httpserver.HttpServer;  | 
15 | 17 | 
 
  | 
 | 18 | +import org.apache.commons.io.IOUtils;  | 
 | 19 | +import org.apache.commons.lang3.stream.Streams;  | 
16 | 20 | import org.gradle.api.logging.Logger;  | 
17 | 21 | import org.gradle.api.logging.Logging;  | 
 | 22 | +import org.slf4j.LoggerFactory;  | 
18 | 23 | 
 
  | 
 | 24 | +import java.io.BufferedReader;  | 
19 | 25 | import java.io.ByteArrayOutputStream;  | 
20 | 26 | import java.io.IOException;  | 
21 | 27 | import java.io.InputStream;  | 
 | 28 | +import java.io.InputStreamReader;  | 
22 | 29 | import java.io.OutputStream;  | 
23 | 30 | import java.net.InetSocketAddress;  | 
 | 31 | +import java.util.Arrays;  | 
 | 32 | +import java.util.regex.Pattern;  | 
 | 33 | +import java.util.stream.Collectors;  | 
 | 34 | + | 
 | 35 | +import javax.annotation.concurrent.NotThreadSafe;  | 
24 | 36 | 
 
  | 
25 | 37 | /**  | 
26 | 38 |  * This is a server which just accepts lines of JSON code and if the JSON  | 
 | 
32 | 44 |  * <p>  | 
33 | 45 |  * The HTTP server used is the JDK embedded com.sun.net.httpserver  | 
34 | 46 |  */  | 
 | 47 | +@NotThreadSafe  | 
35 | 48 | public class MockApmServer {  | 
36 | 49 |     private static final Logger logger = Logging.getLogger(MockApmServer.class);  | 
37 |  | -    private int port;  | 
 | 50 | +    private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class);  | 
 | 51 | +    private final Pattern metricFilter;  | 
 | 52 | +    private final Pattern transactionFilter;  | 
 | 53 | +    private final Pattern transactionExcludesFilter;  | 
38 | 54 | 
 
  | 
39 |  | -    public MockApmServer(int port) {  | 
40 |  | -        this.port = port;  | 
41 |  | -    }  | 
 | 55 | +    private HttpServer instance;  | 
42 | 56 | 
 
  | 
43 |  | -    /**  | 
44 |  | -     * Simple main that starts a mock APM server and prints the port it is  | 
45 |  | -     * running on. This is not needed  | 
46 |  | -     * for testing, it is just a convenient template for trying things out  | 
47 |  | -     * if you want play around.  | 
48 |  | -     */  | 
49 |  | -    public static void main(String[] args) throws IOException, InterruptedException {  | 
50 |  | -        MockApmServer server = new MockApmServer(9999);  | 
51 |  | -        server.start();  | 
 | 57 | +    public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) {  | 
 | 58 | +        this.metricFilter = createWildcardPattern(metricFilter);  | 
 | 59 | +        this.transactionFilter = createWildcardPattern(transactionFilter);  | 
 | 60 | +        this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter);  | 
52 | 61 |     }  | 
53 | 62 | 
 
  | 
54 |  | -    private static volatile HttpServer instance;  | 
 | 63 | +    private Pattern createWildcardPattern(String filter) {  | 
 | 64 | +        if (filter == null || filter.isEmpty()) {  | 
 | 65 | +            return null;  | 
 | 66 | +        }  | 
 | 67 | +        var pattern = Arrays.stream(filter.split(",\\s*"))  | 
 | 68 | +            .map(Pattern::quote)  | 
 | 69 | +            .map(s -> s.replace("*", "\\E.*\\Q"))  | 
 | 70 | +            .collect(Collectors.joining(")|(", "(", ")"));  | 
 | 71 | +        return Pattern.compile(pattern);  | 
 | 72 | +    }  | 
55 | 73 | 
 
  | 
56 | 74 |     /**  | 
57 | 75 |      * Start the Mock APM server. Just returns empty JSON structures for every incoming message  | 
58 | 76 |      *  | 
59 |  | -     * @return - the port the Mock APM server started on  | 
60 | 77 |      * @throws IOException  | 
61 | 78 |      */  | 
62 |  | -    public synchronized int start() throws IOException {  | 
 | 79 | +    public void start() throws IOException {  | 
63 | 80 |         if (instance != null) {  | 
64 |  | -            String hostname = instance.getAddress().getHostName();  | 
65 |  | -            int port = instance.getAddress().getPort();  | 
66 |  | -            logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port);  | 
67 |  | -            return port;  | 
 | 81 | +            throw new IllegalStateException("MockApmServer already started");  | 
68 | 82 |         }  | 
69 |  | -        InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port);  | 
 | 83 | +        InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0);  | 
70 | 84 |         HttpServer server = HttpServer.create(addr, 10);  | 
71 |  | -        server.createContext("/exit", new ExitHandler());  | 
72 | 85 |         server.createContext("/", new RootHandler());  | 
73 |  | - | 
74 | 86 |         server.start();  | 
75 | 87 |         instance = server;  | 
76 | 88 |         logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort());  | 
77 |  | -        return server.getAddress().getPort();  | 
78 | 89 |     }  | 
79 | 90 | 
 
  | 
80 | 91 |     public int getPort() {  | 
81 |  | -        return port;  | 
 | 92 | +        if (instance == null) {  | 
 | 93 | +            throw new IllegalStateException("MockApmServer not started");  | 
 | 94 | +        }  | 
 | 95 | +        return instance.getAddress().getPort();  | 
82 | 96 |     }  | 
83 | 97 | 
 
  | 
84 | 98 |     /**  | 
85 | 99 |      * Stop the server gracefully if possible  | 
86 | 100 |      */  | 
87 |  | -    public synchronized void stop() {  | 
88 |  | -        logger.lifecycle("stopping apm server");  | 
89 |  | -        instance.stop(1);  | 
90 |  | -        instance = null;  | 
 | 101 | +    public void stop() {  | 
 | 102 | +        if (instance != null) {  | 
 | 103 | +            logger.lifecycle("stopping apm server");  | 
 | 104 | +            instance.stop(1);  | 
 | 105 | +            instance = null;  | 
 | 106 | +        }  | 
91 | 107 |     }  | 
92 | 108 | 
 
  | 
93 | 109 |     class RootHandler implements HttpHandler {  | 
94 | 110 |         public void handle(HttpExchange t) {  | 
95 | 111 |             try {  | 
96 | 112 |                 InputStream body = t.getRequestBody();  | 
97 |  | -                ByteArrayOutputStream bytes = new ByteArrayOutputStream();  | 
98 |  | -                byte[] buffer = new byte[8 * 1024];  | 
99 |  | -                int lengthRead;  | 
100 |  | -                while ((lengthRead = body.read(buffer)) > 0) {  | 
101 |  | -                    bytes.write(buffer, 0, lengthRead);  | 
 | 113 | +                if (metricFilter == null && transactionFilter == null) {  | 
 | 114 | +                    logRequestBody(body);  | 
 | 115 | +                } else {  | 
 | 116 | +                    logFiltered(body);  | 
102 | 117 |                 }  | 
103 |  | -                logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));  | 
104 | 118 | 
 
  | 
105 | 119 |                 String response = "{}";  | 
106 | 120 |                 t.sendResponseHeaders(200, response.length());  | 
107 |  | -                OutputStream os = t.getResponseBody();  | 
108 |  | -                os.write(response.getBytes());  | 
109 |  | -                os.close();  | 
 | 121 | +                try (OutputStream os = t.getResponseBody()) {  | 
 | 122 | +                    os.write(response.getBytes());  | 
 | 123 | +                }  | 
110 | 124 |             } catch (Exception e) {  | 
111 | 125 |                 e.printStackTrace();  | 
112 | 126 |             }  | 
113 | 127 |         }  | 
114 |  | -    }  | 
115 | 128 | 
 
  | 
116 |  | -    static class ExitHandler implements HttpHandler {  | 
117 |  | -        private static final int STOP_TIME = 3;  | 
 | 129 | +        private void logRequestBody(InputStream body) throws IOException {  | 
 | 130 | +            ByteArrayOutputStream bytes = new ByteArrayOutputStream();  | 
 | 131 | +            IOUtils.copy(body, bytes);  | 
 | 132 | +            logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));  | 
 | 133 | +        }  | 
118 | 134 | 
 
  | 
119 |  | -        public void handle(HttpExchange t) {  | 
120 |  | -            try {  | 
121 |  | -                InputStream body = t.getRequestBody();  | 
122 |  | -                String response = "{}";  | 
123 |  | -                t.sendResponseHeaders(200, response.length());  | 
124 |  | -                OutputStream os = t.getResponseBody();  | 
125 |  | -                os.write(response.getBytes());  | 
126 |  | -                os.close();  | 
127 |  | -                instance.stop(STOP_TIME);  | 
128 |  | -                instance = null;  | 
129 |  | -            } catch (Exception e) {  | 
130 |  | -                e.printStackTrace();  | 
 | 135 | +        private void logFiltered(InputStream body) throws IOException {  | 
 | 136 | +            ObjectMapper mapper = new ObjectMapper();  | 
 | 137 | +            try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) {  | 
 | 138 | +                String line;  | 
 | 139 | +                String tier = null;  | 
 | 140 | +                String node = null;  | 
 | 141 | + | 
 | 142 | +                while ((line = reader.readLine()) != null) {  | 
 | 143 | +                    var jsonNode = mapper.readTree(line);  | 
 | 144 | + | 
 | 145 | +                    if (jsonNode.has("metadata")) {  | 
 | 146 | +                        node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null);  | 
 | 147 | +                        tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null);  | 
 | 148 | +                    } else if (transactionFilter != null && jsonNode.has("transaction")) {  | 
 | 149 | +                        var transaction = jsonNode.get("transaction");  | 
 | 150 | +                        var name = transaction.get("name").asText();  | 
 | 151 | +                        if (transactionFilter.matcher(name).matches()  | 
 | 152 | +                            && (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) {  | 
 | 153 | +                            logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction);  | 
 | 154 | +                        }  | 
 | 155 | +                    } else if (metricFilter != null && jsonNode.has("metricset")) {  | 
 | 156 | +                        var metricset = jsonNode.get("metricset");  | 
 | 157 | +                        var samples = (ObjectNode) metricset.get("samples");  | 
 | 158 | +                        for (var name : Streams.of(samples.fieldNames()).toList()) {  | 
 | 159 | +                            if (metricFilter.matcher(name).matches() == false) {  | 
 | 160 | +                                samples.remove(name);  | 
 | 161 | +                            }  | 
 | 162 | +                        }  | 
 | 163 | +                        if (samples.isEmpty() == false) {  | 
 | 164 | +                            logger.lifecycle("Metricset [{}/{}]", node, tier, metricset);  | 
 | 165 | +                        }  | 
 | 166 | +                    }  | 
 | 167 | +                }  | 
131 | 168 |             }  | 
132 | 169 |         }  | 
133 | 170 |     }  | 
 | 
0 commit comments