12
12
import org .elasticsearch .action .DocWriteRequest ;
13
13
import org .elasticsearch .action .admin .indices .refresh .RefreshRequest ;
14
14
import org .elasticsearch .action .search .SearchResponse ;
15
+ import org .elasticsearch .client .Client ;
15
16
import org .elasticsearch .common .settings .Settings ;
16
17
import org .elasticsearch .core .Tuple ;
17
18
import org .elasticsearch .index .query .QueryBuilders ;
34
35
public class BulkProcessor2RetryIT extends ESIntegTestCase {
35
36
private static final String INDEX_NAME = "test" ;
36
37
Map <String , Integer > requestToExecutionCountMap = new ConcurrentHashMap <>();
38
+ /*
39
+ * We can't call ESIntegTestCase.client() from a transport thread because it winds up calling a blocking operation that trips an
40
+ * assertion error if you're doing it from the transport thread. So we stash a random client in this variable for use when we nned a
41
+ * client in a transport thread.
42
+ */
43
+ private Client clientsForTransportThread ;
37
44
38
45
@ Override
39
46
protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
@@ -58,14 +65,14 @@ public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
58
65
// value = "org.elasticsearch.action.bulk.Retry2:trace",
59
66
// reason = "Logging information about locks useful for tracking down deadlock"
60
67
// )
61
- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/94941" )
62
68
public void testBulkRejectionLoadWithBackoff () throws Throwable {
63
69
boolean rejectedExecutionExpected = false ;
64
70
executeBulkRejectionLoad (8 , rejectedExecutionExpected );
65
71
}
66
72
67
73
@ SuppressWarnings ("unchecked" )
68
74
private void executeBulkRejectionLoad (int maxRetries , boolean rejectedExecutionExpected ) throws Throwable {
75
+ clientsForTransportThread = client ();
69
76
int numberOfAsyncOps = randomIntBetween (600 , 700 );
70
77
final CountDownLatch latch = new CountDownLatch (numberOfAsyncOps );
71
78
final Set <BulkResponse > successfulResponses = Collections .newSetFromMap (new ConcurrentHashMap <>());
@@ -110,6 +117,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
110
117
}
111
118
rejectedAfterAllRetries = true ;
112
119
}
120
+ } else if (failure .getStatus () == RestStatus .SERVICE_UNAVAILABLE ) {
121
+ // The test framework throws this at us sometimes
113
122
} else {
114
123
throw new AssertionError ("Unexpected failure status: " + failure .getStatus ());
115
124
}
@@ -128,6 +137,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
128
137
rejectedAfterAllRetries = true ;
129
138
}
130
139
// ignored, we exceeded the write queue size when dispatching the initial bulk request
140
+ } else if (ExceptionsHelper .status (failureTuple .v2 ()) == RestStatus .SERVICE_UNAVAILABLE ) {
141
+ // The test framework throws this at us sometimes
131
142
} else {
132
143
Throwable t = failureTuple .v2 ();
133
144
// we're not expecting any other errors
@@ -164,7 +175,7 @@ void countAndBulk(BulkRequest request, ActionListener<BulkResponse> listener) {
164
175
for (DocWriteRequest <?> docWriteRequest : request .requests ) {
165
176
requestToExecutionCountMap .compute (docWriteRequest .id (), (key , value ) -> value == null ? 1 : value + 1 );
166
177
}
167
- client () .bulk (request , listener );
178
+ clientsForTransportThread .bulk (request , listener );
168
179
}
169
180
170
181
}
0 commit comments