11package co .elastic .clients .elasticsearch ._helpers .bulk ;
22
3- import co .elastic .clients .elasticsearch .ElasticsearchAsyncClient ;
43import co .elastic .clients .elasticsearch .ElasticsearchClient ;
54import co .elastic .clients .elasticsearch ._types .ErrorCause ;
65import co .elastic .clients .elasticsearch .core .BulkRequest ;
1615import co .elastic .clients .transport .BackoffPolicy ;
1716import co .elastic .clients .transport .ElasticsearchTransport ;
1817import co .elastic .clients .transport .Endpoint ;
19- import co .elastic .clients .transport .TransportException ;
2018import co .elastic .clients .transport .TransportOptions ;
21- import co .elastic .clients .transport .http .RepeatableBodyResponse ;
22- import co .elastic .clients .transport .http .TransportHttpClient ;
2319import org .jetbrains .annotations .Nullable ;
2420import org .junit .jupiter .api .BeforeAll ;
21+ import org .junit .jupiter .api .BeforeEach ;
2522import org .junit .jupiter .api .Test ;
2623
2724import java .io .IOException ;
2825import java .util .ArrayList ;
26+ import java .util .Iterator ;
2927import java .util .List ;
28+ import java .util .Objects ;
3029import java .util .concurrent .CompletableFuture ;
30+ import java .util .concurrent .ConcurrentHashMap ;
31+ import java .util .concurrent .ConcurrentLinkedQueue ;
3132import java .util .concurrent .ExecutorService ;
3233import java .util .concurrent .Executors ;
33- import java .util .concurrent .ScheduledExecutorService ;
3434import java .util .concurrent .TimeUnit ;
3535import java .util .concurrent .atomic .AtomicInteger ;
3636
@@ -44,12 +44,13 @@ public class BulkIngesterRetryPolicyTest {
4444
4545 private BulkOperation create = BulkOperation .of (b -> b .create (c -> c .index ("foo" ).id ("1" ).document ("1" )));
4646 private BulkOperation index = BulkOperation .of (b -> b .index (c -> c .index ("fooo" ).id ("2" ).document ("2" )));
47- private BulkOperation indexFail = BulkOperation .of (b -> b .index (c -> c .index ("fail" ).id ("2" ).document ("2" )));
47+ private BulkOperation indexFail = BulkOperation .of (b -> b .index (c -> c .index ("fail" ).id ("2" ).document (
48+ "2" )));
4849 private BulkOperation delete = BulkOperation .of (b -> b .delete (c -> c .index ("foooo" ).id ("3" )));
4950 private BulkOperation deleteFail = BulkOperation .of (b -> b .delete (c -> c .index ("fail" ).id ("3" )));
5051
51- @ BeforeAll
52- public static void setup (){
52+ @ BeforeEach
53+ public void setup () {
5354 transport = new TestTransport ();
5455 client = new ElasticsearchClient (transport );
5556 listener = new CountingListener ();
@@ -60,27 +61,31 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
6061
6162 // First test, partial success, other will succeed after retrying
6263 {
63- BulkIngester <Void > ingester = newBasicBulkIngester (listener );
64+ BulkIngester <Integer > ingester = newBasicBulkIngester (listener );
6465
6566 ingester .add (create );
6667 ingester .add (index );
6768 ingester .add (index );
6869
6970 ingester .close ();
7071
71- // 1 instant success, 2 retried, but succeeded. can be either 2 or 3 depending if the retries
72- // get scheduled at the same exact time
72+ // 1 instant success, 2 retried, but succeeded. can be either 2 or 3 depending on the retries,
73+ // if they get scheduled at the same exact time
7374 assertTrue (listener .requests .get () == 2 || listener .requests .get () == 3 );
7475 // eventually all 3 have to succeed
7576 assertTrue (listener .successOperations .get () == 3 );
77+
78+ // 1 for the create and first try for the indexes, 2 + 2 for both index retries,
79+ // which could be scheduled at the same time, so from 3 to 5
80+ assertTrue (listener .sentRequestsTotal .get () >= 3 && listener .sentRequestsTotal .get () <= 5 );
7681 }
7782
7883 // Second test, all requests will succeed after retrying
7984 {
8085 transport .reset ();
8186 listener .reset ();
8287
83- BulkIngester <Void > ingester = newBasicBulkIngester (listener );
88+ BulkIngester <Integer > ingester = newBasicBulkIngester (listener );
8489
8590 ingester .add (index );
8691 ingester .add (index );
@@ -93,14 +98,16 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
9398 assertTrue (listener .requests .get () >= 1 && listener .requests .get () <= 4 );
9499 // eventually all 4 have to succeed
95100 assertTrue (listener .successOperations .get () == 4 );
101+ // between 3 and 9, depending on scheduler
102+ assertTrue (listener .sentRequestsTotal .get () >= 3 && listener .sentRequestsTotal .get () <= 9 );
96103 }
97104
98105 // Third test, only one retryable (will succeed), other permanent failures
99106 {
100107 transport .reset ();
101108 listener .reset ();
102109
103- BulkIngester <Void > ingester = newBasicBulkIngester (listener );
110+ BulkIngester <Integer > ingester = newBasicBulkIngester (listener );
104111
105112 ingester .add (index );
106113 ingester .add (delete );
@@ -113,14 +120,16 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
113120
114121 assertTrue (listener .successOperations .get () == 1 );
115122 assertTrue (listener .errorOperations .get () == 2 );
123+ // 1 initial + 2 retries
124+ assertTrue (listener .sentRequestsTotal .get () == 3 );
116125 }
117126
118127 // Fourth test, all requests will be retried until policy allows, then fail
119128 {
120129 transport .reset ();
121130 listener .reset ();
122131
123- BulkIngester <Void > ingester = newBasicBulkIngester (listener );
132+ BulkIngester <Integer > ingester = newBasicBulkIngester (listener );
124133
125134 ingester .add (indexFail );
126135 ingester .add (indexFail );
@@ -133,14 +142,16 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
133142
134143 assertTrue (listener .successOperations .get () == 0 );
135144 assertTrue (listener .errorOperations .get () == 3 );
145+ // between 8 and 24, depending on scheduler
146+ assertTrue (listener .sentRequestsTotal .get () >= 8 && listener .sentRequestsTotal .get () <= 24 );
136147 }
137148
138149 // Fifth test, one exception that will make everything else fail, no retries triggered
139150 {
140151 transport .reset ();
141152 listener .reset ();
142153
143- BulkIngester <Void > ingester = newBasicBulkIngester (listener );
154+ BulkIngester <Integer > ingester = newBasicBulkIngester (listener );
144155
145156 ingester .add (index );
146157 ingester .add (create );
@@ -153,14 +164,17 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
153164
154165 assertTrue (listener .successOperations .get () == 0 );
155166 assertTrue (listener .errorOperations .get () == 3 );
167+
168+ // just the one
169+ assertTrue (listener .sentRequestsTotal .get () == 1 );
156170 }
157171
158172 // Sixth test, a mix of everything
159173 {
160174 transport .reset ();
161175 listener .reset ();
162176
163- BulkIngester <Void > ingester = newBasicBulkIngester (listener );
177+ BulkIngester <Integer > ingester = newBasicBulkIngester (listener );
164178
165179 ingester .add (create );
166180 ingester .add (index );
@@ -178,6 +192,9 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
178192
179193 assertTrue (listener .successOperations .get () == 4 );
180194 assertTrue (listener .errorOperations .get () == 4 );
195+
196+ // between 8 and 18, depending on scheduler
197+ assertTrue (listener .sentRequestsTotal .get () >= 8 && listener .sentRequestsTotal .get () <= 18 );
181198 }
182199
183200 transport .close ();
@@ -186,11 +203,84 @@ public void retryTestNoScheduledFlushNoContext() throws Exception {
186203 @ Test
187204 public void retryTestFlushAndContextExponentialBackoff () throws Exception {
188205
189- TestTransport transport = new TestTransport ();
190- ElasticsearchClient client = new ElasticsearchClient ( transport );
191- CountingListener listener = new CountingListener ( );
206+ // One success, other will succeed after retrying, other will fail eventually
207+ {
208+ BulkIngester < Integer > ingester = newBulkIngesterWithFlushAndContext ( listener );
192209
193- // TODO
210+ ingester .add (create , 1 );
211+ ingester .add (indexFail , 2 );
212+ ingester .add (index , 3 );
213+
214+ ingester .close ();
215+
216+ // should be 3 separate requests sent, one instant, one after a few retries, the last one error.
217+ assertTrue (listener .requests .get () == 3 );
218+ // 2 will succeed, one will fail
219+ assertTrue (listener .successOperations .get () == 2 );
220+ assertTrue (listener .errorOperations .get () == 1 );
221+ // between 8 and 10, depending on scheduler (first one + 2 retries for index + 8 retries for
222+ // indexfail)
223+ assertTrue (listener .sentRequestsTotal .get () >= 8 && listener .sentRequestsTotal .get () <= 11 );
224+ // checking order of contexts after send confirmed
225+ Iterator iter = listener .sentContexts .iterator ();
226+ // first one being completed is create
227+ assertTrue (iter .next ().equals (1 ));
228+ // second one is index, which will take only 2 retries
229+ assertTrue (iter .next ().equals (3 ));
230+ // last one is indexFail, taking 8 retries to fail
231+ assertTrue (iter .next ().equals (2 ));
232+ }
233+
234+ transport .close ();
235+ }
236+
237+ @ Test
238+ public void multiThreadStressTest () throws InterruptedException , IOException {
239+
240+ // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme
241+ // situation where the number of adding threads greatly exceeds the number of concurrent requests
242+ // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests
243+ // accordingly.
244+ BulkIngester <Integer > ingester = BulkIngester .of (b -> b
245+ .client (client )
246+ .listener (listener )
247+ .flushInterval (5 , TimeUnit .SECONDS )
248+ .backoffPolicy (BackoffPolicy .constantBackoff (50L , 8 )));
249+
250+ ExecutorService executor = Executors .newFixedThreadPool (50 );
251+
252+ // sends create operations, but once every 1000, one index operation will be sent,
253+ // and once every 5000 an indexFail
254+ for (int i = 0 ; i < 100000 ; i ++) {
255+ int ii = i ;
256+ Runnable thread = () -> {
257+ int finalI = ii ;
258+ if (ii % 1000 == 0 ) {
259+ ingester .add (index , ii );
260+ } else if (ii % 5000 == 0 ) {
261+ ingester .add (indexFail , ii );
262+ } else {
263+ ingester .add (create , ii );
264+ }
265+ };
266+ executor .submit (thread );
267+ }
268+
269+ executor .awaitTermination (10 , TimeUnit .SECONDS );
270+ ingester .close ();
271+
272+ // all operations will succeed eventually, so the total has to be 100000
273+ assertTrue (listener .successOperations .get () == 100000 );
274+ assertTrue (listener .sentContexts .size () == 100000 );
275+ assertTrue (listener .errorOperations .get () == 0 );
276+ // it's difficult to predict how many requests will be sent, but considering they will be sent
277+ // in batches of 1000, without retries it should be exactly 100, considering that 100 out of
278+ // 100000 will be retried 3 times and 20 will be retried 8 times, if they don't get batched together
279+ // with the others it could result in a total of 560, which is highly unlikely.
280+ // more reasonably there will be between 100 and 300 requests sent.
281+ assertTrue (listener .sentRequestsTotal .get () >= 100 && listener .sentRequestsTotal .get () <= 300 );
282+ // same reasoning
283+ assertTrue (listener .requests .get () >= 100 && listener .requests .get () <= 300 );
194284
195285 transport .close ();
196286 }
@@ -201,7 +291,7 @@ private static class TestTransport implements ElasticsearchTransport {
201291 public final AtomicInteger requestsCompleted = new AtomicInteger ();
202292 public final AtomicInteger operations = new AtomicInteger ();
203293
204- public final AtomicInteger retryFailures = new AtomicInteger ();
294+ public ConcurrentHashMap < BulkOperation , Integer > retryFailures = new ConcurrentHashMap <> ();
205295
206296
207297 private final ExecutorService executor = Executors .newCachedThreadPool ();
@@ -229,7 +319,8 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
229319
230320 // For testing purposes, different result depending on the operation type.
231321 // Create will always succeed
232- // Index will always 429 for 3 times, then 200. Index with index name "fail" will only 429.
322+ // Index will always return 429 for 2 times, then 200. Index with index name "fail" will only
323+ // return 429.
233324 // Delete will always return 404. Delete with index name "fail" will throw transport exception.
234325
235326 try {
@@ -244,8 +335,10 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
244335 case Index :
245336 index = ((IndexOperation ) op ._get ()).index ();
246337 operationType = OperationType .Index ;
247- boolean isStillRetrying = retryFailures .incrementAndGet () > 2 ;
248- error = isStillRetrying && !index .equals ("fail" ) ? null : ErrorCause .of (e -> e .reason ("some error" ));
338+ retryFailures .putIfAbsent (op , 0 );
339+ boolean isStillRetrying = retryFailures .computeIfPresent (op , (k , v ) -> v + 1 ) > 2 ;
340+ error = isStillRetrying && !index .equals ("fail" ) ? null :
341+ ErrorCause .of (e -> e .reason ("some error" ));
249342 status = isStillRetrying && !index .equals ("fail" ) ? 200 : 429 ;
250343 break ;
251344 case Delete :
@@ -281,8 +374,7 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
281374 @ SuppressWarnings ("unchecked" )
282375 CompletableFuture <ResponseT > result = (CompletableFuture <ResponseT >) response ;
283376 return result ;
284- }
285- catch (RuntimeException e ){
377+ } catch (RuntimeException e ) {
286378 CompletableFuture <ResponseT > future = new CompletableFuture <>();
287379 future .completeExceptionally (e );
288380 executor .submit (() -> {
@@ -316,55 +408,75 @@ public void reset() {
316408 requestsStarted .set (0 );
317409 requestsCompleted .set (0 );
318410 operations .set (0 );
319- retryFailures . set ( 0 );
411+ retryFailures = new ConcurrentHashMap <>( );
320412 }
321413 }
322414
323- private static class CountingListener implements BulkListener <Void > {
415+ private static class CountingListener implements BulkListener <Integer > {
416+ public final AtomicInteger sentRequestsTotal = new AtomicInteger ();
324417 public final AtomicInteger successOperations = new AtomicInteger ();
325418 public final AtomicInteger errorOperations = new AtomicInteger ();
326419 public final AtomicInteger requests = new AtomicInteger ();
420+ public final ConcurrentLinkedQueue <Integer > sentContexts = new ConcurrentLinkedQueue <>();
327421
328422 @ Override
329- public void beforeBulk (long executionId , BulkRequest request , List <Void > contexts ) {
330-
423+ public void beforeBulk (long executionId , BulkRequest request , List <Integer > contexts ) {
424+ sentRequestsTotal . incrementAndGet ();
331425 }
332426
333427 @ Override
334- public void afterBulk (long executionId , BulkRequest request , List <Void > contexts ,
428+ public void afterBulk (long executionId , BulkRequest request , List <Integer > contexts ,
335429 BulkResponse response ) {
336430 for (BulkResponseItem item : response .items ()) {
337- if (item .error () != null ) {
431+ if (item .error () != null ) {
338432 errorOperations .incrementAndGet ();
339- }
340- else {
433+ } else {
341434 successOperations .incrementAndGet ();
342435 }
343436 }
437+ if (contexts .stream ().anyMatch (Objects ::nonNull )) {
438+ sentContexts .addAll (contexts );
439+ }
344440 requests .incrementAndGet ();
345441 }
346442
347443 @ Override
348- public void afterBulk (long executionId , BulkRequest request , List <Void > contexts , Throwable failure ) {
444+ public void afterBulk (long executionId , BulkRequest request , List <Integer > contexts ,
445+ Throwable failure ) {
349446 failure .printStackTrace ();
350447 errorOperations .addAndGet (request .operations ().size ());
448+ if (contexts .stream ().anyMatch (Objects ::nonNull )) {
449+ sentContexts .addAll (contexts );
450+ }
351451 requests .incrementAndGet ();
352452 }
353453
354454 public void reset () {
355455 successOperations .set (0 );
356456 errorOperations .set (0 );
357457 requests .set (0 );
458+ sentRequestsTotal .set (0 );
358459 }
359460 }
360461
361- private BulkIngester <Void > newBasicBulkIngester (BulkListener <Void > listener ){
462+ private BulkIngester <Integer > newBasicBulkIngester (BulkListener <Integer > listener ) {
463+ return BulkIngester .of (b -> b
464+ .client (client )
465+ .maxOperations (10 )
466+ .maxConcurrentRequests (10 )
467+ .listener (listener )
468+ .backoffPolicy (BackoffPolicy .constantBackoff (50L , 8 ))
469+ );
470+ }
471+
472+ private BulkIngester <Integer > newBulkIngesterWithFlushAndContext (BulkListener <Integer > listener ) {
362473 return BulkIngester .of (b -> b
363474 .client (client )
364475 .maxOperations (10 )
365476 .maxConcurrentRequests (10 )
366477 .listener (listener )
367- .backoffPolicy (BackoffPolicy .constantBackoff (50L ,8 ))
478+ .flushInterval (1000 , TimeUnit .MILLISECONDS )
479+ .backoffPolicy (BackoffPolicy .constantBackoff (50L , 8 ))
368480 );
369481 }
370482}
0 commit comments