3838import java .util .Collections ;
3939import java .util .Iterator ;
4040import java .util .List ;
41+ import java .util .Objects ;
4142import java .util .Optional ;
4243import java .util .concurrent .CompletionStage ;
4344import java .util .concurrent .Executors ;
@@ -84,11 +85,11 @@ public class BulkIngester<Context> implements AutoCloseable {
8485 private final FnCondition sendRequestCondition = new FnCondition (lock , this ::canSendRequest );
8586 private final FnCondition closeCondition = new FnCondition (lock , this ::closedAndFlushed );
8687 private final AtomicInteger listenerInProgressCount = new AtomicInteger ();
88+ private final AtomicInteger retriesInProgressCount = new AtomicInteger ();
8789
8890 private static class RequestExecution <Context > {
8991 public final long id ;
9092 public final BulkRequest request ;
91- // TODO context list grows in size while elements are null
9293 public final List <Context > contexts ;
9394 public final CompletionStage <BulkResponse > futureResponse ;
9495
@@ -140,20 +141,14 @@ private BulkIngester(Builder<Context> builder) {
140141 if (backoffPolicy == null ) {
141142 backoffPolicy = BackoffPolicy .noBackoff ();
142143 }
143- // preparing a scheduler that will trigger flushes when it finds enqueued requests ready to be retried
144- // TODO should we just keep a single scheduler?
144+ // preparing a scheduler that will trigger flushes to retry failed requests
145145 else {
146146 retryScheduler = Executors .newScheduledThreadPool (maxRequests + 1 , (r ) -> {
147147 Thread t = Executors .defaultThreadFactory ().newThread (r );
148148 t .setName ("bulk-ingester-retry#" + ingesterId + "#" + t .getId ());
149149 t .setDaemon (true );
150150 return t ;
151151 });
152- retryScheduler .scheduleWithFixedDelay (
153- this ::retryFlush ,
154- 1000 , 1000 , // TODO should we hardcode this?
155- TimeUnit .MILLISECONDS
156- );
157152 }
158153 }
159154
@@ -264,7 +259,8 @@ private boolean canAddOperation() {
264259 }
265260
266261 private boolean closedAndFlushed () {
267- return isClosed && operations .isEmpty () && requestsInFlightCount == 0 && listenerInProgressCount .get () == 0 ;
262+ return isClosed && operations .isEmpty () && requestsInFlightCount == 0
263+ && listenerInProgressCount .get () == 0 && retriesInProgressCount .get () == 0 ;
268264 }
269265
270266 //----- Ingester logic
@@ -300,18 +296,6 @@ private void failsafeFlush() {
300296 }
301297 }
302298
303- // triggers a flush if it finds queued retries
304- private void retryFlush () {
305- try {
306- if (operations .stream ().anyMatch (op -> op .getRetries () != null && op .isSendable ())) {
307- flush ();
308- }
309- } catch (Throwable thr ) {
310- // Log the error and continue
311- logger .error ("Error in background flush" , thr );
312- }
313- }
314-
315299 public void flush () {
316300 List <BulkOperationRepeatable <Context >> sentRequests = new ArrayList <>();
317301 RequestExecution <Context > exec = sendRequestCondition .whenReadyIf (
@@ -334,13 +318,14 @@ public void flush() {
334318 .map (BulkOperationRepeatable ::getContext )
335319 .collect (Collectors .toList ());
336320
321+ // If all contexts are null, no need for the list
322+ if (contexts .stream ().allMatch (Objects ::isNull )) {
323+ contexts = null ;
324+ }
325+
337326 // Build the request
338327 BulkRequest request = newRequest ().operations (immediateOps ).build ();
339328
340- List <Context > requestContexts = contexts .isEmpty () ?
341- Collections .nCopies (immediateOpsRep .size (),
342- null ) : contexts ;
343-
344329 // Prepare for next round
345330 sentRequests .addAll (immediateOpsRep );
346331 operations .removeAll (immediateOpsRep );
@@ -351,7 +336,7 @@ public void flush() {
351336
352337 if (listener != null ) {
353338 // synchronous execution to make sure it actually runs before
354- listener .beforeBulk (id , request , requestContexts );
339+ listener .beforeBulk (id , request , contexts );
355340 }
356341
357342 CompletionStage <BulkResponse > result = client .bulk (request );
@@ -362,13 +347,14 @@ public void flush() {
362347 request = null ;
363348 }
364349
365- return new RequestExecution <>(id , request , requestContexts , result );
350+ return new RequestExecution <>(id , request , contexts , result );
366351 });
367352
368353 if (exec != null ) {
369354 // A request was actually sent
370355 exec .futureResponse .handle ((resp , thr ) -> {
371356 if (resp != null ) {
357+
372358 // Success? Checking if total or partial
373359 List <BulkResponseItem > failedRequestsCanRetry = resp .items ().stream ()
374360 .filter (i -> i .error () != null && i .status () == 429 )
@@ -391,23 +377,73 @@ public void flush() {
391377 }
392378 } else {
393379 // Partial success, retrying failed requests if policy allows it
394- // Getting original requests
380+ // Keeping a list of the retryable operations, to exclude them when calling the listener later
381+ List <BulkOperationRepeatable <Context >> retryableReq = new ArrayList <>();
382+ List <BulkResponseItem > retryableResp = new ArrayList <>();
395383 for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry ) {
396384 int index = resp .items ().indexOf (bulkItemResponse );
385+ // Getting the original failed requests, and keeping the successful ones to send to the
386+ // listener
397387 BulkOperationRepeatable <Context > original = sentRequests .get (index );
398388 if (original .canRetry ()) {
389+ retryableResp .add (bulkItemResponse );
399390 Iterator <Long > retries =
400391 Optional .ofNullable (original .getRetries ()).orElse (backoffPolicy .iterator ());
401- addRetry (new BulkOperationRepeatable <>(original .getOperation (),
402- original .getContext (), retries ));
392+ BulkOperationRepeatable <Context > refire =
393+ new BulkOperationRepeatable <>(original .getOperation (),
394+ original .getContext (), retries );
395+ retryableReq .add (refire );
396+ addRetry (refire );
397+ logger .warn ("Added failed request back in queue, retrying in : " + refire .getCurrentRetryTimeDelay () + " ms" );
403398 // TODO remove after checking
404399 assert (bulkItemResponse .operationType ().toString ().equals (sentRequests .get (index ).getOperation ()._kind ().toString ()));
400+ } else {
401+ logger .warn ("Retries finished for request: " + original .getOperation ()._kind ().toString ());
405402 }
406- else {
407- System .out .println ("Retries finished" );
408- // TODO should print some message?
403+ }
404+ // Scheduling flushes for just sent out retryable requests
405+ if (!retryableReq .isEmpty ()) {
406+ // if there are at most 3 distinct delays, a flush is scheduled for each of them
407+ // if there are more than 3, only the first, median and last delays are scheduled
408+ scheduleRetries (retryableReq );
409+ }
410+ // Retrieving the list of remaining requests: successful ones, or failed ones that are not retryable
411+ sentRequests .removeAll (retryableReq );
412+ if (!sentRequests .isEmpty ()) {
413+ if (listener != null ) {
414+ // Creating partial BulkRequest
415+ BulkRequest partialRequest = newRequest ()
416+ .operations (sentRequests .stream ()
417+ .map (BulkOperationRepeatable ::getOperation )
418+ .collect (Collectors .toList ()))
419+ .build ();
420+ // Getting contexts
421+ List <Context > partialCtx = sentRequests .stream ()
422+ .map (BulkOperationRepeatable ::getContext )
423+ .collect (Collectors .toList ());
424+ // Filtering response
425+ List <BulkResponseItem > partialItems = resp .items ();
426+ partialItems .removeAll (retryableResp );
427+
428+ BulkResponse partialResp = BulkResponse .of (br -> br
429+ .items (partialItems )
430+ .errors (resp .errors ()) // TODO sure?
431+ .took (resp .took ())
432+ .ingestTook (resp .ingestTook ()));
433+
434+ listenerInProgressCount .incrementAndGet ();
435+ scheduler .submit (() -> {
436+ try {
437+ listener .afterBulk (exec .id , partialRequest , partialCtx , partialResp );
438+ } finally {
439+ if (listenerInProgressCount .decrementAndGet () == 0 ) {
440+ closeCondition .signalIfReady ();
441+ }
442+ }
443+ });
409444 }
410445 }
446+
411447 }
412448 } else {
413449 // Failure
@@ -434,6 +470,27 @@ public void flush() {
434470 }
435471 }
436472
473+ private void scheduleRetries (List <BulkOperationRepeatable <Context >> retryableReq ) {
474+ List <Long > sortedDelays = retryableReq .stream ()
475+ .map (BulkOperationRepeatable ::getCurrentRetryTimeDelay )
476+ .distinct ()
477+ .sorted ()
478+ .collect (Collectors .toList ());
479+
480+ // scheduling the earliest delay, first in the sorted list
481+ retryScheduler .schedule (this ::flush , sortedDelays .get (0 ), TimeUnit .MILLISECONDS );
482+ if (sortedDelays .size () == 2 ) {
483+ // special case, scheduling both delays
484+ retryScheduler .schedule (this ::flush , sortedDelays .get (1 ), TimeUnit .MILLISECONDS );
485+ } else if (sortedDelays .size () > 2 ) {
486+ // general case, scheduling median and latest delays
487+ retryScheduler .schedule (this ::flush , sortedDelays .get (sortedDelays .size () / 2 ),
488+ TimeUnit .MILLISECONDS );
489+ retryScheduler .schedule (this ::flush , sortedDelays .get (sortedDelays .size () - 1 ),
490+ TimeUnit .MILLISECONDS );
491+ }
492+ }
493+
437494 public void add (BulkOperation operation , Context context ) {
438495 if (isClosed ) {
439496 throw new IllegalStateException ("Ingester has been closed" );
@@ -445,30 +502,25 @@ public void add(BulkOperation operation, Context context) {
445502 innerAdd (repeatableOp );
446503 }
447504
505+ // Same as "add", but skips the closed check to allow retries to be added even after ingester closure
448506 private void addRetry (BulkOperationRepeatable <Context > repeatableOp ) {
449- innerAdd (repeatableOp );
507+ // Sending the operation back in the queue using the retry scheduler
508+ retriesInProgressCount .incrementAndGet ();
509+ retryScheduler .submit (() -> {
510+ try {
511+ innerAdd (repeatableOp );
512+ } finally {
513+ if (retriesInProgressCount .decrementAndGet () == 0 ) {
514+ closeCondition .signalIfReady ();
515+ }
516+ }
517+ });
450518 }
451519
452520 private void innerAdd (BulkOperationRepeatable <Context > repeatableOp ) {
453521 IngesterOperation ingestOp = IngesterOperation .of (repeatableOp , client ._jsonpMapper ());
454522
455523 addCondition .whenReady (() -> {
456-
457- // Context context = repeatableOp.getContext();
458- //
459- // if (context != null) {
460- // // Lazily build the context list
461- // if (contexts == null) {
462- // int size = operations.size();
463- // if (size == 0) {
464- // contexts = new ArrayList<>();
465- // } else {
466- // contexts = new ArrayList<>(Collections.nCopies(size, null));
467- // }
468- // }
469- // contexts.add(context);
470- // }
471-
472524 operations .add (ingestOp .operation ());
473525 currentSize += ingestOp .size ();
474526
0 commit comments