7
7
8
8
package org .elasticsearch .xpack .search ;
9
9
10
- import org .elasticsearch .ExceptionsHelper ;
11
10
import org .elasticsearch .client .Request ;
12
11
import org .elasticsearch .client .Response ;
13
12
import org .elasticsearch .common .xcontent .XContentHelper ;
14
13
import org .elasticsearch .core .TimeValue ;
15
14
import org .elasticsearch .plugins .Plugin ;
15
+ import org .elasticsearch .search .ErrorTraceHelper ;
16
16
import org .elasticsearch .test .ESIntegTestCase ;
17
- import org .elasticsearch .transport .TransportMessageListener ;
18
- import org .elasticsearch .transport .TransportService ;
19
17
import org .elasticsearch .xcontent .XContentType ;
20
18
import org .junit .Before ;
21
19
22
20
import java .io .IOException ;
23
21
import java .util .Collection ;
24
22
import java .util .List ;
25
23
import java .util .Map ;
26
- import java .util .Optional ;
27
- import java .util .concurrent .atomic .AtomicBoolean ;
24
+ import java .util .function .BooleanSupplier ;
28
25
29
26
public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
30
27
@@ -38,25 +35,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
38
35
return List .of (AsyncSearch .class );
39
36
}
40
37
41
- private AtomicBoolean transportMessageHasStackTrace ;
38
+ private BooleanSupplier transportMessageHasStackTrace ;
42
39
43
40
@ Before
44
- private void setupMessageListener () {
45
- internalCluster ().getDataNodeInstances (TransportService .class ).forEach (ts -> {
46
- ts .addMessageListener (new TransportMessageListener () {
47
- @ Override
48
- public void onResponseSent (long requestId , String action , Exception error ) {
49
- TransportMessageListener .super .onResponseSent (requestId , action , error );
50
- if (action .startsWith ("indices:data/read/search" )) {
51
- Optional <Throwable > throwable = ExceptionsHelper .unwrapCausesAndSuppressed (
52
- error ,
53
- t -> t .getStackTrace ().length > 0
54
- );
55
- transportMessageHasStackTrace .set (throwable .isPresent ());
56
- }
57
- }
58
- });
59
- });
41
+ public void setupMessageListener () {
42
+ transportMessageHasStackTrace = ErrorTraceHelper .setupErrorTraceListener (internalCluster ());
60
43
}
61
44
62
45
private void setupIndexWithDocs () {
@@ -70,7 +53,6 @@ private void setupIndexWithDocs() {
70
53
}
71
54
72
55
public void testAsyncSearchFailingQueryErrorTraceDefault () throws IOException , InterruptedException {
73
- transportMessageHasStackTrace = new AtomicBoolean ();
74
56
setupIndexWithDocs ();
75
57
76
58
Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -93,11 +75,10 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, I
93
75
responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
94
76
}
95
77
// check that the stack trace was not sent from the data node to the coordinating node
96
- assertFalse (transportMessageHasStackTrace .get ());
78
+ assertFalse (transportMessageHasStackTrace .getAsBoolean ());
97
79
}
98
80
99
81
public void testAsyncSearchFailingQueryErrorTraceTrue () throws IOException , InterruptedException {
100
- transportMessageHasStackTrace = new AtomicBoolean ();
101
82
setupIndexWithDocs ();
102
83
103
84
Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -122,11 +103,10 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, Inte
122
103
responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
123
104
}
124
105
// check that the stack trace was sent from the data node to the coordinating node
125
- assertTrue (transportMessageHasStackTrace .get ());
106
+ assertTrue (transportMessageHasStackTrace .getAsBoolean ());
126
107
}
127
108
128
109
public void testAsyncSearchFailingQueryErrorTraceFalse () throws IOException , InterruptedException {
129
- transportMessageHasStackTrace = new AtomicBoolean ();
130
110
setupIndexWithDocs ();
131
111
132
112
Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -151,11 +131,10 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, Int
151
131
responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
152
132
}
153
133
// check that the stack trace was not sent from the data node to the coordinating node
154
- assertFalse (transportMessageHasStackTrace .get ());
134
+ assertFalse (transportMessageHasStackTrace .getAsBoolean ());
155
135
}
156
136
157
137
public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet () throws IOException , InterruptedException {
158
- transportMessageHasStackTrace = new AtomicBoolean ();
159
138
setupIndexWithDocs ();
160
139
161
140
Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -180,11 +159,10 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
180
159
responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
181
160
}
182
161
// check that the stack trace was not sent from the data node to the coordinating node
183
- assertFalse (transportMessageHasStackTrace .get ());
162
+ assertFalse (transportMessageHasStackTrace .getAsBoolean ());
184
163
}
185
164
186
165
public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet () throws IOException , InterruptedException {
187
- transportMessageHasStackTrace = new AtomicBoolean ();
188
166
setupIndexWithDocs ();
189
167
190
168
Request searchRequest = new Request ("POST" , "/_async_search" );
@@ -209,7 +187,7 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
209
187
responseEntity = performRequestAndGetResponseEntityAfterDelay (request , TimeValue .timeValueSeconds (1L ));
210
188
}
211
189
// check that the stack trace was sent from the data node to the coordinating node
212
- assertTrue (transportMessageHasStackTrace .get ());
190
+ assertTrue (transportMessageHasStackTrace .getAsBoolean ());
213
191
}
214
192
215
193
private Map <String , Object > performRequestAndGetResponseEntityAfterDelay (Request r , TimeValue sleep ) throws IOException ,
0 commit comments