3838import java .util .Collections ;
3939import java .util .Iterator ;
4040import java .util .List ;
41+ import java .util .LongSummaryStatistics ;
4142import java .util .Optional ;
4243import java .util .concurrent .CompletionStage ;
4344import java .util .concurrent .Executors ;
@@ -72,8 +73,7 @@ public class BulkIngester<Context> implements AutoCloseable {
7273 private BackoffPolicy backoffPolicy ;
7374
7475 // Current state
75- private List <BulkOperationRepeatable <Context >> operations = new ArrayList <>();
76- //private List<Context> contexts = null; // Created on demand
76+ private List <RetryableBulkOperation <Context >> operations = new ArrayList <>();
7777 private long currentSize ;
7878 private int requestsInFlightCount ;
7979 private volatile boolean isClosed = false ;
@@ -190,7 +190,7 @@ public Duration flushInterval() {
190190 * The number of operations that have been buffered, waiting to be sent.
191191 */
192192 public int pendingOperations () {
193- List <BulkOperationRepeatable <Context >> operations = this .operations ;
193+ List <RetryableBulkOperation <Context >> operations = this .operations ;
194194 return operations == null ? 0 : operations .size ();
195195 }
196196
@@ -296,40 +296,34 @@ private void failsafeFlush() {
296296 }
297297
298298 public void flush () {
299- List <BulkOperationRepeatable <Context >> sentRequests = new ArrayList <>();
299+ List <RetryableBulkOperation <Context >> sentRequests = new ArrayList <>();
300300 RequestExecution <Context > exec = sendRequestCondition .whenReadyIf (
301301 () -> {
302302 // May happen on manual and periodic flushes
303303 return !operations .isEmpty () && operations .stream ()
304- .anyMatch (BulkOperationRepeatable ::isSendable );
304+ .anyMatch (RetryableBulkOperation ::isSendable );
305305 },
306306 () -> {
307- // Selecting operations that can be sent immediately
308- List <BulkOperationRepeatable <Context >> immediateOpsRep = operations .stream ()
309- .filter (BulkOperationRepeatable ::isSendable )
310- .collect (Collectors .toList ());
311-
307+ // Selecting operations that can be sent immediately,
312308 // Dividing actual operations from contexts
313- List <BulkOperation > immediateOps = immediateOpsRep .stream ()
314- .map (BulkOperationRepeatable ::getOperation )
315- .collect (Collectors .toList ());
309+ List <BulkOperation > immediateOps = new ArrayList <>();
310+ List <Context > contexts = new ArrayList <>();
316311
317- List <Context > contexts = immediateOpsRep .stream ()
318- .map (BulkOperationRepeatable ::getContext )
319- .collect (Collectors .toList ());
312+ for (Iterator <RetryableBulkOperation <Context >> it = operations .iterator (); it .hasNext ();){
313+ RetryableBulkOperation <Context > op = it .next ();
314+ if (op .isSendable ()) {
315+ immediateOps .add (op .operation ());
316+ contexts .add (op .context ());
320317
321- // If all contexts are null, no need for the list
322- // TODO want to keep?
323- // if (contexts.stream().allMatch(Objects::isNull)) {
324- // contexts = new ArrayList<>();
325- // }
318+ sentRequests .add (op );
319+ it .remove ();
320+ }
321+ }
326322
327323 // Build the request
328324 BulkRequest request = newRequest ().operations (immediateOps ).build ();
329325
330326 // Prepare for next round
331- sentRequests .addAll (immediateOpsRep );
332- operations .removeAll (immediateOpsRep );
333327 currentSize = operations .size ();
334328 addCondition .signalIfReady ();
335329
@@ -368,40 +362,40 @@ public void flush() {
368362 // Partial success, retrying failed requests if policy allows it
369363 // Keeping list of retryable requests/responses, to exclude them for calling
370364 // listener later
371- List <BulkOperationRepeatable <Context >> retryableReq = new ArrayList <>();
372- List <BulkOperationRepeatable <Context >> refires = new ArrayList <>();
365+ List <RetryableBulkOperation <Context >> retryableReq = new ArrayList <>();
366+ List <RetryableBulkOperation <Context >> refires = new ArrayList <>();
373367 List <BulkResponseItem > retryableResp = new ArrayList <>();
374368
375369 for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry ) {
376370 int index = resp .items ().indexOf (bulkItemResponse );
377- selectingRetries (index , bulkItemResponse , sentRequests , retryableResp ,
378- retryableReq , refires );
371+ selectingRetries (index , bulkItemResponse , sentRequests , retryableResp , retryableReq , refires );
379372 }
380373 // Scheduling flushes for just sent out retryable requests
381374 if (!refires .isEmpty ()) {
382375 scheduleRetries (refires );
383376 }
384377 // Retrieving list of remaining successful or not retryable requests
385- sentRequests . removeAll ( retryableReq );
378+ retryableReq . forEach ( sentRequests :: remove );
386379 if (!sentRequests .isEmpty ()) {
387380 if (listener != null ) {
388381 // Creating partial BulkRequest
389- BulkRequest partialRequest = newRequest ()
390- .operations (sentRequests .stream ()
391- .map (BulkOperationRepeatable ::getOperation )
392- .collect (Collectors .toList ()))
393- .build ();
394- // Getting contexts
395- List <Context > partialCtx = sentRequests .stream ()
396- .map (BulkOperationRepeatable ::getContext )
397- .collect (Collectors .toList ());
382+ List <BulkOperation > partialOps = new ArrayList <>();
383+ List <Context > partialCtx = new ArrayList <>();
384+ for (RetryableBulkOperation <Context > op : sentRequests ) {
385+ partialOps .add (op .operation ());
386+ partialCtx .add (op .context ());
387+ }
388+ BulkRequest partialRequest = newRequest ().operations (partialOps ).build ();
389+
398390 // Filtering response
399- List <BulkResponseItem > partialItems = new ArrayList <>(resp .items ());
400- partialItems .removeAll (retryableResp );
391+ List <BulkResponseItem > partialItems = resp .items ()
392+ .stream ()
393+ .filter (i -> !retryableResp .contains (i ))
394+ .collect (Collectors .toList ());
401395
402396 BulkResponse partialResp = BulkResponse .of (br -> br
403397 .items (partialItems )
404- .errors (resp .errors ()) // TODO sure?
398+ .errors (resp .errors ())
405399 .took (resp .took ())
406400 .ingestTook (resp .ingestTook ()));
407401
@@ -434,24 +428,23 @@ public void flush() {
434428 }
435429
436430 private void selectingRetries (int index , BulkResponseItem bulkItemResponse ,
437- List <BulkOperationRepeatable <Context >> sentRequests ,
431+ List <RetryableBulkOperation <Context >> sentRequests ,
438432 List <BulkResponseItem > retryableResp ,
439- List <BulkOperationRepeatable <Context >> retryableReq ,
440- List <BulkOperationRepeatable <Context >> refires ) {
433+ List <RetryableBulkOperation <Context >> retryableReq ,
434+ List <RetryableBulkOperation <Context >> refires ) {
441435
442436 // Getting original failed, requests and keeping successful ones to send to the listener
443- BulkOperationRepeatable <Context > original = sentRequests .get (index );
437+ RetryableBulkOperation <Context > original = sentRequests .get (index );
444438 if (original .canRetry ()) {
445439 retryableResp .add (bulkItemResponse );
446- Iterator <Long > retries =
447- Optional .ofNullable (original .getRetries ()).orElse (backoffPolicy .iterator ());
448- BulkOperationRepeatable <Context > refire = new BulkOperationRepeatable <>(original .getOperation (), original .getContext (), retries );
440+ Iterator <Long > retryTimes = Optional .ofNullable (original .retries ()).orElse (backoffPolicy .iterator ());
441+ RetryableBulkOperation <Context > refire = new RetryableBulkOperation <>(original .operation (), original .context (), retryTimes );
449442 retryableReq .add (original );
450443 refires .add (refire );
451444 addRetry (refire );
452- logger .warn ("Added failed request back in queue, retrying in : " + refire .getCurrentRetryTimeDelay () + " ms" );
445+ logger .warn ("Added failed request back in queue, retrying in : " + refire .currentRetryTimeDelay () + " ms" );
453446 } else {
454- logger .warn ("Retries finished for request: " + original .getOperation ()._kind ().toString ());
447+ logger .warn ("Retries finished for request: " + original .operation ()._kind ().toString ());
455448 }
456449 }
457450
@@ -485,40 +478,31 @@ private void listenerAfterBulkSuccess(BulkResponse resp, RequestExecution<Contex
485478 }
486479 }
487480
488- private void scheduleRetries (List <BulkOperationRepeatable <Context >> retryableReq ) {
489- List <Long > sortedDelays = retryableReq .stream ()
490- .map (BulkOperationRepeatable ::getCurrentRetryTimeDelay )
491- .distinct ()
492- .sorted ()
493- .collect (Collectors .toList ());
494-
495- // scheduling earlier delay, first in list
496- retryScheduler .schedule (this ::flush , sortedDelays .get (0 ), TimeUnit .MILLISECONDS );
497- if (sortedDelays .size () == 2 ) {
498- // special case, scheduling both delays
499- retryScheduler .schedule (this ::flush , sortedDelays .get (1 ), TimeUnit .MILLISECONDS );
500- } else if (sortedDelays .size () > 2 ) {
501- // general case, scheduling median and latest delays
502- retryScheduler .schedule (this ::flush , sortedDelays .get (sortedDelays .size () / 2 ),
503- TimeUnit .MILLISECONDS );
504- retryScheduler .schedule (this ::flush , sortedDelays .get (sortedDelays .size () - 1 ),
505- TimeUnit .MILLISECONDS );
506- }
481+ private void scheduleRetries (List <RetryableBulkOperation <Context >> retryableReq ) {
482+ LongSummaryStatistics statsDelays = retryableReq .stream ()
483+ .map (RetryableBulkOperation ::currentRetryTimeDelay )
484+ .mapToLong (Long ::longValue )
485+ .summaryStatistics ();
486+
487+ // scheduling earlier and latest delay
488+ retryScheduler .schedule (this ::flush , statsDelays .getMin (), TimeUnit .MILLISECONDS );
489+ retryScheduler .schedule (this ::flush , statsDelays .getMax (), TimeUnit .MILLISECONDS );
490+
507491 }
508492
509493 public void add (BulkOperation operation , Context context ) {
510494 if (isClosed ) {
511495 throw new IllegalStateException ("Ingester has been closed" );
512496 }
513497
514- BulkOperationRepeatable <Context > repeatableOp = new BulkOperationRepeatable <>(operation , context ,
498+ RetryableBulkOperation <Context > repeatableOp = new RetryableBulkOperation <>(operation , context ,
515499 null );
516500
517501 innerAdd (repeatableOp );
518502 }
519503
520504 // Same as "add", but skips the closed check to allow retries to be added even after ingester closure
521- private void addRetry (BulkOperationRepeatable <Context > repeatableOp ) {
505+ private void addRetry (RetryableBulkOperation <Context > repeatableOp ) {
522506 // Sending the operation back in the queue using the retry scheduler
523507 retriesInProgressCount .incrementAndGet ();
524508 retryScheduler .submit (() -> {
@@ -532,7 +516,7 @@ private void addRetry(BulkOperationRepeatable<Context> repeatableOp) {
532516 });
533517 }
534518
535- private void innerAdd (BulkOperationRepeatable <Context > repeatableOp ) {
519+ private void innerAdd (RetryableBulkOperation <Context > repeatableOp ) {
536520 IngesterOperation ingestOp = IngesterOperation .of (repeatableOp , client ._jsonpMapper ());
537521
538522 addCondition .whenReady (() -> {
0 commit comments