15
15
import org .apache .logging .log4j .LogManager ;
16
16
import org .apache .logging .log4j .Logger ;
17
17
import org .elasticsearch .core .SuppressForbidden ;
18
- import org .elasticsearch .xcontent .spi .XContentProvider ;
19
18
import org .junit .rules .ExternalResource ;
20
19
21
20
import java .io .BufferedReader ;
25
24
import java .net .InetAddress ;
26
25
import java .net .InetSocketAddress ;
27
26
import java .nio .charset .StandardCharsets ;
28
- import java .util .ArrayList ;
29
27
import java .util .List ;
30
28
import java .util .concurrent .ArrayBlockingQueue ;
31
29
import java .util .concurrent .TimeUnit ;
35
33
public class RecordingApmServer extends ExternalResource {
36
34
private static final Logger logger = LogManager .getLogger (RecordingApmServer .class );
37
35
38
- private static final XContentProvider .FormatProvider XCONTENT = XContentProvider .provider ().getJsonXContent ();
39
-
40
36
final ArrayBlockingQueue <String > received = new ArrayBlockingQueue <>(1000 );
41
37
42
38
private static HttpServer server ;
43
39
private final Thread messageConsumerThread = consumerThread ();
44
40
private volatile Consumer <String > consumer ;
45
- private volatile boolean consumerRunning = true ;
41
+ private volatile boolean running = true ;
46
42
47
43
@ Override
48
44
protected void before () throws Throwable {
@@ -56,7 +52,7 @@ protected void before() throws Throwable {
56
52
57
53
private Thread consumerThread () {
58
54
return new Thread (() -> {
59
- while (consumerRunning ) {
55
+ while (running ) {
60
56
if (consumer != null ) {
61
57
try {
62
58
String msg = received .poll (1L , TimeUnit .SECONDS );
@@ -74,28 +70,38 @@ private Thread consumerThread() {
74
70
75
71
@ Override
76
72
protected void after () {
73
+ running = false ;
77
74
server .stop (1 );
78
- consumerRunning = false ;
75
+ consumer = null ;
79
76
}
80
77
81
78
private void handle (HttpExchange exchange ) throws IOException {
82
79
try (exchange ) {
83
- try {
84
- try (InputStream requestBody = exchange .getRequestBody ()) {
85
- if (requestBody != null ) {
86
- var read = readJsonMessages (requestBody );
87
- received .addAll (read );
80
+ if (running ) {
81
+ try {
82
+ try (InputStream requestBody = exchange .getRequestBody ()) {
83
+ if (requestBody != null ) {
84
+ var read = readJsonMessages (requestBody );
85
+ received .addAll (read );
86
+ }
88
87
}
89
- }
90
88
91
- } catch (RuntimeException e ) {
92
- logger .warn ("failed to parse request" , e );
89
+ } catch (Throwable t ) {
90
+ // The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received
91
+ // messages before the test starts running. We should also stop handling them before the test ends (and the test
92
+ // cluster is torn down), or we may run into IOException as the communication channel is interrupted.
93
+ // Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so
94
+ // we just handle Throwable and don't care (log, but don't care): if we have an error in communicating to/from
95
+ // the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and
96
+ // if we have an error outside the test scope (before or after) that is OK.
97
+ logger .warn ("failed to parse request" , t );
98
+ }
93
99
}
94
100
exchange .sendResponseHeaders (201 , 0 );
95
101
}
96
102
}
97
103
98
- private List <String > readJsonMessages (InputStream input ) throws IOException {
104
+ private List <String > readJsonMessages (InputStream input ) {
99
105
// parse NDJSON
100
106
return new BufferedReader (new InputStreamReader (input , StandardCharsets .UTF_8 )).lines ().toList ();
101
107
}
@@ -104,14 +110,7 @@ public int getPort() {
104
110
return server .getAddress ().getPort ();
105
111
}
106
112
107
- public List <String > getMessages () {
108
- List <String > list = new ArrayList <>(received .size ());
109
- received .drainTo (list );
110
- return list ;
111
- }
112
-
113
113
public void addMessageConsumer (Consumer <String > messageConsumer ) {
114
114
this .consumer = messageConsumer ;
115
115
}
116
-
117
116
}
0 commit comments