Skip to content

Commit 07e7139

Browse files
committed
Add remediate processor to remediate failure docs.
1 parent f5759b1 commit 07e7139

File tree

5 files changed

+275
-0
lines changed

5 files changed

+275
-0
lines changed

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestCommonPluginBridge.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.logstashbridge.plugins;
1010

1111
import org.elasticsearch.ingest.common.IngestCommonPlugin;
12+
import org.elasticsearch.ingest.common.RemediateProcessor;
1213
import org.elasticsearch.logstashbridge.StableBridgeAPI;
1314
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
1415

@@ -40,6 +41,7 @@ public class IngestCommonPluginBridge implements IngestPluginBridge {
4041
public static final String LOWERCASE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.LowercaseProcessor.TYPE;
4142
public static final String NETWORK_DIRECTION_PROCESSOR_TYPE = org.elasticsearch.ingest.common.NetworkDirectionProcessor.TYPE;
4243
public static final String REGISTERED_DOMAIN_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RegisteredDomainProcessor.TYPE;
44+
public static final String REMEDIATE_PROCESSOR_TYPE = RemediateProcessor.TYPE;
4345
public static final String REMOVE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RemoveProcessor.TYPE;
4446
public static final String RENAME_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RenameProcessor.TYPE;
4547
public static final String REROUTE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RerouteProcessor.TYPE;

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6464
entry(NetworkDirectionProcessor.TYPE, new NetworkDirectionProcessor.Factory(parameters.scriptService)),
6565
entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)),
6666
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
67+
entry(RemediateProcessor.TYPE, new RemediateProcessor.Factory()),
6768
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
6869
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
6970
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.common;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.ingest.AbstractProcessor;
14+
import org.elasticsearch.ingest.IngestDocument;
15+
import org.elasticsearch.ingest.Processor;
16+
17+
import java.util.Map;
18+
19+
/**
20+
* Processor that remediates a failure document by:
21+
* - Copying the original index (and routing if present) from ctx.document.* into the document metadata
22+
* - Extracting the original source from ctx.document.source
23+
* - Removing the "error" and "document" fields from the root
24+
* - Restoring the original source fields at the root of the document
25+
*/
26+
public final class RemediateProcessor extends AbstractProcessor {
27+
28+
public static final String TYPE = "remediate";
29+
30+
RemediateProcessor(String tag, String description) {
31+
super(tag, description);
32+
}
33+
34+
@Override
35+
@SuppressWarnings("unchecked")
36+
public IngestDocument execute(IngestDocument document) throws Exception {
37+
// Get the nested 'document' field, which holds the original document and metadata.
38+
Map<String, Object> failedDocument = (Map<String, Object>) document.getFieldValue("document", Map.class);
39+
40+
// Copy the original index and routing back to the document's metadata.
41+
String originalIndex = (String) failedDocument.get("index");
42+
if (originalIndex != null) {
43+
document.setFieldValue("_index", originalIndex);
44+
}
45+
46+
String originalRouting = (String) failedDocument.get("routing");
47+
if (originalRouting != null) {
48+
document.setFieldValue("_routing", originalRouting);
49+
}
50+
51+
// Get the original document's source.
52+
Map<String, Object> originalSource = (Map<String, Object>) failedDocument.get("source");
53+
54+
// Remove the 'error' and 'document' fields from the top-level document.
55+
document.removeField("error");
56+
document.removeField("document");
57+
58+
// Extract all fields from the original source back to the root of the document.
59+
for (Map.Entry<String, Object> entry : originalSource.entrySet()) {
60+
document.setFieldValue(entry.getKey(), entry.getValue());
61+
}
62+
63+
// Return the modified document.
64+
return document;
65+
}
66+
67+
@Override
68+
public String getType() {
69+
return TYPE;
70+
}
71+
72+
public static final class Factory implements Processor.Factory {
73+
@Override
74+
public RemediateProcessor create(
75+
Map<String, Processor.Factory> registry,
76+
String processorTag,
77+
String description,
78+
Map<String, Object> config,
79+
ProjectId projectId
80+
) throws Exception {
81+
return new RemediateProcessor(processorTag, description);
82+
}
83+
}
84+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.common;
11+
12+
import org.elasticsearch.test.ESTestCase;
13+
import org.junit.Before;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
import static org.hamcrest.Matchers.equalTo;
19+
20+
public class RemediateProcessorFactoryTests extends ESTestCase {
21+
22+
private RemediateProcessor.Factory factory;
23+
24+
@Before
25+
public void init() {
26+
factory = new RemediateProcessor.Factory();
27+
}
28+
29+
public void testCreate() throws Exception {
30+
RemediateProcessor.Factory factory = new RemediateProcessor.Factory();
31+
Map<String, Object> config = new HashMap<>();
32+
33+
String processorTag = randomAlphaOfLength(10);
34+
RemediateProcessor processor = factory.create(null, processorTag, null, config, null);
35+
36+
assertThat(processor.getTag(), equalTo(processorTag));
37+
}
38+
39+
public void testCreateWithDescription() throws Exception {
40+
RemediateProcessor.Factory factory = new RemediateProcessor.Factory();
41+
Map<String, Object> config = new HashMap<>();
42+
43+
String processorTag = randomAlphaOfLength(10);
44+
String description = randomAlphaOfLength(20);
45+
RemediateProcessor processor = factory.create(null, processorTag, description, config, null);
46+
47+
assertThat(processor.getTag(), equalTo(processorTag));
48+
assertThat(processor.getDescription(), equalTo(description));
49+
}
50+
51+
public void testUnknownConfigOptionsFail() {
52+
RemediateProcessor.Factory factory = new RemediateProcessor.Factory();
53+
Map<String, Object> config = new HashMap<>();
54+
config.put("unsupported_option", true);
55+
56+
String processorTag = randomAlphaOfLength(10);
57+
expectThrows(IllegalArgumentException.class, () -> factory.create(null, processorTag, null, config, null));
58+
}
59+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest.common;
11+
12+
import org.elasticsearch.ingest.IngestDocument;
13+
import org.elasticsearch.ingest.Processor;
14+
import org.elasticsearch.test.ESTestCase;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
import static org.hamcrest.Matchers.containsString;
20+
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.is;
22+
23+
public class RemediateProcessorTests extends ESTestCase {
24+
25+
private static Processor createRemediateProcessor() {
26+
return new RemediateProcessor(randomAlphaOfLength(8), null);
27+
}
28+
29+
private static IngestDocument createFailureDoc(
30+
String currentIndex,
31+
String originalIndex,
32+
String routing,
33+
Map<String, Object> originalSource
34+
) {
35+
Map<String, Object> sourceAndMetadata = new HashMap<>();
36+
// current metadata
37+
sourceAndMetadata.put("_index", currentIndex);
38+
sourceAndMetadata.put("_id", "test-id");
39+
if (randomBoolean()) {
40+
sourceAndMetadata.put("_version", 1);
41+
}
42+
43+
// failure wrapper
44+
Map<String, Object> documentWrapper = new HashMap<>();
45+
documentWrapper.put("index", originalIndex);
46+
if (routing != null) {
47+
documentWrapper.put("routing", routing);
48+
}
49+
documentWrapper.put("source", originalSource);
50+
51+
sourceAndMetadata.put("error", "simulated failure");
52+
sourceAndMetadata.put("document", documentWrapper);
53+
54+
// no special ingest-metadata
55+
Map<String, Object> ingestMetadata = new HashMap<>();
56+
57+
return new IngestDocument(sourceAndMetadata, ingestMetadata);
58+
}
59+
60+
public void testRemediate_basic() throws Exception {
61+
Processor processor = createRemediateProcessor();
62+
63+
Map<String, Object> originalSource = new HashMap<>();
64+
originalSource.put("a", 1);
65+
originalSource.put("b", "x");
66+
67+
IngestDocument doc = createFailureDoc("failure-index", "orig-index", null, originalSource);
68+
69+
processor.execute(doc);
70+
71+
// metadata index restored
72+
assertThat(doc.getFieldValue("_index", String.class), equalTo("orig-index"));
73+
// routing is not set when not provided
74+
assertThat(doc.hasField("_routing", true), is(false));
75+
76+
// source restored at root
77+
assertThat(doc.getFieldValue("a", Integer.class), equalTo(1));
78+
assertThat(doc.getFieldValue("b", String.class), equalTo("x"));
79+
80+
// failure scaffolding removed
81+
assertThat(doc.hasField("error", true), is(false));
82+
assertThat(doc.hasField("document", true), is(false));
83+
}
84+
85+
public void testRemediate_withRouting() throws Exception {
86+
Processor processor = createRemediateProcessor();
87+
88+
Map<String, Object> originalSource = new HashMap<>();
89+
originalSource.put("nested", Map.of("k", "v"));
90+
91+
IngestDocument doc = createFailureDoc("dlq-index", "original-idx", "orig-route", originalSource);
92+
93+
processor.execute(doc);
94+
95+
assertThat(doc.getFieldValue("_index", String.class), equalTo("original-idx"));
96+
assertThat(doc.getFieldValue("_routing", String.class), equalTo("orig-route"));
97+
assertThat(doc.getFieldValue("nested.k", String.class), equalTo("v"));
98+
assertThat(doc.hasField("error", true), is(false));
99+
assertThat(doc.hasField("document", true), is(false));
100+
}
101+
102+
public void testRemediate_missingDocument_throws() {
103+
Processor processor = createRemediateProcessor();
104+
105+
Map<String, Object> sourceAndMetadata = new HashMap<>();
106+
sourceAndMetadata.put("_index", "failure-index");
107+
sourceAndMetadata.put("error", "simulated failure");
108+
// no "document" field
109+
110+
IngestDocument doc = new IngestDocument(sourceAndMetadata, new HashMap<>());
111+
112+
Exception e = expectThrows(IllegalArgumentException.class, () -> processor.execute(doc));
113+
assertThat(e.getMessage(), containsString("document"));
114+
}
115+
116+
public void testRemediate_missingSourceInsideDocument_throws() {
117+
Processor processor = createRemediateProcessor();
118+
119+
Map<String, Object> sourceAndMetadata = new HashMap<>();
120+
sourceAndMetadata.put("_index", "failure-index");
121+
sourceAndMetadata.put("error", "simulated failure");
122+
sourceAndMetadata.put("document", Map.of("index", "orig-index")); // missing "source"
123+
124+
IngestDocument doc = new IngestDocument(sourceAndMetadata, new HashMap<>());
125+
126+
Exception e = expectThrows(IllegalArgumentException.class, () -> processor.execute(doc));
127+
assertThat(e.getMessage(), containsString("source"));
128+
}
129+
}

0 commit comments

Comments
 (0)