
Commit 59fc383

jbaiera and elasticsearchmachine authored
Return failure store status on ingest failures for data streams without failure store enabled (#121544)
Updates the on-failure handler to accept the additional context required to report to the client whether or not the document could have been rerouted if the failure store had been enabled.

Co-authored-by: elasticsearchmachine <[email protected]>
1 parent af213a5 commit 59fc383
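
For orientation before the diffs: the change threads an IndexDocFailureStoreStatus through ingest failure handling. The sketch below lists only the constants referenced in this commit (a minimal reconstruction; the real enum in the Elasticsearch source also handles wire serialization and response rendering):

```java
// Minimal sketch of the status constants referenced in this commit; not the
// full Elasticsearch enum.
public enum IndexDocFailureStoreStatus {
    NOT_APPLICABLE_OR_UNKNOWN, // target has no failure-store semantics, or the status is unknown
    USED,                      // the failed document was redirected into a failure store
    NOT_ENABLED,               // the target data stream could have redirected it, but its failure store is disabled
    FAILED                     // a redirect to the failure store was attempted and itself failed
}
```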

File tree

6 files changed: +272 -67 lines changed

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 88 additions & 1 deletion
```diff
@@ -786,7 +786,7 @@ teardown:
   - is_false: items.1.create.failure_store

 ---
-"Test failure store status with bulk request":
+"Test failure store status with bulk request failing on mappings":
   - do:
       allowed_warnings:
         - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
@@ -865,3 +865,90 @@ teardown:
   - match: { items.3.create.status: 400 }
   - match: { items.3.create.error.type: document_parsing_exception }
   - match: { items.3.create.failure_store: not_enabled }
+
+---
+"Test failure store status with bulk request failing in ingest":
+  - do:
+      ingest.put_pipeline:
+        id: "failing_pipeline"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "fail": {
+                  "message" : "error_message",
+                  "tag": "foo-tag"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream: {}
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                default_pipeline: "failing_pipeline"
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
+            data_stream_options:
+              failure_store:
+                enabled: true
+  - do:
+      allowed_warnings:
+        - "index template [no-fs] has index patterns [no-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [no-fs] will take precedence during new index creation"
+      indices.put_index_template:
+        name: no-fs
+        body:
+          index_patterns: no-fs*
+          data_stream: {}
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 0
+              index:
+                default_pipeline: "failing_pipeline"
+            mappings:
+              properties:
+                '@timestamp':
+                  type: date
+                count:
+                  type: long
+            data_stream_options:
+              failure_store:
+                enabled: false
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
+          - '{ "@timestamp": "2022-01-01", "count": 1 }'
+          - '{ "create": { "_index": "no-fs", "_id": "1" } }'
+          - '{ "@timestamp": "2022-01-01", "count": 1 }'
+  - is_true: errors
+  # Failed in the pipeline and was redirected to the failure store
+  - match: { items.0.create._index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.0.create.status: 201 }
+  - match: { items.0.create.failure_store: used }
+
+  # Rejected, eligible for the failure store, but the failure store is not enabled
+  - match: { items.1.create._index: 'no-fs' }
+  - match: { items.1.create.status: 500 }
+  - match: { items.1.create.failure_store: not_enabled }
+  - match: { items.1.create.error.type: fail_processor_exception }
+  - contains: { items.1.create.error.reason: error_message }
```
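
The assertions above pin down the REST-visible contract: a bulk item rejected in ingest now reports failure_store: not_enabled when a failure store could have caught it. A rough client-side illustration (hypothetical handling; the getFailureStoreStatus() accessor is an assumption, as this diff only shows the REST field):

```java
// Hypothetical client-side handling of the new failure_store status (sketch;
// getFailureStoreStatus() is an assumed accessor, not shown in this diff).
for (BulkItemResponse item : bulkResponse.getItems()) {
    if (item.isFailed() && item.getFailureStoreStatus() == IndexDocFailureStoreStatus.NOT_ENABLED) {
        // The document failed in ingest and would have been rerouted, had the
        // target data stream enabled its failure store.
        logger.warn("consider enabling the failure store for [{}]", item.getIndex());
    }
}
```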

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

Lines changed: 5 additions & 5 deletions
```diff
@@ -179,15 +179,15 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte
      * @param slot the slot in the bulk request to mark as failed.
      * @param e the failure encountered.
      */
-    synchronized void markItemAsFailed(int slot, Exception e) {
+    synchronized void markItemAsFailed(int slot, Exception e, IndexDocFailureStoreStatus failureStoreStatus) {
         final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
         final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
         // We hit an error during preprocessing a request, so we:
         // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
         // 2) Add a bulk item failure for this request
         // 3) Continue with the next request in the bulk.
         failedSlots.set(slot);
-        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), id, e);
+        BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), id, e, failureStoreStatus);
         itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure));
     }

@@ -223,7 +223,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
             assert false
                 : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! "
                     + "This should be guarded against in TransportBulkAction#shouldStoreFailure()";
-            markItemAsFailed(slot, e);
+            markItemAsFailed(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
         } else {
             // We get the index write request to find the source of the failed document
             IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(bulkRequest.requests().get(slot));
@@ -238,7 +238,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
                     + "], index: ["
                     + targetIndexName
                     + "]";
-                markItemAsFailed(slot, e);
+                markItemAsFailed(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
                 logger.debug(
                     () -> "Attempted to redirect an invalid write operation after ingest failure - type: ["
                         + bulkRequest.requests().get(slot).getClass().getName()
@@ -267,7 +267,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
                         + "]",
                     ioException
                 );
-                markItemAsFailed(slot, e);
+                markItemAsFailed(slot, e, IndexDocFailureStoreStatus.FAILED);
             }
         }
     }
```
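
Condensed, the branches above amount to a two-way status choice for failures that cannot be redirected; a hedged paraphrase (the helper below is hypothetical, not part of the commit):

```java
// Hypothetical helper paraphrasing the status choice in markItemForFailureStore:
// requests that were never valid redirect candidates report
// NOT_APPLICABLE_OR_UNKNOWN, while an IOException during the attempted
// redirect reports FAILED.
private IndexDocFailureStoreStatus failureStatusFor(boolean redirectWasAttempted) {
    return redirectWasAttempted
        ? IndexDocFailureStoreStatus.FAILED
        : IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN;
}
```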

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 49 additions & 18 deletions
```diff
@@ -21,6 +21,7 @@
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.bulk.FailureStoreMetrics;
+import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -729,12 +730,34 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
         ExceptionsHelper.rethrowAndSuppress(exceptions);
     }

-    private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) {
+    private record IngestPipelinesExecutionResult(
+        boolean success,
+        boolean shouldKeep,
+        Exception exception,
+        String failedIndex,
+        IndexDocFailureStoreStatus failureStoreStatus
+    ) {

-        private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null);
-        private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null);
+        private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(
+            true,
+            true,
+            null,
+            null,
+            IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN
+        );
+        private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(
+            true,
+            false,
+            null,
+            null,
+            IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN
+        );
         private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) {
-            return new IngestPipelinesExecutionResult(false, true, e, index);
+            return new IngestPipelinesExecutionResult(false, true, e, index, IndexDocFailureStoreStatus.USED);
+        }
+
+        private static IngestPipelinesExecutionResult failWithoutStoringIn(String index, Exception e) {
+            return new IngestPipelinesExecutionResult(false, true, e, index, IndexDocFailureStoreStatus.NOT_ENABLED);
         }
     }

@@ -764,7 +787,7 @@ public void executeBulkRequest(
         final IntConsumer onDropped,
         final Function<String, Boolean> resolveFailureStore,
         final TriConsumer<Integer, String, Exception> onStoreFailure,
-        final BiConsumer<Integer, Exception> onFailure,
+        final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
         final BiConsumer<Thread, Exception> onCompletion,
         final Executor executor
     ) {
@@ -821,18 +844,26 @@ public void onResponse(IngestPipelinesExecutionResult result) {
                             firstPipeline.getMetrics().postIngestBytes(indexRequest.ramBytesUsed());
                         }
                     } else {
-                        // We were given a failure result in the onResponse method, so we must store the failure
-                        // Recover the original document state, track a failed ingest, and pass it along
-                        updateIndexRequestMetadata(indexRequest, originalDocumentMetadata);
                         totalMetrics.ingestFailed();
-                        onStoreFailure.apply(slot, result.failedIndex, result.exception);
+                        if (IndexDocFailureStoreStatus.NOT_ENABLED.equals(result.failureStoreStatus)) {
+                            // A failure result, but despite the target being a data stream, it does not have failure
+                            // storage enabled currently. Capture the status in the onFailure call and skip any further
+                            // processing
+                            onFailure.apply(slot, result.exception, result.failureStoreStatus);
+                        } else {
+                            // We were given a failure result in the onResponse method, so we must store the failure
+                            // Recover the original document state, track a failed ingest, and pass it along
+                            updateIndexRequestMetadata(indexRequest, originalDocumentMetadata);
+                            onStoreFailure.apply(slot, result.failedIndex, result.exception);
+                        }
                     }
                 }

                 @Override
                 public void onFailure(Exception e) {
+                    // The target of the request does not allow failure storage, or it failed for an unforeseen reason
                     totalMetrics.ingestFailed();
-                    onFailure.accept(slot, e);
+                    onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
                 }
             },
             () -> {
@@ -954,15 +985,15 @@ private void executePipelines(
             if (failureStoreResolution != null && failureStoreResolution) {
                 failureStoreMetrics.incrementFailureStore(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE);
                 listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e));
+            } else if (failureStoreResolution != null) {
+                // If this document targeted a data stream that didn't have the failure store enabled, we increment
+                // the rejected counter.
+                // We also increment the total counter because this request will not reach the code that increments
+                // the total counter for non-rejected documents.
+                failureStoreMetrics.incrementTotal(originalIndex);
+                failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false);
+                listener.onResponse(IngestPipelinesExecutionResult.failWithoutStoringIn(originalIndex, e));
             } else {
-                if (failureStoreResolution != null) {
-                    // If this document targeted a data stream that didn't have the failure store enabled, we increment
-                    // the rejected counter.
-                    // We also increment the total counter because this request will not reach the code that increments
-                    // the total counter for non-rejected documents.
-                    failureStoreMetrics.incrementTotal(originalIndex);
-                    failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false);
-                }
                 listener.onFailure(e);
             }
         };
```
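
With executeBulkRequest now taking a TriConsumer, callers must state a status in their failure callback; a minimal sketch of a conforming handler (hypothetical wiring in the spirit of TransportBulkAction, not code from this commit):

```java
// Sketch of a failure handler matching the new TriConsumer shape; the call
// lines up with the markItemAsFailed signature changed in this commit.
TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure =
    (slot, e, status) -> bulkRequestModifier.markItemAsFailed(slot, e, status);

// Invoked by IngestService on an ingest failure, e.g.:
onFailure.apply(0, new IllegalStateException("pipeline failed"), IndexDocFailureStoreStatus.NOT_ENABLED);
```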

server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -44,7 +44,7 @@ public void testBulkRequestModifier() {
         for (int i = 0; modifier.hasNext(); i++) {
             modifier.next();
             if (randomBoolean()) {
-                modifier.markItemAsFailed(i, new RuntimeException());
+                modifier.markItemAsFailed(i, new RuntimeException(), randomFrom(IndexDocFailureStoreStatus.values()));
                 failedSlots.add(i);
             }
         }
@@ -110,7 +110,7 @@ public void testPipelineFailures() {

         // actually mark the failures
         for (int i : failures) {
-            modifier.markItemAsFailed(i, new RuntimeException());
+            modifier.markItemAsFailed(i, new RuntimeException(), randomFrom(IndexDocFailureStoreStatus.values()));
         }

         // So half of the requests have "failed", so only the successful requests are left:
```

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 5 additions & 3 deletions
```diff
@@ -117,7 +117,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     @Captor
     ArgumentCaptor<TriConsumer<Integer, String, Exception>> redirectHandler;
     @Captor
-    ArgumentCaptor<BiConsumer<Integer, Exception>> failureHandler;
+    ArgumentCaptor<TriConsumer<Integer, Exception, IndexDocFailureStoreStatus>> failureHandler;
     @Captor
     ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
     @Captor
@@ -412,7 +412,8 @@ public void testIngestLocal() throws Exception {

         // now check success
         Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
-        failureHandler.getValue().accept(0, exception); // have an exception for our one index request
+        // have an exception for our one index request
+        failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
         indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
         // ensure redirects on failure store data stream
         assertTrue(redirectPredicate.getValue().apply(WITH_FAILURE_STORE_ENABLED + "-1"));
@@ -509,7 +510,8 @@ public void testIngestSystemLocal() throws Exception {

         // now check success
         Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
-        failureHandler.getValue().accept(0, exception); // have an exception for our one index request
+        // have an exception for our one index request
+        failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
         indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
         completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
         assertTrue(action.isExecuted);
```
