43
43
import org .elasticsearch .common .io .stream .Writeable ;
44
44
import org .elasticsearch .common .util .concurrent .AtomicArray ;
45
45
import org .elasticsearch .features .FeatureService ;
46
- import org .elasticsearch .index .IndexNotFoundException ;
47
46
import org .elasticsearch .index .IndexingPressure ;
48
47
import org .elasticsearch .index .VersionType ;
49
48
import org .elasticsearch .indices .SystemIndices ;
60
59
import java .util .Objects ;
61
60
import java .util .Set ;
62
61
import java .util .SortedMap ;
62
+ import java .util .concurrent .ConcurrentHashMap ;
63
63
import java .util .concurrent .Executor ;
64
64
import java .util .function .Function ;
65
65
import java .util .function .LongSupplier ;
@@ -351,29 +351,36 @@ protected void createMissingIndicesAndIndexData(
351
351
final AtomicArray <BulkItemResponse > responses = new AtomicArray <>(bulkRequest .requests .size ());
352
352
// Optimizing when there are no prerequisite actions
353
353
if (indicesToAutoCreate .isEmpty () && dataStreamsToBeRolledOver .isEmpty () && failureStoresToBeRolledOver .isEmpty ()) {
354
- executeBulk (task , bulkRequest , startTimeNanos , listener , executor , responses , Map . of () );
354
+ executeBulk (task , bulkRequest , startTimeNanos , listener , executor , responses );
355
355
return ;
356
356
}
357
- final Map <String , IndexNotFoundException > indicesThatCannotBeCreated = new HashMap <>();
357
+ Map <String , Exception > indicesExceptions = new ConcurrentHashMap <>();
358
+ Map <String , Exception > dataStreamExceptions = new ConcurrentHashMap <>();
359
+ Map <String , Exception > failureStoreExceptions = new ConcurrentHashMap <>();
358
360
Runnable executeBulkRunnable = () -> executor .execute (new ActionRunnable <>(listener ) {
359
361
@ Override
360
362
protected void doRun () {
361
- executeBulk (task , bulkRequest , startTimeNanos , listener , executor , responses , indicesThatCannotBeCreated );
363
+ failRequestsWhenPrerequisiteActionFailed (
364
+ indicesExceptions ,
365
+ dataStreamExceptions ,
366
+ failureStoreExceptions ,
367
+ bulkRequest ,
368
+ responses
369
+ );
370
+ executeBulk (task , bulkRequest , startTimeNanos , listener , executor , responses );
362
371
}
363
372
});
364
373
try (RefCountingRunnable refs = new RefCountingRunnable (executeBulkRunnable )) {
365
- createIndices (bulkRequest , indicesToAutoCreate , indicesThatCannotBeCreated , responses , refs );
366
- rollOverDataStreams (bulkRequest , dataStreamsToBeRolledOver , false , responses , refs );
367
- rollOverDataStreams (bulkRequest , failureStoresToBeRolledOver , true , responses , refs );
374
+ createIndices (indicesToAutoCreate , refs , indicesExceptions );
375
+ rollOverDataStreams (bulkRequest , dataStreamsToBeRolledOver , false , refs , dataStreamExceptions );
376
+ rollOverDataStreams (bulkRequest , failureStoresToBeRolledOver , true , refs , failureStoreExceptions );
368
377
}
369
378
}
370
379
371
380
private void createIndices (
372
- BulkRequest bulkRequest ,
373
381
Map <String , CreateIndexRequest > indicesToAutoCreate ,
374
- Map <String , IndexNotFoundException > indicesThatCannotBeCreated ,
375
- AtomicArray <BulkItemResponse > responses ,
376
- RefCountingRunnable refs
382
+ RefCountingRunnable refs ,
383
+ final Map <String , Exception > indicesExceptions
377
384
) {
378
385
for (Map .Entry <String , CreateIndexRequest > indexEntry : indicesToAutoCreate .entrySet ()) {
379
386
final String index = indexEntry .getKey ();
@@ -384,25 +391,26 @@ public void onResponse(CreateIndexResponse createIndexResponse) {}
384
391
@ Override
385
392
public void onFailure (Exception e ) {
386
393
final Throwable cause = ExceptionsHelper .unwrapCause (e );
387
- if (cause instanceof IndexNotFoundException indexNotFoundException ) {
388
- synchronized (indicesThatCannotBeCreated ) {
389
- indicesThatCannotBeCreated .put (index , indexNotFoundException );
390
- }
391
- } else if ((cause instanceof ResourceAlreadyExistsException ) == false ) {
394
+ if ((cause instanceof ResourceAlreadyExistsException ) == false ) {
392
395
// fail all requests involving this index, if create didn't work
393
- failRequestsWhenPrerequisiteActionFailed (index , bulkRequest , responses , e );
396
+ indicesExceptions . put (index , e );
394
397
}
395
398
}
396
399
}, refs .acquire ()));
397
400
}
398
401
}
399
402
403
+ // Separate method to allow for overriding in tests.
404
+ void createIndex (CreateIndexRequest createIndexRequest , ActionListener <CreateIndexResponse > listener ) {
405
+ client .execute (AutoCreateAction .INSTANCE , createIndexRequest , listener );
406
+ }
407
+
400
408
private void rollOverDataStreams (
401
409
BulkRequest bulkRequest ,
402
410
Set <String > dataStreamsToBeRolledOver ,
403
411
boolean targetFailureStore ,
404
- AtomicArray < BulkItemResponse > responses ,
405
- RefCountingRunnable refs
412
+ RefCountingRunnable refs ,
413
+ Map < String , Exception > dataStreamExceptions
406
414
) {
407
415
for (String dataStream : dataStreamsToBeRolledOver ) {
408
416
RolloverRequest rolloverRequest = new RolloverRequest (dataStream , null );
@@ -416,7 +424,7 @@ private void rollOverDataStreams(
416
424
}
417
425
// We are executing a lazy rollover because it is an action specialised for this situation, when we want an
418
426
// unconditional and performant rollover.
419
- rolloverClient . execute ( LazyRolloverAction . INSTANCE , rolloverRequest , ActionListener .releaseAfter (new ActionListener <>() {
427
+ rollOver ( rolloverRequest , ActionListener .releaseAfter (new ActionListener <>() {
420
428
421
429
@ Override
422
430
public void onResponse (RolloverResponse result ) {
@@ -431,26 +439,52 @@ public void onResponse(RolloverResponse result) {
431
439
432
440
@ Override
433
441
public void onFailure (Exception e ) {
434
- failRequestsWhenPrerequisiteActionFailed (dataStream , bulkRequest , responses , e );
442
+ dataStreamExceptions . put (dataStream , e );
435
443
}
436
444
}, refs .acquire ()));
437
445
}
438
446
}
439
447
448
+ // Separate method to allow for overriding in tests.
449
+ void rollOver (RolloverRequest rolloverRequest , ActionListener <RolloverResponse > listener ) {
450
+ rolloverClient .execute (LazyRolloverAction .INSTANCE , rolloverRequest , listener );
451
+ }
452
+
440
453
/**
441
- * Fails all requests involving this index or data stream because the prerequisite action failed too .
454
+ * Mark all the requests for which the prerequisite action failed (i.e. index creation or data stream/failure store rollover) as failed.
442
455
*/
443
- private static void failRequestsWhenPrerequisiteActionFailed (
444
- String target ,
456
+ private void failRequestsWhenPrerequisiteActionFailed (
457
+ Map <String , Exception > indicesExceptions ,
458
+ Map <String , Exception > dataStreamExceptions ,
459
+ Map <String , Exception > failureStoreExceptions ,
445
460
BulkRequest bulkRequest ,
446
- AtomicArray <BulkItemResponse > responses ,
447
- Exception error
461
+ AtomicArray <BulkItemResponse > responses
448
462
) {
463
+ if (indicesExceptions .isEmpty () && dataStreamExceptions .isEmpty () && failureStoreExceptions .isEmpty ()) {
464
+ return ;
465
+ }
449
466
for (int i = 0 ; i < bulkRequest .requests .size (); i ++) {
450
467
DocWriteRequest <?> request = bulkRequest .requests .get (i );
451
- if (request != null && setResponseFailureIfIndexMatches ( responses , i , request , target , error ) ) {
452
- bulkRequest . requests . set ( i , null ) ;
468
+ if (request == null ) {
469
+ continue ;
453
470
}
471
+ var exception = indicesExceptions .get (request .index ());
472
+ if (exception == null ) {
473
+ if (request instanceof IndexRequest indexRequest && indexRequest .isWriteToFailureStore ()) {
474
+ exception = failureStoreExceptions .get (request .index ());
475
+ } else {
476
+ exception = dataStreamExceptions .get (request .index ());
477
+ }
478
+ }
479
+ if (exception == null ) {
480
+ continue ;
481
+ }
482
+ var failureStoreStatus = request instanceof IndexRequest ir && ir .isWriteToFailureStore ()
483
+ ? IndexDocFailureStoreStatus .FAILED
484
+ : IndexDocFailureStoreStatus .NOT_APPLICABLE_OR_UNKNOWN ;
485
+ var failure = new BulkItemResponse .Failure (request .index (), request .id (), exception , failureStoreStatus );
486
+ responses .set (i , BulkItemResponse .failure (i , request .opType (), failure ));
487
+ bulkRequest .requests .set (i , null );
454
488
}
455
489
}
456
490
@@ -532,33 +566,13 @@ private static boolean isSystemIndex(SortedMap<String, IndexAbstraction> indices
532
566
}
533
567
}
534
568
535
- void createIndex (CreateIndexRequest createIndexRequest , ActionListener <CreateIndexResponse > listener ) {
536
- client .execute (AutoCreateAction .INSTANCE , createIndexRequest , listener );
537
- }
538
-
539
- private static boolean setResponseFailureIfIndexMatches (
540
- AtomicArray <BulkItemResponse > responses ,
541
- int idx ,
542
- DocWriteRequest <?> request ,
543
- String index ,
544
- Exception e
545
- ) {
546
- if (index .equals (request .index ())) {
547
- BulkItemResponse .Failure failure = new BulkItemResponse .Failure (request .index (), request .id (), e );
548
- responses .set (idx , BulkItemResponse .failure (idx , request .opType (), failure ));
549
- return true ;
550
- }
551
- return false ;
552
- }
553
-
554
569
void executeBulk (
555
570
Task task ,
556
571
BulkRequest bulkRequest ,
557
572
long startTimeNanos ,
558
573
ActionListener <BulkResponse > listener ,
559
574
Executor executor ,
560
- AtomicArray <BulkItemResponse > responses ,
561
- Map <String , IndexNotFoundException > indicesThatCannotBeCreated
575
+ AtomicArray <BulkItemResponse > responses
562
576
) {
563
577
new BulkOperation (
564
578
task ,
@@ -568,7 +582,6 @@ void executeBulk(
568
582
bulkRequest ,
569
583
client ,
570
584
responses ,
571
- indicesThatCannotBeCreated ,
572
585
indexNameExpressionResolver ,
573
586
relativeTimeNanosProvider ,
574
587
startTimeNanos ,
0 commit comments