Skip to content

Commit 7baf17f

Browse files
committed
Add pre_recovery ingest metadata field to store original doc info
1 parent ed25287 commit 7baf17f

File tree

3 files changed

+252
-89
lines changed

3 files changed

+252
-89
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RecoverFailureDocumentProcessor.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.ingest.IngestDocument;
1515
import org.elasticsearch.ingest.Processor;
1616

17+
import java.util.HashMap;
1718
import java.util.Map;
1819

1920
/**
@@ -34,13 +35,25 @@ public final class RecoverFailureDocumentProcessor extends AbstractProcessor {
3435
@Override
3536
@SuppressWarnings("unchecked")
3637
public IngestDocument execute(IngestDocument document) throws Exception {
37-
// Get the nested 'document' field, which holds the original document and metadata.
3838
if (document.hasField("document") == false) {
3939
throw new IllegalArgumentException("field [document] not present as part of path [document]");
4040
}
41+
42+
if (document.hasField("document.source") == false) {
43+
throw new IllegalArgumentException("field [source] not present as part of path [document.source]");
44+
}
45+
46+
if (document.hasField("error") == false) {
47+
throw new IllegalArgumentException("field [error] not present as part of path [error]");
48+
}
49+
50+
// store pre-recovery data in ingest metadata
51+
storePreRecoveryData(document);
52+
53+
// Get the nested 'document' field, which holds the original document and metadata.
4154
Map<String, Object> failedDocument = (Map<String, Object>) document.getFieldValue("document", Map.class);
4255

43-
// Copy the original index and routing back to the document's metadata.
56+
// Copy the original index, routing, and id back to the document's metadata.
4457
String originalIndex = (String) failedDocument.get("index");
4558
if (originalIndex != null) {
4659
document.setFieldValue("_index", originalIndex);
@@ -51,20 +64,18 @@ public IngestDocument execute(IngestDocument document) throws Exception {
5164
document.setFieldValue("_routing", originalRouting);
5265
}
5366

54-
// Get the original document's source.
55-
Map<String, Object> originalSource = (Map<String, Object>) failedDocument.get("source");
56-
if (originalSource == null) {
57-
throw new IllegalArgumentException("field [source] not present as part of path [document.source]");
67+
String originalId = (String) failedDocument.get("id");
68+
if (originalId != null) {
69+
document.setFieldValue("_id", originalId);
5870
}
5971

60-
// Remove the 'error' and 'document' fields from the top-level document.
61-
document.removeField("error");
62-
document.removeField("document");
72+
// Get the original document's source.
73+
Map<String, Object> originalSource = (Map<String, Object>) failedDocument.get("source");
6374

64-
// Extract all fields from the original source back to the root of the document.
65-
for (Map.Entry<String, Object> entry : originalSource.entrySet()) {
66-
document.setFieldValue(entry.getKey(), entry.getValue());
67-
}
75+
// Source should match original source contents.
76+
Map<String, Object> source = document.getSource();
77+
source.clear();
78+
source.putAll(originalSource);
6879

6980
// Return the modified document.
7081
return document;
@@ -87,4 +98,29 @@ public RecoverFailureDocumentProcessor create(
8798
return new RecoverFailureDocumentProcessor(processorTag, description);
8899
}
89100
}
101+
102+
private static void storePreRecoveryData(IngestDocument document) {
103+
Map<String, Object> sourceAndMetadataMap = document.getSourceAndMetadata();
104+
105+
// Create the pre_recovery data structure
106+
Map<String, Object> preRecoveryData = new HashMap<>();
107+
108+
// Copy everything from the current document
109+
sourceAndMetadataMap.forEach((key, value) -> {
110+
if ("document".equals(key) && value instanceof Map) {
111+
// For the document field, copy everything except source
112+
@SuppressWarnings("unchecked")
113+
Map<String, Object> docMap = (Map<String, Object>) value;
114+
Map<String, Object> docCopy = new HashMap<>(docMap);
115+
docCopy.remove("source");
116+
preRecoveryData.put(key, docCopy);
117+
} else {
118+
// Copy all other fields as-is
119+
preRecoveryData.put(key, value);
120+
}
121+
});
122+
123+
// Store directly in ingest metadata
124+
document.getIngestMetadata().put("pre_recovery", preRecoveryData);
125+
}
90126
}

0 commit comments

Comments
 (0)