77
88package org .elasticsearch .xpack .search ;
99
10- import org .elasticsearch .ExceptionsHelper ;
1110import org .elasticsearch .client .Request ;
1211import org .elasticsearch .client .Response ;
1312import org .elasticsearch .common .xcontent .XContentHelper ;
1413import org .elasticsearch .core .TimeValue ;
1514import org .elasticsearch .plugins .Plugin ;
15+ import org .elasticsearch .search .ErrorTraceHelper ;
1616import org .elasticsearch .test .ESIntegTestCase ;
17- import org .elasticsearch .transport .TransportMessageListener ;
18- import org .elasticsearch .transport .TransportService ;
1917import org .elasticsearch .xcontent .XContentType ;
2018import org .junit .Before ;
2119
2220import java .io .IOException ;
2321import java .util .Collection ;
2422import java .util .List ;
2523import java .util .Map ;
26- import java .util .Optional ;
27- import java .util .concurrent .atomic .AtomicBoolean ;
24+ import java .util .function .BooleanSupplier ;
2825
2926public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
3027
@@ -38,25 +35,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3835 return List .of (AsyncSearch .class );
3936 }
4037
41- private AtomicBoolean transportMessageHasStackTrace ;
38+ private BooleanSupplier transportMessageHasStackTrace ;
4239
4340 @ Before
44- private void setupMessageListener () {
45- internalCluster ().getDataNodeInstances (TransportService .class ).forEach (ts -> {
46- ts .addMessageListener (new TransportMessageListener () {
47- @ Override
48- public void onResponseSent (long requestId , String action , Exception error ) {
49- TransportMessageListener .super .onResponseSent (requestId , action , error );
50- if (action .startsWith ("indices:data/read/search" )) {
51- Optional <Throwable > throwable = ExceptionsHelper .unwrapCausesAndSuppressed (
52- error ,
53- t -> t .getStackTrace ().length > 0
54- );
55- transportMessageHasStackTrace .set (throwable .isPresent ());
56- }
57- }
58- });
59- });
41+ public void setupMessageListener () {
42+ transportMessageHasStackTrace = ErrorTraceHelper .setupErrorTraceListener (internalCluster ());
6043 }
6144
6245 private void setupIndexWithDocs () {
@@ -70,7 +53,6 @@ private void setupIndexWithDocs() {
7053 }
7154
7255 public void testAsyncSearchFailingQueryErrorTraceDefault () throws IOException , InterruptedException {
73- transportMessageHasStackTrace = new AtomicBoolean ();
7456 setupIndexWithDocs ();
7557
7658 Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -93,11 +75,10 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, I
9375 responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
9476 }
9577 // check that the stack trace was not sent from the data node to the coordinating node
96- assertFalse (transportMessageHasStackTrace .get ());
78+ assertFalse (transportMessageHasStackTrace .getAsBoolean ());
9779 }
9880
9981 public void testAsyncSearchFailingQueryErrorTraceTrue () throws IOException , InterruptedException {
100- transportMessageHasStackTrace = new AtomicBoolean ();
10182 setupIndexWithDocs ();
10283
10384 Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -122,11 +103,10 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, Inte
122103 responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
123104 }
124105 // check that the stack trace was sent from the data node to the coordinating node
125- assertTrue (transportMessageHasStackTrace .get ());
106+ assertTrue (transportMessageHasStackTrace .getAsBoolean ());
126107 }
127108
128109 public void testAsyncSearchFailingQueryErrorTraceFalse () throws IOException , InterruptedException {
129- transportMessageHasStackTrace = new AtomicBoolean ();
130110 setupIndexWithDocs ();
131111
132112 Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -151,11 +131,10 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, Int
151131 responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
152132 }
153133 // check that the stack trace was not sent from the data node to the coordinating node
154- assertFalse (transportMessageHasStackTrace .get ());
134+ assertFalse (transportMessageHasStackTrace .getAsBoolean ());
155135 }
156136
157137 public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet () throws IOException , InterruptedException {
158- transportMessageHasStackTrace = new AtomicBoolean ();
159138 setupIndexWithDocs ();
160139
161140 Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -180,11 +159,10 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
180159 responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
181160 }
182161 // check that the stack trace was not sent from the data node to the coordinating node
183- assertFalse (transportMessageHasStackTrace .get ());
162+ assertFalse (transportMessageHasStackTrace .getAsBoolean ());
184163 }
185164
186165 public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet () throws IOException , InterruptedException {
187- transportMessageHasStackTrace = new AtomicBoolean ();
188166 setupIndexWithDocs ();
189167
190168 Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -209,7 +187,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
209187 responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
210188 }
211189 // check that the stack trace was sent from the data node to the coordinating node
212- assertTrue (transportMessageHasStackTrace .get ());
190+ assertTrue (transportMessageHasStackTrace .getAsBoolean ());
213191 }
214192
215193 private Map <String , Object > performRequestAndGetResponseEntityAfterDelay (Request r , TimeValue sleep ) throws IOException ,
0 commit comments