Skip to content

Commit e7dec67

Browse files
committed
Stop RecordingApmServer message processing before finishing the test
1 parent b52e5a7 commit e7dec67

File tree

4 files changed

+22
-24
lines changed

4 files changed

+22
-24
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,9 +536,6 @@ tests:
536536
- class: org.elasticsearch.search.query.VectorIT
537537
method: testFilteredQueryStrategy
538538
issue: https://github.com/elastic/elasticsearch/issues/129517
539-
- class: org.elasticsearch.test.apmintegration.TracesApmIT
540-
method: testApmIntegration
541-
issue: https://github.com/elastic/elasticsearch/issues/129651
542539
- class: org.elasticsearch.snapshots.SnapshotShutdownIT
543540
method: testSnapshotShutdownProgressTracker
544541
issue: https://github.com/elastic/elasticsearch/issues/129752

test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/MetricsApmIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public void testApmIntegration() throws Exception {
129129
var remainingAssertions = Stream.concat(valueAssertions.keySet().stream(), histogramAssertions.keySet().stream())
130130
.collect(Collectors.joining(","));
131131
assertTrue("Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions, completed);
132+
mockApmServer.stop();
132133
}
133134

134135
private <T> Map.Entry<String, Predicate<Map<String, Object>>> assertion(

test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.net.InetAddress;
2626
import java.net.InetSocketAddress;
2727
import java.nio.charset.StandardCharsets;
28-
import java.util.ArrayList;
2928
import java.util.List;
3029
import java.util.concurrent.ArrayBlockingQueue;
3130
import java.util.concurrent.TimeUnit;
@@ -42,7 +41,7 @@ public class RecordingApmServer extends ExternalResource {
4241
private static HttpServer server;
4342
private final Thread messageConsumerThread = consumerThread();
4443
private volatile Consumer<String> consumer;
45-
private volatile boolean consumerRunning = true;
44+
private volatile boolean running = true;
4645

4746
@Override
4847
protected void before() throws Throwable {
@@ -56,7 +55,7 @@ protected void before() throws Throwable {
5655

5756
private Thread consumerThread() {
5857
return new Thread(() -> {
59-
while (consumerRunning) {
58+
while (running) {
6059
if (consumer != null) {
6160
try {
6261
String msg = received.poll(1L, TimeUnit.SECONDS);
@@ -75,21 +74,27 @@ private Thread consumerThread() {
7574
@Override
7675
protected void after() {
7776
server.stop(1);
78-
consumerRunning = false;
77+
consumer = null;
78+
}
79+
80+
void stop() {
81+
running = false;
7982
}
8083

8184
private void handle(HttpExchange exchange) throws IOException {
8285
try (exchange) {
83-
try {
84-
try (InputStream requestBody = exchange.getRequestBody()) {
85-
if (requestBody != null) {
86-
var read = readJsonMessages(requestBody);
87-
received.addAll(read);
86+
if (running) {
87+
try {
88+
try (InputStream requestBody = exchange.getRequestBody()) {
89+
if (requestBody != null) {
90+
var read = readJsonMessages(requestBody);
91+
received.addAll(read);
92+
}
8893
}
89-
}
9094

91-
} catch (RuntimeException e) {
92-
logger.warn("failed to parse request", e);
95+
} catch (RuntimeException e) {
96+
logger.warn("failed to parse request", e);
97+
}
9398
}
9499
exchange.sendResponseHeaders(201, 0);
95100
}
@@ -104,14 +109,7 @@ public int getPort() {
104109
return server.getAddress().getPort();
105110
}
106111

107-
public List<String> getMessages() {
108-
List<String> list = new ArrayList<>(received.size());
109-
received.drainTo(list);
110-
return list;
111-
}
112-
113112
public void addMessageConsumer(Consumer<String> messageConsumer) {
114113
this.consumer = messageConsumer;
115114
}
116-
117115
}

test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/TracesApmIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ public void testApmIntegration() throws Exception {
9191

9292
client().performRequest(nodeStatsRequest);
9393

94-
finished.await(30, TimeUnit.SECONDS);
94+
var completed = finished.await(30, TimeUnit.SECONDS);
95+
assertTrue("Timeout when waiting for assertions to complete", completed);
9596
assertThat(assertions, equalTo(Collections.emptySet()));
97+
98+
mockApmServer.stop();
9699
}
97100

98101
private boolean isTransactionTraceMessage(Map<String, Object> apmMessage) {
@@ -143,5 +146,4 @@ private Map<String, Object> parseMap(String message) {
143146
return Collections.emptyMap();
144147
}
145148
}
146-
147149
}

0 commit comments

Comments
 (0)