1515import org .apache .logging .log4j .LogManager ;
1616import org .apache .logging .log4j .Logger ;
1717import org .elasticsearch .core .SuppressForbidden ;
18- import org .elasticsearch .xcontent .spi .XContentProvider ;
1918import org .junit .rules .ExternalResource ;
2019
2120import java .io .BufferedReader ;
2524import java .net .InetAddress ;
2625import java .net .InetSocketAddress ;
2726import java .nio .charset .StandardCharsets ;
28- import java .util .ArrayList ;
2927import java .util .List ;
3028import java .util .concurrent .ArrayBlockingQueue ;
3129import java .util .concurrent .TimeUnit ;
3533public class RecordingApmServer extends ExternalResource {
3634 private static final Logger logger = LogManager .getLogger (RecordingApmServer .class );
3735
38- private static final XContentProvider .FormatProvider XCONTENT = XContentProvider .provider ().getJsonXContent ();
39-
4036 final ArrayBlockingQueue <String > received = new ArrayBlockingQueue <>(1000 );
4137
4238 private static HttpServer server ;
4339 private final Thread messageConsumerThread = consumerThread ();
4440 private volatile Consumer <String > consumer ;
45- private volatile boolean consumerRunning = true ;
41+ private volatile boolean running = true ;
4642
4743 @ Override
4844 protected void before () throws Throwable {
@@ -56,7 +52,7 @@ protected void before() throws Throwable {
5652
5753 private Thread consumerThread () {
5854 return new Thread (() -> {
59- while (consumerRunning ) {
55+ while (running ) {
6056 if (consumer != null ) {
6157 try {
6258 String msg = received .poll (1L , TimeUnit .SECONDS );
@@ -74,28 +70,38 @@ private Thread consumerThread() {
7470
7571 @ Override
7672 protected void after () {
73+ running = false ;
7774 server .stop (1 );
78- consumerRunning = false ;
75+ consumer = null ;
7976 }
8077
8178 private void handle (HttpExchange exchange ) throws IOException {
8279 try (exchange ) {
83- try {
84- try (InputStream requestBody = exchange .getRequestBody ()) {
85- if (requestBody != null ) {
86- var read = readJsonMessages (requestBody );
87- received .addAll (read );
80+ if (running ) {
81+ try {
82+ try (InputStream requestBody = exchange .getRequestBody ()) {
83+ if (requestBody != null ) {
84+ var read = readJsonMessages (requestBody );
85+ received .addAll (read );
86+ }
8887 }
89- }
9088
91- } catch (RuntimeException e ) {
92- logger .warn ("failed to parse request" , e );
89+ } catch (Throwable t ) {
90+ // The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received
91+ // messages before the test starts running. We should also stop handling them before the test ends (and the test
92+ // cluster is torn down), or we may run into IOException as the communication channel is interrupted.
93+ // Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so
94+ // we just handle Throwable and don't care (log, but don't care): if we have an error in communicating to/from
95+ // the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and
96+ // if we have an error outside the test scope (before or after) that is OK.
97+ logger .warn ("failed to parse request" , t );
98+ }
9399 }
94100 exchange .sendResponseHeaders (201 , 0 );
95101 }
96102 }
97103
98- private List <String > readJsonMessages (InputStream input ) throws IOException {
104+ private List <String > readJsonMessages (InputStream input ) {
99105 // parse NDJSON
100106 return new BufferedReader (new InputStreamReader (input , StandardCharsets .UTF_8 )).lines ().toList ();
101107 }
@@ -104,14 +110,7 @@ public int getPort() {
104110 return server .getAddress ().getPort ();
105111 }
106112
107- public List <String > getMessages () {
108- List <String > list = new ArrayList <>(received .size ());
109- received .drainTo (list );
110- return list ;
111- }
112-
113113 public void addMessageConsumer (Consumer <String > messageConsumer ) {
114114 this .consumer = messageConsumer ;
115115 }
116-
117116}
0 commit comments