2424import co .elastic .clients .elasticsearch .core .BulkRequest ;
2525import co .elastic .clients .elasticsearch .core .BulkResponse ;
2626import co .elastic .clients .elasticsearch .core .bulk .BulkOperation ;
27+ import co .elastic .clients .elasticsearch .core .bulk .BulkResponseItem ;
2728import co .elastic .clients .transport .BackoffPolicy ;
2829import co .elastic .clients .transport .TransportOptions ;
2930import co .elastic .clients .util .ApiTypeHelper ;
3334
3435import javax .annotation .Nullable ;
3536import java .time .Duration ;
36- import java .time .Instant ;
3737import java .util .ArrayList ;
3838import java .util .Collections ;
39+ import java .util .Iterator ;
3940import java .util .List ;
4041import java .util .Optional ;
4142import java .util .concurrent .CompletionStage ;
@@ -66,6 +67,7 @@ public class BulkIngester<Context> implements AutoCloseable {
6667
6768 private @ Nullable ScheduledFuture <?> flushTask ;
6869 private @ Nullable ScheduledExecutorService scheduler ;
70+ private @ Nullable ScheduledExecutorService retryScheduler ;
6971 private boolean isExternalScheduler = false ;
7072 private BackoffPolicy backoffPolicy ;
7173
@@ -81,7 +83,7 @@ public class BulkIngester<Context> implements AutoCloseable {
8183 private final FnCondition addCondition = new FnCondition (lock , this ::canAddOperation );
8284 private final FnCondition sendRequestCondition = new FnCondition (lock , this ::canSendRequest );
8385 private final FnCondition closeCondition = new FnCondition (lock , this ::closedAndFlushed );
84- private AtomicInteger listenerInProgressCount = new AtomicInteger ();
86+ private final AtomicInteger listenerInProgressCount = new AtomicInteger ();
8587
8688 private static class RequestExecution <Context > {
8789 public final long id ;
@@ -138,6 +140,21 @@ private BulkIngester(Builder<Context> builder) {
138140 if (backoffPolicy == null ) {
139141 backoffPolicy = BackoffPolicy .noBackoff ();
140142 }
143+ // preparing a scheduler that will trigger flushes when it finds enqueued requests ready to be retried
144+ // TODO should we just keep a single scheduler?
145+ else {
146+ retryScheduler = Executors .newScheduledThreadPool (maxRequests + 1 , (r ) -> {
147+ Thread t = Executors .defaultThreadFactory ().newThread (r );
148+ t .setName ("bulk-ingester-retry#" + ingesterId + "#" + t .getId ());
149+ t .setDaemon (true );
150+ return t ;
151+ });
152+ retryScheduler .scheduleWithFixedDelay (
153+ this ::retryFlush ,
154+ 1000 ,1000 , // TODO should we hardcode this?
155+ TimeUnit .MILLISECONDS
156+ );
157+ }
141158 }
142159
143160 //----- Getters
@@ -283,9 +300,20 @@ private void failsafeFlush() {
283300 }
284301 }
285302
303+ // triggers a flush if it finds queued retries
304+ private void retryFlush () {
305+ try {
306+ if (operations .stream ().anyMatch (op -> op .getRetries () != null && op .isSendable ())) {
307+ flush ();
308+ }
309+ } catch (Throwable thr ) {
310+ // Log the error and continue
311+ logger .error ("Error in background flush" , thr );
312+ }
313+ }
314+
286315 public void flush () {
287- // Keeping sent operations for possible retries
288- List <BulkOperationRepeatable <Context >> requestsSent = new ArrayList <>();
316+ List <BulkOperationRepeatable <Context >> sentRequests = new ArrayList <>();
289317 RequestExecution <Context > exec = sendRequestCondition .whenReadyIf (
290318 () -> {
291319 // May happen on manual and periodic flushes
@@ -294,7 +322,7 @@ public void flush() {
294322 () -> {
295323 // Selecting operations that can be sent immediately
296324 List <BulkOperationRepeatable <Context >> immediateOpsRep = operations .stream ()
297- .filter (BulkOperationRepeatable ::canRetry )
325+ .filter (BulkOperationRepeatable ::isSendable )
298326 .collect (Collectors .toList ());
299327
300328 // Dividing actual operations from contexts
@@ -309,11 +337,12 @@ public void flush() {
309337 // Build the request
310338 BulkRequest request = newRequest ().operations (immediateOps ).build ();
311339
312- List <Context > requestContexts = contexts .isEmpty () ? Collections .nCopies (immediateOpsRep .size (),
313- null ) : contexts ;
340+ List <Context > requestContexts = contexts .isEmpty () ?
341+ Collections .nCopies (immediateOpsRep .size (),
342+ null ) : contexts ;
314343
315344 // Prepare for next round
316- requestsSent .addAll (immediateOpsRep );
345+ sentRequests .addAll (immediateOpsRep );
317346 operations .removeAll (immediateOpsRep );
318347 currentSize = operations .size ();
319348 addCondition .signalIfReady ();
@@ -340,18 +369,43 @@ public void flush() {
340369 // A request was actually sent
341370 exec .futureResponse .handle ((resp , thr ) -> {
342371 if (resp != null ) {
343- // Success
344- if (listener != null ) {
345- listenerInProgressCount .incrementAndGet ();
346- scheduler .submit (() -> {
347- try {
348- listener .afterBulk (exec .id , exec .request , exec .contexts , resp );
349- } finally {
350- if (listenerInProgressCount .decrementAndGet () == 0 ) {
351- closeCondition .signalIfReady ();
372+ // Success? Checking if total or partial
373+ List <BulkResponseItem > failedRequestsCanRetry = resp .items ().stream ()
374+ .filter (i -> i .error () != null && i .status () == 429 )
375+ .collect (Collectors .toList ());
376+
377+ if (failedRequestsCanRetry .isEmpty () || !backoffPolicy .equals (BackoffPolicy .noBackoff ())) {
378+ // Total success! ...or there's no retry policy implemented. Either way, can call
379+ // listener after bulk
380+ if (listener != null ) {
381+ listenerInProgressCount .incrementAndGet ();
382+ scheduler .submit (() -> {
383+ try {
384+ listener .afterBulk (exec .id , exec .request , exec .contexts , resp );
385+ } finally {
386+ if (listenerInProgressCount .decrementAndGet () == 0 ) {
387+ closeCondition .signalIfReady ();
388+ }
352389 }
390+ });
391+ }
392+ } else {
393+ // Partial success, retrying failed requests if policy allows it
394+ // Getting original requests
395+ for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry ) {
396+ int index = resp .items ().indexOf (bulkItemResponse );
397+ BulkOperationRepeatable <Context > original = sentRequests .get (index );
398+ if (original .getRetries ().hasNext ()) {
399+ Iterator <Long > retries =
400+ Optional .ofNullable (original .getRetries ()).orElse (backoffPolicy .iterator ());
401+ addRetry (new BulkOperationRepeatable <>(original .getOperation (),
402+ original .getContext (), retries ));
403+ // TODO remove after checking
404+ assert (bulkItemResponse .operationType ().toString ().equals (sentRequests .get (index ).getOperation ()._kind ().toString ()));
353405 }
354- });
406+ // TODO should print some message?
407+
408+ }
355409 }
356410 } else {
357411 // Failure
@@ -383,7 +437,8 @@ public void add(BulkOperation operation, Context context) {
383437 throw new IllegalStateException ("Ingester has been closed" );
384438 }
385439
386- BulkOperationRepeatable <Context > repeatableOp = new BulkOperationRepeatable <>(operation , context , Optional .empty ());
440+ BulkOperationRepeatable <Context > repeatableOp = new BulkOperationRepeatable <>(operation , context ,
441+ null );
387442
388443 innerAdd (repeatableOp );
389444 }
0 commit comments