Skip to content

Commit d2c8dd1

Browse files
authored
Standardize Redact processor metadata field usage (elastic#134607)
Standardizes how the Redact processor writes its `_ingest._redact._is_redacted` field to always create an intermediate object in the ingest metadata. This protects users from not being able to see the field if the processor is run in `flexible` access pattern mode (and thus would be stored as a dotted field name by default). PR also moves some commonly used ingest document test code to a place where it can be reused.
1 parent df436fd commit d2c8dd1

File tree

4 files changed

+115
-30
lines changed

4 files changed

+115
-30
lines changed

server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29-
import java.util.concurrent.atomic.AtomicReference;
3029
import java.util.function.BiConsumer;
3130
import java.util.function.Consumer;
3231
import java.util.stream.DoubleStream;
@@ -148,33 +147,7 @@ public void setTestIngestDocument() {
148147
* @throws Exception Any exception thrown from the provided consumer
149148
*/
150149
private void doWithAccessPattern(IngestPipelineFieldAccessPattern accessPattern, Consumer<IngestDocument> action) throws Exception {
151-
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>(null);
152-
document.executePipeline(
153-
new Pipeline(
154-
randomAlphanumericOfLength(10),
155-
null,
156-
null,
157-
null,
158-
new CompoundProcessor(new TestProcessor(action)),
159-
accessPattern,
160-
null,
161-
null,
162-
null
163-
),
164-
(ignored, ex) -> {
165-
if (ex != null) {
166-
if (ex instanceof IngestProcessorException ingestProcessorException) {
167-
exceptionAtomicReference.set((Exception) ingestProcessorException.getCause());
168-
} else {
169-
exceptionAtomicReference.set(ex);
170-
}
171-
}
172-
}
173-
);
174-
Exception exception = exceptionAtomicReference.get();
175-
if (exception != null) {
176-
throw exception;
177-
}
150+
IngestPipelineTestUtils.doWithAccessPattern(accessPattern, document, action);
178151
}
179152

180153
/**
@@ -184,7 +157,7 @@ private void doWithAccessPattern(IngestPipelineFieldAccessPattern accessPattern,
184157
* @throws Exception Any exception thrown from the provided consumer
185158
*/
186159
private void doWithRandomAccessPattern(Consumer<IngestDocument> action) throws Exception {
187-
doWithAccessPattern(randomFrom(IngestPipelineFieldAccessPattern.values()), action);
160+
IngestPipelineTestUtils.doWithRandomAccessPattern(document, action);
188161
}
189162

190163
private void assertPathValid(IngestDocument doc, String path) {

test/framework/src/main/java/org/elasticsearch/ingest/IngestPipelineTestUtils.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@
2929
import org.elasticsearch.xcontent.XContentType;
3030

3131
import java.io.IOException;
32+
import java.util.function.Consumer;
3233

3334
import static org.elasticsearch.test.ESTestCase.TEST_REQUEST_TIMEOUT;
35+
import static org.elasticsearch.test.ESTestCase.randomAlphanumericOfLength;
36+
import static org.elasticsearch.test.ESTestCase.randomFrom;
3437
import static org.elasticsearch.test.ESTestCase.safeGet;
3538
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3639
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
@@ -140,4 +143,86 @@ public static SimulatePipelineRequest jsonSimulatePipelineRequest(String jsonStr
140143
public static SimulatePipelineRequest jsonSimulatePipelineRequest(BytesReference jsonBytes) {
141144
return new SimulatePipelineRequest(ReleasableBytesReference.wrap(jsonBytes), XContentType.JSON);
142145
}
146+
147+
/**
148+
* Executes an action against an ingest document using a random access pattern. A synthetic pipeline instance with the provided
149+
* access pattern is created and executed against the ingest document, thus updating its internal access pattern.
150+
* @param document The document to operate on
151+
* @param action A consumer which takes the updated ingest document during execution
152+
* @throws Exception Any exception thrown from the provided consumer
153+
*/
154+
public static void doWithRandomAccessPattern(IngestDocument document, Consumer<IngestDocument> action) throws Exception {
155+
doWithAccessPattern(randomFrom(IngestPipelineFieldAccessPattern.values()), document, action);
156+
}
157+
158+
/**
159+
* Executes an action against an ingest document using a random access pattern. A synthetic pipeline instance with the provided
160+
* access pattern is created and executed against the ingest document, thus updating its internal access pattern.
161+
* @param accessPattern The access pattern to use when executing the block of code
162+
* @param document The document to operate on
163+
* @param action A consumer which takes the updated ingest document during execution
164+
* @throws Exception Any exception thrown from the provided consumer
165+
*/
166+
public static void doWithAccessPattern(
167+
IngestPipelineFieldAccessPattern accessPattern,
168+
IngestDocument document,
169+
Consumer<IngestDocument> action
170+
) throws Exception {
171+
runWithAccessPattern(accessPattern, document, new TestProcessor(action));
172+
}
173+
174+
/**
175+
* Executes a processor against an ingest document using a random access pattern. A synthetic pipeline instance with the provided
176+
* access pattern is created and executed against the ingest document, thus updating its internal access pattern.
177+
* @param document The document to operate on
178+
* @param processor A processor which takes the updated ingest document during execution
179+
* @return the resulting ingest document instance
180+
* @throws Exception Any exception thrown from the provided consumer
181+
*/
182+
public static IngestDocument runWithRandomAccessPattern(IngestDocument document, Processor processor) throws Exception {
183+
return runWithAccessPattern(randomFrom(IngestPipelineFieldAccessPattern.values()), document, processor);
184+
}
185+
186+
/**
187+
* Executes a processor against an ingest document using the provided access pattern. A synthetic pipeline instance with the provided
188+
* access pattern is created and executed against the ingest document, thus updating its internal access pattern.
189+
* @param accessPattern The access pattern to use when executing the block of code
190+
* @param document The document to operate on
191+
* @param processor A processor which takes the updated ingest document during execution
192+
* @return the resulting ingest document instance
193+
* @throws Exception Any exception thrown from the provided consumer
194+
*/
195+
public static IngestDocument runWithAccessPattern(
196+
IngestPipelineFieldAccessPattern accessPattern,
197+
IngestDocument document,
198+
Processor processor
199+
) throws Exception {
200+
IngestDocument[] ingestDocumentHolder = new IngestDocument[1];
201+
Exception[] exceptionHolder = new Exception[1];
202+
document.executePipeline(
203+
new Pipeline(
204+
randomAlphanumericOfLength(10),
205+
null,
206+
null,
207+
null,
208+
new CompoundProcessor(processor),
209+
accessPattern,
210+
null,
211+
null,
212+
null
213+
),
214+
(result, ex) -> {
215+
ingestDocumentHolder[0] = result;
216+
exceptionHolder[0] = ex;
217+
}
218+
);
219+
Exception exception = exceptionHolder[0];
220+
if (exception != null) {
221+
if (exception instanceof IngestProcessorException ingestProcessorException) {
222+
exception = ((Exception) ingestProcessorException.getCause());
223+
}
224+
throw exception;
225+
}
226+
return ingestDocumentHolder[0];
227+
}
143228
}

x-pack/plugin/redact/src/main/java/org/elasticsearch/xpack/redact/RedactProcessor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.nio.charset.StandardCharsets;
3232
import java.util.ArrayList;
3333
import java.util.Comparator;
34+
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
3637

@@ -227,7 +228,10 @@ private void updateMetadataIfNecessary(IngestDocument ingestDocument, String fie
227228

228229
// document newly redacted
229230
if (alreadyRedacted == false && isRedacted) {
230-
ingestDocument.setFieldValue(METADATA_PATH_REDACT_IS_REDACTED, true);
231+
// Set the field directly in the metadata map to avoid any access pattern related problems
232+
Map<String, Object> redactObject = HashMap.newHashMap(1);
233+
redactObject.put(IS_REDACTED_KEY, true);
234+
ingestDocument.getIngestMetadata().put(REDACT_KEY, redactObject);
231235
}
232236
}
233237

x-pack/plugin/redact/src/test/java/org/elasticsearch/xpack/redact/RedactProcessorTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.grok.MatcherWatchdog;
1212
import org.elasticsearch.index.VersionType;
1313
import org.elasticsearch.ingest.IngestDocument;
14+
import org.elasticsearch.ingest.IngestPipelineFieldAccessPattern;
15+
import org.elasticsearch.ingest.IngestPipelineTestUtils;
1416
import org.elasticsearch.license.MockLicenseState;
1517
import org.elasticsearch.license.TestUtils;
1618
import org.elasticsearch.license.XPackLicenseState;
@@ -369,6 +371,27 @@ public void testTraceRedact() throws Exception {
369371
assertTrue(redactMetadata.containsKey(RedactProcessor.IS_REDACTED_KEY));
370372
assertTrue((Boolean) redactMetadata.get(RedactProcessor.IS_REDACTED_KEY));
371373
}
374+
{
375+
var processor = new RedactProcessor.Factory(mockLicenseState(), MatcherWatchdog.noop()).create(
376+
null,
377+
"t",
378+
"d",
379+
new HashMap<>(config),
380+
null
381+
);
382+
var ingestDoc = createIngestDoc(Map.of("to_redact", "[email protected] will be redacted"));
383+
var redactedDoc = IngestPipelineTestUtils.runWithAccessPattern(IngestPipelineFieldAccessPattern.FLEXIBLE, ingestDoc, processor);
384+
385+
assertEquals("<REDACTED> will be redacted", redactedDoc.getFieldValue("to_redact", String.class));
386+
// validate ingest metadata path correctly resolved in classic mode
387+
assertTrue(redactedDoc.getFieldValue(RedactProcessor.METADATA_PATH_REDACT_IS_REDACTED, Boolean.class));
388+
// validate ingest metadata structure correct
389+
var ingestMeta = redactedDoc.getIngestMetadata();
390+
assertTrue(ingestMeta.containsKey(RedactProcessor.REDACT_KEY));
391+
var redactMetadata = (HashMap<String, Object>) ingestMeta.get(RedactProcessor.REDACT_KEY);
392+
assertTrue(redactMetadata.containsKey(RedactProcessor.IS_REDACTED_KEY));
393+
assertTrue((Boolean) redactMetadata.get(RedactProcessor.IS_REDACTED_KEY));
394+
}
372395
{
373396
var configNoTrace = new HashMap<String, Object>();
374397
configNoTrace.put("field", "to_redact");

0 commit comments

Comments
 (0)