|
21 | 21 | import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; |
22 | 22 | import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; |
23 | 23 | import org.elasticsearch.action.bulk.FailureStoreMetrics; |
| 24 | +import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus; |
24 | 25 | import org.elasticsearch.action.bulk.TransportBulkAction; |
25 | 26 | import org.elasticsearch.action.index.IndexRequest; |
26 | 27 | import org.elasticsearch.action.ingest.DeletePipelineRequest; |
@@ -729,12 +730,32 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin |
729 | 730 | ExceptionsHelper.rethrowAndSuppress(exceptions); |
730 | 731 | } |
731 | 732 |
|
732 | | - private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) { |
733 | | - |
734 | | - private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); |
735 | | - private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null); |
| 733 | + private record IngestPipelinesExecutionResult( |
| 734 | + boolean success, |
| 735 | + boolean shouldKeep, |
| 736 | + Exception exception, |
| 737 | + String failedIndex, |
| 738 | + IndexDocFailureStoreStatus failureStoreStatus |
| 739 | + ) { |
| 740 | + private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult( |
| 741 | + true, |
| 742 | + true, |
| 743 | + null, |
| 744 | + null, |
| 745 | + IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN |
| 746 | + ); |
| 747 | + private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult( |
| 748 | + true, |
| 749 | + false, |
| 750 | + null, |
| 751 | + null, |
| 752 | + IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN |
| 753 | + ); |
736 | 754 | private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { |
737 | | - return new IngestPipelinesExecutionResult(false, true, e, index); |
| 755 | + return new IngestPipelinesExecutionResult(false, true, e, index, IndexDocFailureStoreStatus.USED); |
| 756 | + } |
| 757 | + private static IngestPipelinesExecutionResult failWithoutStoringIn(String index, Exception e) { |
| 758 | + return new IngestPipelinesExecutionResult(false, true, e, index, IndexDocFailureStoreStatus.NOT_ENABLED); |
738 | 759 | } |
739 | 760 | } |
740 | 761 |
|
@@ -764,7 +785,7 @@ public void executeBulkRequest( |
764 | 785 | final IntConsumer onDropped, |
765 | 786 | final Function<String, Boolean> resolveFailureStore, |
766 | 787 | final TriConsumer<Integer, String, Exception> onStoreFailure, |
767 | | - final BiConsumer<Integer, Exception> onFailure, |
| 788 | + final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure, |
768 | 789 | final BiConsumer<Thread, Exception> onCompletion, |
769 | 790 | final Executor executor |
770 | 791 | ) { |
@@ -821,18 +842,26 @@ public void onResponse(IngestPipelinesExecutionResult result) { |
821 | 842 | firstPipeline.getMetrics().postIngestBytes(indexRequest.ramBytesUsed()); |
822 | 843 | } |
823 | 844 | } else { |
824 | | - // We were given a failure result in the onResponse method, so we must store the failure |
825 | | - // Recover the original document state, track a failed ingest, and pass it along |
826 | | - updateIndexRequestMetadata(indexRequest, originalDocumentMetadata); |
827 | 845 | totalMetrics.ingestFailed(); |
828 | | - onStoreFailure.apply(slot, result.failedIndex, result.exception); |
| 846 | + if (IndexDocFailureStoreStatus.NOT_ENABLED.equals(result.failureStoreStatus)) { |
| 847 | + // A failure result, but despite the target being a data stream, it does not have failure |
| 848 | + // storage enabled currently. Capture the status in the onFailure call and skip any further |
| 849 | + // processing |
| 850 | + onFailure.apply(slot, result.exception, result.failureStoreStatus); |
| 851 | + } else { |
| 852 | + // We were given a failure result in the onResponse method, so we must store the failure |
| 853 | + // Recover the original document state, track a failed ingest, and pass it along |
| 854 | + updateIndexRequestMetadata(indexRequest, originalDocumentMetadata); |
| 855 | + onStoreFailure.apply(slot, result.failedIndex, result.exception); |
| 856 | + } |
829 | 857 | } |
830 | 858 | } |
831 | 859 |
|
832 | 860 | @Override |
833 | 861 | public void onFailure(Exception e) { |
| 862 | + // The target of the request does not allow failure storage, or failed for unforeseen reason |
834 | 863 | totalMetrics.ingestFailed(); |
835 | | - onFailure.accept(slot, e); |
| 864 | + onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); |
836 | 865 | } |
837 | 866 | }, |
838 | 867 | () -> { |
@@ -954,15 +983,15 @@ private void executePipelines( |
954 | 983 | if (failureStoreResolution != null && failureStoreResolution) { |
955 | 984 | failureStoreMetrics.incrementFailureStore(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE); |
956 | 985 | listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); |
| 986 | + } else if (failureStoreResolution != null) { |
| 987 | + // If this document targeted a data stream that didn't have the failure store enabled, we increment |
| 988 | + // the rejected counter. |
| 989 | + // We also increment the total counter because this request will not reach the code that increments |
| 990 | + // the total counter for non-rejected documents. |
| 991 | + failureStoreMetrics.incrementTotal(originalIndex); |
| 992 | + failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false); |
| 993 | + listener.onResponse(IngestPipelinesExecutionResult.failWithoutStoringIn(originalIndex, e)); |
957 | 994 | } else { |
958 | | - if (failureStoreResolution != null) { |
959 | | - // If this document targeted a data stream that didn't have the failure store enabled, we increment |
960 | | - // the rejected counter. |
961 | | - // We also increment the total counter because this request will not reach the code that increments |
962 | | - // the total counter for non-rejected documents. |
963 | | - failureStoreMetrics.incrementTotal(originalIndex); |
964 | | - failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false); |
965 | | - } |
966 | 995 | listener.onFailure(e); |
967 | 996 | } |
968 | 997 | }; |
|
0 commit comments