Skip to content

Commit b78acc2

Browse files
authored
Add recover_failure_document processor to restore failurestore docs to original form (#133360)
* Add recover_failure_document processor to restore failure docs to their original form.
1 parent 41fea9d commit b78acc2

File tree

10 files changed

+795
-0
lines changed

10 files changed

+795
-0
lines changed

docs/changelog/133360.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133360
2+
summary: Add recover_failure_document processor to restore failure store documents to their original form
3+
area: Ingest Node
4+
type: feature
5+
issues: []

docs/reference/enrich-processor/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ Refer to [Enrich your data](docs-content://manage-data/ingest/transform-enrich/d
156156
[`lowercase` processor](/reference/enrich-processor/lowercase-processor.md) and [`uppercase` processor](/reference/enrich-processor/uppercase-processor.md)
157157
: Converts a string field to lowercase or uppercase.
158158

159+
[`recover_failure_document` processor](/reference/enrich-processor/recover-failure-document-processor.md)
160+
: Converts a failure-store document to its original format.
161+
159162
[`split` processor](/reference/enrich-processor/split-processor.md)
160163
: Splits a field into an array of values.
161164

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
---
2+
navigation_title: "Recover Failure Document"
3+
mapped_pages:
4+
- https://www.elastic.co/guide/en/elasticsearch/reference/current/recover_failure_document-processor.html
5+
---
6+
7+
# Recover Failure Document processor [recover_failure_document-processor]
8+
9+
Recovers documents that have been stored in a data stream's failure store by restoring them to their original format. This processor is designed to work with documents that failed during ingestion and were automatically stored in the failure store with additional error metadata and document structure wrapping. The relevant failure store metadata is stashed in the ingest metadata under `_ingest.pre_recovery`.
10+
11+
The Recover Failure Document processor performs the following operations:
12+
13+
* Checks the document is a valid failure store document.
14+
* Stores the pre-recovery contents of the document (all source and metadata fields except for `document.source`) in the ingest metadata under `_ingest.pre_recovery`.
15+
* Overwrites `_source` with the original document source from the `document.source` field.
16+
* Restores the original document id from `document.id` to the document metadata (if present)
17+
* Restores the original index name from `document.index` to the document metadata
18+
* Restores the original routing value from `document.routing` to the document metadata (if present)
19+
20+
$$$recover_failure_document-options$$$
21+
22+
| Name | Required | Default | Description |
23+
| --- | --- | --- | --- |
24+
| `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. |
25+
| `if` | no | - | Conditionally execute the processor. See [Conditionally run a processor](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#conditionally-run-processor). |
26+
| `ignore_failure` | no | `false` | Ignore failures for the processor. See [Handling pipeline failures](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#handling-pipeline-failures). |
27+
| `on_failure` | no | - | Handle failures for the processor. See [Handling pipeline failures](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#handling-pipeline-failures). |
28+
| `tag` | no | - | Identifier for the processor. Useful for debugging and metrics. |
29+
30+
## Examples [recover_failure_document-processor-ex]
31+
32+
```console
33+
POST _ingest/pipeline/_simulate
34+
{
35+
"pipeline": {
36+
"processors": [
37+
{
38+
"recover_failure_document": {}
39+
}
40+
]
41+
},
42+
"docs": [
43+
{
44+
"_index": ".fs-my-datastream-ingest-2025.05.09-000001",
45+
"_id": "HnTJs5YBwrYNjPmaFcri",
46+
"_score": 1,
47+
"_source": {
48+
"@timestamp": "2025-05-09T06:41:24.775Z",
49+
"document": {
50+
"index": "my-datastream-ingest",
51+
"source": {
52+
"@timestamp": "2025-04-21T00:00:00Z",
53+
"counter_name": "test"
54+
}
55+
},
56+
"error": {
57+
"type": "illegal_argument_exception",
58+
"message": "field [counter] not present as part of path [counter]",
59+
"stack_trace": "j.l.IllegalArgumentException: field [counter] not present as part of path [counter] at o.e.i.IngestDocument.getFieldValue(IngestDocument.java: 202 at o.e.i.c.SetProcessor.execute(SetProcessor.java: 86) 14 more",
60+
"pipeline_trace": [
61+
"complicated-processor"
62+
],
63+
"pipeline": "complicated-processor",
64+
"processor_type": "set",
65+
"processor_tag": "copy to new counter again"
66+
}
67+
}
68+
}
69+
]
70+
}
71+
```
72+
Which produces the following response:
73+
```console
74+
{
75+
"docs": [
76+
{
77+
"doc": {
78+
"_index": "my-datastream-ingest",
79+
"_version": "-3",
80+
"_id": "HnTJs5YBwrYNjPmaFcri",
81+
"_source": {
82+
"@timestamp": "2025-04-21T00:00:00Z",
83+
"counter_name": "test"
84+
},
85+
"_ingest": {
86+
"pre_recovery": {
87+
"@timestamp": "2025-05-09T06:41:24.775Z",
88+
"_index": ".fs-my-datastream-ingest-2025.05.09-000001",
89+
"document": {
90+
"index": "my-datastream-ingest"
91+
},
92+
"_id": "HnTJs5YBwrYNjPmaFcri",
93+
"error": {
94+
"pipeline": "complicated-processor",
95+
"processor_type": "set",
96+
"processor_tag": "copy to new counter again",
97+
"pipeline_trace": [
98+
"complicated-processor"
99+
],
100+
"stack_trace": "j.l.IllegalArgumentException: field [counter] not present as part of path [counter] at o.e.i.IngestDocument.getFieldValue(IngestDocument.java: 202 at o.e.i.c.SetProcessor.execute(SetProcessor.java: 86) 14 more",
101+
"type": "illegal_argument_exception",
102+
"message": "field [counter] not present as part of path [counter]"
103+
},
104+
"_version": -3
105+
},
106+
"timestamp": "2025-09-04T22:32:12.800709Z"
107+
}
108+
}
109+
}
110+
]
111+
}
112+
```
113+
114+
Documents which do not match the failure store document format result in errors:
115+
```console
116+
POST _ingest/pipeline/_simulate
117+
118+
{
119+
"pipeline": {
120+
"processors": [
121+
{
122+
"recover_failure_document": {}
123+
}
124+
]
125+
},
126+
"docs": [
127+
{
128+
"_index": ".fs-my-datastream-ingest-2025.05.09-000001",
129+
"_id": "HnTJs5YBwrYNjPmaFcri",
130+
"_score": 1,
131+
"_source": {
132+
"@timestamp": "2025-05-09T06:41:24.775Z",
133+
"error": {
134+
"type": "illegal_argument_exception",
135+
"message": "field [counter] not present as part of path [counter]",
136+
"stack_trace": "j.l.IllegalArgumentException: field [counter] not present as part of path [counter] at o.e.i.IngestDocument.getFieldValue(IngestDocument.java: 202 at o.e.i.c.SetProcessor.execute(SetProcessor.java: 86) 14 more",
137+
"pipeline_trace": [
138+
"complicated-processor"
139+
],
140+
"pipeline": "complicated-processor",
141+
"processor_type": "set",
142+
"processor_tag": "copy to new counter again"
143+
}
144+
}
145+
}
146+
]
147+
}
148+
```
149+
Which produces the following response:
150+
```console
151+
{
152+
"docs": [
153+
{
154+
"error": {
155+
"root_cause": [
156+
{
157+
"type": "illegal_argument_exception",
158+
"reason": "failure store document has unexpected structure, missing required [document] field"
159+
}
160+
],
161+
"type": "illegal_argument_exception",
162+
"reason": "failure store document has unexpected structure, missing required [document] field"
163+
}
164+
}
165+
]
166+
}
167+
```

docs/reference/enrich-processor/toc.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ toc:
3232
- file: pipeline-processor.md
3333
- file: redact-processor.md
3434
- file: registered-domain-processor.md
35+
- file: recover-failure-document-processor.md
3536
- file: remove-processor.md
3637
- file: rename-processor.md
3738
- file: reroute-processor.md

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestCommonPluginBridge.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.logstashbridge.plugins;
1010

1111
import org.elasticsearch.ingest.common.IngestCommonPlugin;
12+
import org.elasticsearch.ingest.common.RecoverFailureDocumentProcessor;
1213
import org.elasticsearch.logstashbridge.StableBridgeAPI;
1314
import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge;
1415
import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge;
@@ -41,6 +42,7 @@ public final class IngestCommonPluginBridge implements IngestPluginBridge {
4142
public static final String LOWERCASE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.LowercaseProcessor.TYPE;
4243
public static final String NETWORK_DIRECTION_PROCESSOR_TYPE = org.elasticsearch.ingest.common.NetworkDirectionProcessor.TYPE;
4344
public static final String REGISTERED_DOMAIN_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RegisteredDomainProcessor.TYPE;
45+
public static final String RECOVER_FAILURE_DOCUMENT_PROCESSOR_TYPE = RecoverFailureDocumentProcessor.TYPE;
4446
public static final String REMOVE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RemoveProcessor.TYPE;
4547
public static final String RENAME_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RenameProcessor.TYPE;
4648
public static final String REROUTE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RerouteProcessor.TYPE;

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6464
entry(NetworkDirectionProcessor.TYPE, new NetworkDirectionProcessor.Factory(parameters.scriptService)),
6565
entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)),
6666
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
67+
entry(RecoverFailureDocumentProcessor.TYPE, new RecoverFailureDocumentProcessor.Factory()),
6768
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
6869
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
6970
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.common;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.ingest.AbstractProcessor;
14+
import org.elasticsearch.ingest.IngestDocument;
15+
import org.elasticsearch.ingest.Processor;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
/**
21+
* Processor that remediates a failure document by:
22+
* - Copying the original index (and routing if present) from ctx.document.* into the document metadata
23+
* - Extracting the original source from ctx.document.source
24+
* - Removing the "error" and "document" fields from the root
25+
* - Restoring the original source fields at the root of the document
26+
*/
27+
public final class RecoverFailureDocumentProcessor extends AbstractProcessor {
28+
29+
public static final String PRE_RECOVERY_FIELD = "pre_recovery";
30+
public static final String DOCUMENT_FIELD = "document";
31+
public static final String SOURCE_FIELD = "source";
32+
public static final String SOURCE_FIELD_PATH = DOCUMENT_FIELD + "." + SOURCE_FIELD;
33+
public static final String ID_FIELD = "id";
34+
public static final String INDEX_FIELD = "index";
35+
public static final String ROUTING_FIELD = "routing";
36+
public static final String ERROR_FIELD = "error";
37+
38+
public static final String MISSING_DOCUMENT_ERROR_MSG =
39+
"failure store document has unexpected structure, missing required [document] field";
40+
public static final String MISSING_SOURCE_ERROR_MSG =
41+
"failure store document has unexpected structure, missing required [document.source] field";
42+
public static final String MISSING_ERROR_ERROR_MSG = "failure store document has unexpected structure, missing required [error] field";
43+
44+
public static final String TYPE = "recover_failure_document";
45+
46+
RecoverFailureDocumentProcessor(String tag, String description) {
47+
super(tag, description);
48+
}
49+
50+
@Override
51+
@SuppressWarnings("unchecked")
52+
public IngestDocument execute(IngestDocument document) throws Exception {
53+
if (document.hasField(DOCUMENT_FIELD) == false) {
54+
throw new IllegalArgumentException(MISSING_DOCUMENT_ERROR_MSG);
55+
}
56+
57+
if (document.hasField(SOURCE_FIELD_PATH) == false) {
58+
throw new IllegalArgumentException(MISSING_SOURCE_ERROR_MSG);
59+
}
60+
61+
if (document.hasField(ERROR_FIELD) == false) {
62+
throw new IllegalArgumentException(MISSING_ERROR_ERROR_MSG);
63+
}
64+
65+
// store pre-recovery data in ingest metadata
66+
storePreRecoveryData(document);
67+
68+
// Get the nested 'document' field, which holds the original document and metadata.
69+
Map<String, Object> failedDocument = (Map<String, Object>) document.getFieldValue(DOCUMENT_FIELD, Map.class);
70+
71+
// Copy the original index, routing, and id back to the document's metadata.
72+
String originalIndex = (String) failedDocument.get(INDEX_FIELD);
73+
if (originalIndex != null) {
74+
document.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), originalIndex);
75+
}
76+
77+
String originalRouting = (String) failedDocument.get(ROUTING_FIELD);
78+
if (originalRouting != null) {
79+
document.setFieldValue(IngestDocument.Metadata.ROUTING.getFieldName(), originalRouting);
80+
}
81+
82+
String originalId = (String) failedDocument.get(ID_FIELD);
83+
if (originalId != null) {
84+
document.setFieldValue(IngestDocument.Metadata.ID.getFieldName(), originalId);
85+
}
86+
87+
// Get the original document's source.
88+
Map<String, Object> originalSource = (Map<String, Object>) failedDocument.get(SOURCE_FIELD);
89+
90+
// Overwrite the _source with original source contents.
91+
Map<String, Object> source = document.getSource();
92+
source.clear();
93+
source.putAll(originalSource);
94+
95+
// Return the modified document.
96+
return document;
97+
}
98+
99+
@Override
100+
public String getType() {
101+
return TYPE;
102+
}
103+
104+
public static final class Factory implements Processor.Factory {
105+
@Override
106+
public RecoverFailureDocumentProcessor create(
107+
Map<String, Processor.Factory> registry,
108+
String processorTag,
109+
String description,
110+
Map<String, Object> config,
111+
ProjectId projectId
112+
) throws Exception {
113+
return new RecoverFailureDocumentProcessor(processorTag, description);
114+
}
115+
}
116+
117+
private static void storePreRecoveryData(IngestDocument document) {
118+
Map<String, Object> sourceAndMetadataMap = document.getSourceAndMetadata();
119+
120+
// Create the pre_recovery data structure
121+
Map<String, Object> preRecoveryData = new HashMap<>();
122+
123+
// Copy everything from the current document
124+
sourceAndMetadataMap.forEach((key, value) -> {
125+
if (DOCUMENT_FIELD.equals(key) && value instanceof Map) {
126+
// For the document field, copy everything except source
127+
@SuppressWarnings("unchecked")
128+
Map<String, Object> docMap = (Map<String, Object>) value;
129+
Map<String, Object> docCopy = new HashMap<>(docMap);
130+
docCopy.remove(SOURCE_FIELD);
131+
preRecoveryData.put(key, docCopy);
132+
} else {
133+
// Copy all other fields as-is
134+
preRecoveryData.put(key, value);
135+
}
136+
});
137+
138+
// Store directly in ingest metadata
139+
document.getIngestMetadata().put(PRE_RECOVERY_FIELD, preRecoveryData);
140+
}
141+
}

0 commit comments

Comments
 (0)