Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,6 @@ tests:
- class: org.elasticsearch.search.query.VectorIT
method: testFilteredQueryStrategy
issue: https://github.com/elastic/elasticsearch/issues/129517
- class: org.elasticsearch.test.apmintegration.TracesApmIT
method: testApmIntegration
issue: https://github.com/elastic/elasticsearch/issues/129651
- class: org.elasticsearch.snapshots.SnapshotShutdownIT
method: testSnapshotShutdownProgressTracker
issue: https://github.com/elastic/elasticsearch/issues/129752
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void testApmIntegration() throws Exception {
var remainingAssertions = Stream.concat(valueAssertions.keySet().stream(), histogramAssertions.keySet().stream())
.collect(Collectors.joining(","));
assertTrue("Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions, completed);
mockApmServer.stop();
}

private <T> Map.Entry<String, Predicate<Map<String, Object>>> assertion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,7 +41,7 @@ public class RecordingApmServer extends ExternalResource {
private static HttpServer server;
private final Thread messageConsumerThread = consumerThread();
private volatile Consumer<String> consumer;
private volatile boolean consumerRunning = true;
private volatile boolean running = true;

@Override
protected void before() throws Throwable {
Expand All @@ -56,7 +55,7 @@ protected void before() throws Throwable {

private Thread consumerThread() {
return new Thread(() -> {
while (consumerRunning) {
while (running) {
if (consumer != null) {
try {
String msg = received.poll(1L, TimeUnit.SECONDS);
Expand All @@ -75,21 +74,27 @@ private Thread consumerThread() {
@Override
protected void after() {
server.stop(1);
consumerRunning = false;
consumer = null;
}

void stop() {
running = false;
}

private void handle(HttpExchange exchange) throws IOException {
try (exchange) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
if (running) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
}
}
}

} catch (RuntimeException e) {
logger.warn("failed to parse request", e);
} catch (RuntimeException e) {
logger.warn("failed to parse request", e);
}
}
exchange.sendResponseHeaders(201, 0);
}
Expand All @@ -104,14 +109,7 @@ public int getPort() {
return server.getAddress().getPort();
}

public List<String> getMessages() {
List<String> list = new ArrayList<>(received.size());
received.drainTo(list);
return list;
}

public void addMessageConsumer(Consumer<String> messageConsumer) {
this.consumer = messageConsumer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ public void testApmIntegration() throws Exception {

client().performRequest(nodeStatsRequest);

finished.await(30, TimeUnit.SECONDS);
var completed = finished.await(30, TimeUnit.SECONDS);
assertTrue("Timeout when waiting for assertions to complete", completed);
assertThat(assertions, equalTo(Collections.emptySet()));

mockApmServer.stop();
}

private boolean isTransactionTraceMessage(Map<String, Object> apmMessage) {
Expand Down Expand Up @@ -143,5 +146,4 @@ private Map<String, Object> parseMap(String message) {
return Collections.emptyMap();
}
}

}