Commit a580a21
[8.x] Return failure store status on ingest failures for data streams without failure store enabled (#121544) (#123170)
* Return failure store status on ingest failures for data streams without failure store enabled (#121544)

  Updates the on-failure handler to accept the additional context required to report to the client whether or not the document could have been rerouted if the failure store was enabled.

  Co-authored-by: elasticsearchmachine <[email protected]>
  (cherry picked from commit 59fc383)

* fix test compile

Co-authored-by: Elastic Machine <[email protected]>
1 parent: 284b621
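The diffs below thread a new IndexDocFailureStoreStatus argument through the ingest failure paths. The class itself is not shown in this commit, but from the constants referenced here (USED, NOT_ENABLED, FAILED, NOT_APPLICABLE_OR_UNKNOWN) and the failure_store labels asserted in the YAML test (used, not_enabled), a minimal sketch of its likely shape follows; the label mapping and accessor are assumptions, not the actual class:

    // Hypothetical sketch of org.elasticsearch.action.bulk.IndexDocFailureStoreStatus,
    // reconstructed only from the constants and response labels visible in this commit.
    public enum IndexDocFailureStoreStatus {
        NOT_APPLICABLE_OR_UNKNOWN("not_applicable_or_unknown"), // no failure-store context, or status unknown
        USED("used"),                                           // the document was stored in the failure store
        NOT_ENABLED("not_enabled"),                             // target data stream has no failure store enabled
        FAILED("failed");                                       // storing the document in the failure store failed

        private final String label; // assumed: the value rendered as "failure_store" in bulk responses

        IndexDocFailureStoreStatus(String label) {
            this.label = label;
        }

        public String label() {
            return label;
        }
    }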

6 files changed (+280, -67 lines)

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 88 additions & 1 deletion
@@ -786,7 +786,7 @@ teardown:
   - is_false: items.1.create.failure_store
 
 ---
-"Test failure store status with bulk request":
+"Test failure store status with bulk request failing on mappings":
   - do:
       allowed_warnings:
         - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
@@ -865,3 +865,90 @@ teardown:
   - match: { items.3.create.status: 400 }
   - match: { items.3.create.error.type: document_parsing_exception }
   - match: { items.3.create.failure_store: not_enabled }
+
+---
+"Test failure store status with bulk request failing in ingest":
+  - do:
+      ingest.put_pipeline:
+        id: "failing_pipeline"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "fail": {
+                  "message" : "error_message",
+                  "tag": "foo-tag"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream: {}
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                default_pipeline: "failing_pipeline"
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
+            data_stream_options:
+              failure_store:
+                enabled: true
+  - do:
+      allowed_warnings:
+        - "index template [no-fs] has index patterns [no-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [no-fs] will take precedence during new index creation"
+      indices.put_index_template:
+        name: no-fs
+        body:
+          index_patterns: no-fs*
+          data_stream: {}
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 0
+              index:
+                default_pipeline: "failing_pipeline"
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
+            data_stream_options:
+              failure_store:
+                enabled: false
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
+          - '{ "@timestamp": "2022-01-01", "count": 1 }'
+          - '{ "create": { "_index": "no-fs", "_id": "1" } }'
+          - '{ "@timestamp": "2022-01-01", "count": 1 }'
+  - is_true: errors
+  # Failed in ingest and was redirected to the failure store index
+  - match: { items.0.create._index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.0.create.status: 201 }
+  - match: { items.0.create.failure_store: used }
+
+  # Rejected, eligible to go to failure store, but failure store not enabled
+  - match: { items.1.create._index: 'no-fs' }
+  - match: { items.1.create.status: 500 }
+  - match: { items.1.create.failure_store: not_enabled }
+  - match: { items.1.create.error.type: fail_processor_exception }
+  - contains: { items.1.create.error.reason: error_message }
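
Read together, the last five assertions describe the bulk item a client receives for the no-fs document. As a worked illustration, the implied response fragment is reconstructed below from the match/contains expectations alone (real responses carry additional fields, and the full reason string is only asserted via contains):

    // Illustration only: the bulk item implied by the assertions above, held in a Java text block.
    public class ImpliedBulkItem {
        public static void main(String[] args) {
            String impliedItem = """
                {
                  "create": {
                    "_index": "no-fs",
                    "status": 500,
                    "failure_store": "not_enabled",
                    "error": {
                      "type": "fail_processor_exception",
                      "reason": "...error_message..."
                    }
                  }
                }
                """;
            System.out.println(impliedItem);
        }
    }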

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

Lines changed: 5 additions & 5 deletions
@@ -179,15 +179,15 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte
      * @param slot the slot in the bulk request to mark as failed.
      * @param e the failure encountered.
      */
-    synchronized void markItemAsFailed(int slot, Exception e) {
+    synchronized void markItemAsFailed(int slot, Exception e, IndexDocFailureStoreStatus failureStoreStatus) {
         final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
         final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
         // We hit an error during preprocessing a request, so we:
         // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
         // 2) Add a bulk item failure for this request
         // 3) Continue with the next request in the bulk.
         failedSlots.set(slot);
-        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), id, e);
+        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), id, e, failureStoreStatus);
         itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure));
     }
 
@@ -223,7 +223,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
             assert false
                 : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! "
                     + "This should be guarded against in TransportBulkAction#shouldStoreFailure()";
-            markItemAsFailed(slot, e);
+            markItemAsFailed(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
         } else {
             // We get the index write request to find the source of the failed document
             IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(bulkRequest.requests().get(slot));
@@ -238,7 +238,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
                     + "], index: ["
                     + targetIndexName
                     + "]";
-                markItemAsFailed(slot, e);
+                markItemAsFailed(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
                 logger.debug(
                     () -> "Attempted to redirect an invalid write operation after ingest failure - type: ["
                         + bulkRequest.requests().get(slot).getClass().getName()
@@ -267,7 +267,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
                         + "]",
                     ioException
                 );
-                markItemAsFailed(slot, e);
+                markItemAsFailed(slot, e, IndexDocFailureStoreStatus.FAILED);
             }
         }
     }
server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 49 additions & 18 deletions
@@ -21,6 +21,7 @@
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.bulk.FailureStoreMetrics;
+import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -721,12 +722,34 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
         ExceptionsHelper.rethrowAndSuppress(exceptions);
     }
 
-    private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) {
+    private record IngestPipelinesExecutionResult(
+        boolean success,
+        boolean shouldKeep,
+        Exception exception,
+        String failedIndex,
+        IndexDocFailureStoreStatus failureStoreStatus
+    ) {
 
-        private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null);
-        private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null);
+        private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(
+            true,
+            true,
+            null,
+            null,
+            IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN
+        );
+        private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(
+            true,
+            false,
+            null,
+            null,
+            IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN
+        );
         private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) {
-            return new IngestPipelinesExecutionResult(false, true, e, index);
+            return new IngestPipelinesExecutionResult(false, true, e, index, IndexDocFailureStoreStatus.USED);
+        }
+
+        private static IngestPipelinesExecutionResult failWithoutStoringIn(String index, Exception e) {
+            return new IngestPipelinesExecutionResult(false, true, e, index, IndexDocFailureStoreStatus.NOT_ENABLED);
         }
     }
 
@@ -756,7 +779,7 @@ public void executeBulkRequest(
         final IntConsumer onDropped,
         final Function<String, Boolean> resolveFailureStore,
         final TriConsumer<Integer, String, Exception> onStoreFailure,
-        final BiConsumer<Integer, Exception> onFailure,
+        final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
         final BiConsumer<Thread, Exception> onCompletion,
         final Executor executor
     ) {
@@ -816,18 +839,26 @@ public void onResponse(IngestPipelinesExecutionResult result) {
                             firstPipeline.getMetrics().postIngestBytes(indexRequest.ramBytesUsed());
                         }
                     } else {
-                        // We were given a failure result in the onResponse method, so we must store the failure
-                        // Recover the original document state, track a failed ingest, and pass it along
-                        updateIndexRequestMetadata(indexRequest, originalDocumentMetadata);
                         totalMetrics.ingestFailed();
-                        onStoreFailure.apply(slot, result.failedIndex, result.exception);
+                        if (IndexDocFailureStoreStatus.NOT_ENABLED.equals(result.failureStoreStatus)) {
+                            // A failure result, but despite the target being a data stream, it does not have failure
+                            // storage enabled currently. Capture the status in the onFailure call and skip any further
+                            // processing
+                            onFailure.apply(slot, result.exception, result.failureStoreStatus);
+                        } else {
+                            // We were given a failure result in the onResponse method, so we must store the failure
+                            // Recover the original document state, track a failed ingest, and pass it along
+                            updateIndexRequestMetadata(indexRequest, originalDocumentMetadata);
+                            onStoreFailure.apply(slot, result.failedIndex, result.exception);
+                        }
                     }
                 }
 
                 @Override
                 public void onFailure(Exception e) {
+                    // The target of the request does not allow failure storage, or failed for an unforeseen reason
                     totalMetrics.ingestFailed();
-                    onFailure.accept(slot, e);
+                    onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
                 }
             },
             () -> {
@@ -949,15 +980,15 @@ private void executePipelines(
             if (failureStoreResolution != null && failureStoreResolution) {
                 failureStoreMetrics.incrementFailureStore(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE);
                 listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e));
+            } else if (failureStoreResolution != null) {
+                // If this document targeted a data stream that didn't have the failure store enabled, we increment
+                // the rejected counter.
+                // We also increment the total counter because this request will not reach the code that increments
+                // the total counter for non-rejected documents.
+                failureStoreMetrics.incrementTotal(originalIndex);
+                failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false);
+                listener.onResponse(IngestPipelinesExecutionResult.failWithoutStoringIn(originalIndex, e));
             } else {
-                if (failureStoreResolution != null) {
-                    // If this document targeted a data stream that didn't have the failure store enabled, we increment
-                    // the rejected counter.
-                    // We also increment the total counter because this request will not reach the code that increments
-                    // the total counter for non-rejected documents.
-                    failureStoreMetrics.incrementTotal(originalIndex);
-                    failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false);
-                }
                 listener.onFailure(e);
             }
         };
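
The last hunk flattens the nested if into a three-way branch on failureStoreResolution, the Boolean produced by the resolveFailureStore function passed to executeBulkRequest: true means the target data stream has the failure store enabled, false means it does not, and null (per the surrounding comments) means the target has no failure-store semantics at all. A self-contained sketch of that dispatch, using stand-in names rather than ES API:

    import java.util.function.Consumer;

    // Stand-alone illustration of the tri-state branch in executePipelines above.
    final class FailureStoreDispatchSketch {
        enum Status { USED, NOT_ENABLED }

        record Result(Status status, Exception error) {}

        static void dispatch(Boolean failureStoreResolution, Exception e,
                             Consumer<Result> onResponse, Consumer<Exception> onFailure) {
            if (failureStoreResolution != null && failureStoreResolution) {
                // Data stream with the failure store enabled: keep the document as a failure-store entry
                onResponse.accept(new Result(Status.USED, e));
            } else if (failureStoreResolution != null) {
                // Data stream without the failure store: fail the item but report it was eligible
                onResponse.accept(new Result(Status.NOT_ENABLED, e));
            } else {
                // No failure-store context (e.g. not a data stream): plain ingest failure
                onFailure.accept(e);
            }
        }
    }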

server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ public void testBulkRequestModifier() {
         for (int i = 0; modifier.hasNext(); i++) {
             modifier.next();
             if (randomBoolean()) {
-                modifier.markItemAsFailed(i, new RuntimeException());
+                modifier.markItemAsFailed(i, new RuntimeException(), randomFrom(IndexDocFailureStoreStatus.values()));
                 failedSlots.add(i);
             }
         }
@@ -110,7 +110,7 @@ public void testPipelineFailures() {
 
         // actually mark the failures
         for (int i : failures) {
-            modifier.markItemAsFailed(i, new RuntimeException());
+            modifier.markItemAsFailed(i, new RuntimeException(), randomFrom(IndexDocFailureStoreStatus.values()));
         }
 
         // So half of the requests have "failed", so only the successful requests are left:

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 5 additions & 3 deletions
@@ -117,7 +117,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     @Captor
     ArgumentCaptor<TriConsumer<Integer, String, Exception>> redirectHandler;
     @Captor
-    ArgumentCaptor<BiConsumer<Integer, Exception>> failureHandler;
+    ArgumentCaptor<TriConsumer<Integer, Exception, IndexDocFailureStoreStatus>> failureHandler;
     @Captor
     ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
     @Captor
@@ -413,7 +413,8 @@ public void testIngestLocal() throws Exception {
 
         // now check success
         Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
-        failureHandler.getValue().accept(0, exception); // have an exception for our one index request
+        // have an exception for our one index request
+        failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
         indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
         // ensure redirects on failure store data stream
         assertTrue(redirectPredicate.getValue().apply(WITH_FAILURE_STORE_ENABLED + "-1"));
@@ -510,7 +511,8 @@ public void testIngestSystemLocal() throws Exception {
 
         // now check success
         Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
-        failureHandler.getValue().accept(0, exception); // have an exception for our one index request
+        // have an exception for our one index request
+        failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
         indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
         completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
         assertTrue(action.isExecuted);
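
Since onFailure is now a TriConsumer rather than a BiConsumer, the captured handlers in these tests switch from accept(slot, e) to apply(slot, e, status). For reference, a shape-equivalent stand-in for the interface (the real one is org.elasticsearch.common.TriConsumer; this local copy is for illustration only):

    // Minimal stand-in mirroring the shape of org.elasticsearch.common.TriConsumer.
    @FunctionalInterface
    interface TriConsumer<S, T, U> {
        void apply(S s, T t, U u);
    }

    // Usage mirroring the updated call sites:
    // TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure = ...;
    // onFailure.apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);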
