Commit 0bb4c2b
Forward port #85000 and #84838 to 8.1 branch (#85040)
Forward port #85000 (Revert enrich cache lookup optimisation) and #84838 (CompoundProcessor should also catch exceptions when executing a processor) to the 8.1 branch.

* Revert enrich cache lookup optimisation (#85028)

Forward port of #85000 to the master branch. This PR reverts the optimisation that was added via #77259. That optimisation cleverly ensures that no duplicate searches happen if multiple threads concurrently execute the same search. However, there are problems with the implementation that cause issues like #84781. The optimisation makes use of CompletableFuture, and in this case we don't check whether the result has completed exceptionally. The callback is then never invoked, which leaves the bulk request uncompleted and hanging around. The ingest framework is already complex due to its asynchronous nature, and adding CompletableFuture into the mix makes debugging these issues very time consuming. This is the main reason we would like to revert this change.

* CompoundProcessor should also catch exceptions when executing a processor (#84838) (#85035)

Currently, CompoundProcessor does not catch Exception. If a processor throws an exception and no method higher in the call stack catches it, pipeline execution stalls and bulk requests may not complete. Usually these exceptions are caught by the IngestService#executePipelines(...) method, but when a processor executes asynchronously (for example, the enrich processor) and the thread that executes it is no longer the original write thread, there is no logic that handles the failing pipeline execution and cleans up resources. This leads to memory leaks.

Closes #84781

Also change how the 'pipeline doesn't exist' error is thrown in TrackingResultProcessor. With the change to CompoundProcessor, thrown exceptions are caught and delegated to the handler. SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since it assumes that processorResultList contains the result (successful or not) of every processor in the pipeline. Previously, when TrackingResultProcessor for a PipelineProcessor couldn't find the referenced pipeline, it threw an error without updating processorResultList. This commit addresses that.
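The hang described above comes from registering only a success-path callback on a CompletableFuture. A minimal standalone sketch of the pitfall (not the actual enrich cache code): a future that completes exceptionally never runs a thenAccept callback, while whenComplete observes both outcomes and can propagate the failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class FutureCallbackPitfall {
    public static void main(String[] args) {
        // A search that fails: the future completes exceptionally.
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("shard failure"));

        // Registering only a success callback: it is silently skipped on
        // exceptional completion, so anything waiting for it hangs forever.
        AtomicBoolean successCallbackRan = new AtomicBoolean(false);
        future.thenAccept(response -> successCallbackRan.set(true));
        System.out.println("thenAccept ran: " + successCallbackRan.get()); // false

        // whenComplete sees both outcomes, so the failure can be propagated.
        AtomicBoolean completionObserved = new AtomicBoolean(false);
        future.whenComplete((response, throwable) -> completionObserved.set(throwable != null));
        System.out.println("whenComplete observed failure: " + completionObserved.get()); // true
    }
}
```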
1 parent d6cdf13 commit 0bb4c2b

File tree

8 files changed: +133 −146 lines

docs/changelog/84838.yaml

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+pr: 84838
+summary: '`CompoundProcessor` should also catch exceptions when executing a processor'
+area: Ingest
+type: bug
+issues:
+ - 84781

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml

Lines changed: 3 additions & 3 deletions

@@ -953,7 +953,6 @@ teardown:
 ---
 "Test simulate with provided pipeline that does not exist":
   - do:
-      catch: bad_request
       ingest.simulate:
         verbose: true
         body: >
@@ -974,5 +973,6 @@ teardown:
         }
       ]
     }
-  - match: { error.root_cause.0.type: "illegal_argument_exception" }
-  - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }
+  - match: { docs.0.processor_results.0.status: "error" }
+  - match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" }
+  - match: { docs.0.processor_results.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 34 additions & 17 deletions

@@ -133,30 +133,47 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
         final IngestMetric metric = processorWithMetric.v2();
         final long startTimeInNanos = relativeTimeProvider.getAsLong();
         metric.preIngest();
-        processor.execute(ingestDocument, (result, e) -> {
-            long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
-            metric.postIngest(ingestTimeInNanos);
+        try {
+            processor.execute(ingestDocument, (result, e) -> {
+                long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
+                metric.postIngest(ingestTimeInNanos);
 
-            if (e != null) {
-                metric.ingestFailed();
-                if (ignoreFailure) {
-                    innerExecute(currentProcessor + 1, ingestDocument, handler);
+                if (e != null) {
+                    executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
                 } else {
-                    IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument);
-                    if (onFailureProcessors.isEmpty()) {
-                        handler.accept(null, compoundProcessorException);
+                    if (result != null) {
+                        innerExecute(currentProcessor + 1, result, handler);
                     } else {
-                        executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
+                        handler.accept(null, null);
                     }
                 }
+            });
+        } catch (Exception e) {
+            long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
+            metric.postIngest(ingestTimeInNanos);
+            executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
+        }
+    }
+
+    private void executeOnFailure(
+        int currentProcessor,
+        IngestDocument ingestDocument,
+        BiConsumer<IngestDocument, Exception> handler,
+        Processor processor,
+        IngestMetric metric,
+        Exception e
+    ) {
+        metric.ingestFailed();
+        if (ignoreFailure) {
+            innerExecute(currentProcessor + 1, ingestDocument, handler);
+        } else {
+            IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument);
+            if (onFailureProcessors.isEmpty()) {
+                handler.accept(null, compoundProcessorException);
             } else {
-                if (result != null) {
-                    innerExecute(currentProcessor + 1, result, handler);
-                } else {
-                    handler.accept(null, null);
-                }
+                executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
             }
-        });
+        }
     }
 
     void executeOnFailureAsync(
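The change above wraps processor.execute(...) in a try/catch so that a processor throwing synchronously is delegated to the handler instead of escaping up the call stack. A simplified sketch of the pattern (the Processor interface here is a stand-in, not the Elasticsearch one):

```java
import java.util.function.BiConsumer;

public class CatchSyncThrow {
    interface Processor {
        void execute(String doc, BiConsumer<String, Exception> handler);
    }

    // A processor that throws synchronously instead of reporting via the handler,
    // e.g. a foreach processor reading a field that does not exist.
    static final Processor THROWING = (doc, handler) -> {
        throw new IllegalArgumentException("field [users] not present");
    };

    // Without the try/catch the exception escapes and the handler is never
    // called, so the bulk request never completes. With it, a synchronous
    // failure is delegated to the handler like any asynchronous failure.
    static void innerExecute(Processor processor, String doc, BiConsumer<String, Exception> handler) {
        try {
            processor.execute(doc, handler);
        } catch (Exception e) {
            handler.accept(null, e);
        }
    }

    public static void main(String[] args) {
        innerExecute(THROWING, "{}", (result, e) ->
            System.out.println("handler saw: " + e.getMessage()));
    }
}
```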

server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

Lines changed: 13 additions & 1 deletion

@@ -69,11 +69,23 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
         IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
         Pipeline pipelineToCall = pipelineProcessor.getPipeline(ingestDocument);
         if (pipelineToCall == null) {
-            throw new IllegalArgumentException(
+            IllegalArgumentException e = new IllegalArgumentException(
                 "Pipeline processor configured for non-existent pipeline ["
                     + pipelineProcessor.getPipelineToCallName(ingestDocument)
                     + ']'
             );
+            // Add error as processor result, otherwise this gets lost in SimulateExecutionService#execute(...) and
+            // an empty response gets returned by the ingest simulate api.
+            processorResultList.add(
+                new SimulateProcessorResult(
+                    pipelineProcessor.getType(),
+                    pipelineProcessor.getTag(),
+                    pipelineProcessor.getDescription(),
+                    e,
+                    conditionalWithResult
+                )
+            );
+            throw e;
         }
         ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
             // special handling for pipeline cycle errors
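The change above records the failure in the verbose result list before throwing, because SimulateExecutionService in verbose mode builds its response purely from that list and ignores delegated exceptions. A toy sketch of the idea (SimResult and the method names are hypothetical, not the Elasticsearch classes):

```java
import java.util.ArrayList;
import java.util.List;

public class RecordBeforeThrow {
    record SimResult(String type, Exception failure) {}

    static final List<SimResult> processorResultList = new ArrayList<>();

    // Record the failure as a processor result before throwing, so a handler
    // that ignores delegated exceptions can still render the error from the list.
    static void executePipelineProcessor(String pipelineName) {
        if (!"existing-pipeline".equals(pipelineName)) {
            IllegalArgumentException e = new IllegalArgumentException(
                "Pipeline processor configured for non-existent pipeline [" + pipelineName + "]"
            );
            processorResultList.add(new SimResult("pipeline", e));
            throw e;
        }
        processorResultList.add(new SimResult("pipeline", null));
    }

    public static void main(String[] args) {
        try {
            executePipelineProcessor("missing");
        } catch (IllegalArgumentException ignored) {
            // the verbose simulate handler ignores delegated exceptions
        }
        // The error is still visible through the result list.
        System.out.println("results recorded: " + processorResultList.size());
        System.out.println("first failure: " + processorResultList.get(0).failure().getMessage());
    }
}
```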

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

Lines changed: 21 additions & 67 deletions

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.xpack.enrich;
 
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -24,11 +23,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.BiConsumer;
-
-import static org.elasticsearch.action.ActionListener.wrap;
 
 /**
  * A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
@@ -48,24 +42,32 @@
  * current enrich index the enrich alias of an policy refers to. It would require checking
  * all cached entries on each cluster state update)
  */
-public class EnrichCache {
+public final class EnrichCache {
 
-    protected final Cache<CacheKey, CompletableFuture<List<Map<?, ?>>>> cache;
+    private final Cache<CacheKey, List<Map<?, ?>>> cache;
     private volatile Metadata metadata;
 
     EnrichCache(long maxSize) {
-        this.cache = CacheBuilder.<CacheKey, CompletableFuture<List<Map<?, ?>>>>builder().setMaximumWeight(maxSize).build();
+        this.cache = CacheBuilder.<CacheKey, List<Map<?, ?>>>builder().setMaximumWeight(maxSize).build();
    }
 
-    /**
-     * Get the value from the cache if present. Returns immediately.
-     * See {@link #resolveOrDispatchSearch(SearchRequest, BiConsumer, BiConsumer)} to implement a read-through, possibly async interaction.
-     * @param searchRequest the key
-     * @return the cached value or null
-     */
-    CompletableFuture<List<Map<?, ?>>> get(SearchRequest searchRequest) {
-        CacheKey cacheKey = toKey(searchRequest);
-        return cache.get(cacheKey);
+    List<Map<?, ?>> get(SearchRequest searchRequest) {
+        String enrichIndex = getEnrichIndexKey(searchRequest);
+        CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
+
+        List<Map<?, ?>> response = cache.get(cacheKey);
+        if (response != null) {
+            return deepCopy(response, false);
+        } else {
+            return null;
+        }
+    }
+
+    void put(SearchRequest searchRequest, List<Map<?, ?>> response) {
+        String enrichIndex = getEnrichIndexKey(searchRequest);
+        CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
+
+        cache.put(cacheKey, response);
    }
 
    void setMetadata(Metadata metadata) {
@@ -83,61 +85,13 @@ public EnrichStatsAction.Response.CacheStats getStats(String localNodeId) {
        );
    }
 
-    /**
-     * resolves the entry from the cache and provides reports the result to the `callBack` This method does not dispatch any logic
-     * to another thread. Under contention the searchDispatcher is only called once when the value is not in the cache. The
-     * searchDispatcher should schedule the search / callback _asynchronously_ because if the searchDispatcher blocks, then this
-     * method will block. The callback is call on the thread calling this method or under cache miss and contention, the thread running
-     * the part of the searchDispatcher that calls the callback.
-     * @param searchRequest the cache key and input for the search dispatcher
-     * @param searchDispatcher the logical block to be called on cache miss
-     * @param callBack the callback which gets the value asynchronously, which could be a searchResponse or exception (negative lookup)
-     */
-    public void resolveOrDispatchSearch(
-        SearchRequest searchRequest,
-        BiConsumer<SearchRequest, ActionListener<SearchResponse>> searchDispatcher,
-        BiConsumer<List<Map<?, ?>>, Exception> callBack
-    ) {
-        CacheKey cacheKey = toKey(searchRequest);
-        try {
-            CompletableFuture<List<Map<?, ?>>> cacheEntry = cache.computeIfAbsent(cacheKey, request -> {
-                CompletableFuture<List<Map<?, ?>>> completableFuture = new CompletableFuture<>();
-                searchDispatcher.accept(
-                    searchRequest,
-                    wrap(response -> completableFuture.complete(toCacheValue(response)), completableFuture::completeExceptionally)
-                );
-                return completableFuture;
-            });
-            cacheEntry.whenComplete((response, throwable) -> {
-                if (throwable != null) {
-                    // Don't cache failures
-                    cache.invalidate(cacheKey, cacheEntry);
-                    if (throwable instanceof Exception e) {
-                        callBack.accept(null, e);
-                        return;
-                    }
-                    // Let ElasticsearchUncaughtExceptionHandler handle this, which should halt Elasticsearch
-                    throw (Error) throwable;
-                }
-                callBack.accept(deepCopy(response, false), null);
-            });
-        } catch (ExecutionException e) {
-            callBack.accept(null, e);
-        }
-    }
-
-    protected CacheKey toKey(SearchRequest searchRequest) {
-        String enrichIndex = getEnrichIndexKey(searchRequest);
-        return new CacheKey(enrichIndex, searchRequest);
-    }
-
    private String getEnrichIndexKey(SearchRequest searchRequest) {
        String alias = searchRequest.indices()[0];
        IndexAbstraction ia = metadata.getIndicesLookup().get(alias);
        return ia.getIndices().get(0).getName();
    }
 
-    private List<Map<?, ?>> toCacheValue(SearchResponse response) {
+    List<Map<?, ?>> toCacheValue(SearchResponse response) {
        List<Map<?, ?>> result = new ArrayList<>(response.getHits().getHits().length);
        for (SearchHit hit : response.getHits()) {
            result.add(deepCopy(hit.getSourceAsMap(), true));
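The simplified cache above stores plain values and returns a deep copy from get(...), because ingest processors mutate documents in place and must not corrupt the shared cached entry. A toy sketch of the copy-on-get idea (only a per-hit shallow copy here; EnrichCache.deepCopy copies nested structures too):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CopyOnGetCache {
    private final Map<String, List<Map<String, Object>>> cache = new HashMap<>();

    void put(String key, List<Map<String, Object>> value) {
        cache.put(key, value);
    }

    // Return a copy so callers that mutate the returned hits (as ingest
    // pipelines do with enriched documents) cannot alter the cached entry.
    List<Map<String, Object>> get(String key) {
        List<Map<String, Object>> cached = cache.get(key);
        if (cached == null) {
            return null;
        }
        List<Map<String, Object>> copy = new ArrayList<>(cached.size());
        for (Map<String, Object> hit : cached) {
            copy.add(new HashMap<>(hit)); // shallow per-hit copy; real code also copies nested values
        }
        return copy;
    }

    public static void main(String[] args) {
        CopyOnGetCache cache = new CopyOnGetCache();
        cache.put("k", List.of(new HashMap<>(Map.of("user", "kimchy"))));
        cache.get("k").get(0).put("user", "mutated");          // mutates only the copy
        System.out.println(cache.get("k").get(0).get("user")); // prints "kimchy"
    }
}
```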

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 14 additions & 5 deletions

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.enrich;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
@@ -128,10 +129,18 @@ public void accept(ClusterState state) {
         EnrichCache enrichCache
     ) {
         Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
-        return (req, handler) -> enrichCache.resolveOrDispatchSearch(
-            req,
-            (searchRequest, listener) -> originClient.execute(EnrichCoordinatorProxyAction.INSTANCE, searchRequest, listener),
-            handler
-        );
+        return (req, handler) -> {
+            // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
+            List<Map<?, ?>> response = enrichCache.get(req);
+            if (response != null) {
+                handler.accept(response, null);
+            } else {
+                originClient.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(resp -> {
+                    List<Map<?, ?>> value = enrichCache.toCacheValue(resp);
+                    enrichCache.put(req, value);
+                    handler.accept(EnrichCache.deepCopy(value, false), null);
+                }, e -> { handler.accept(null, e); }));
+            }
+        };
     }
 }
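The factory's read-through above is deliberately non-locking: concurrent misses on the same key may each run a search and re-put the same value, which is harmless because the same key always maps to the same result. That duplicate work is the trade-off accepted in exchange for dropping the CompletableFuture coordination this commit reverts. A condensed sketch of the pattern (the search function is a stand-in for the dispatched enrich search):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class RacyReadThrough {
    private final ConcurrentHashMap<String, List<Map<String, Object>>> cache = new ConcurrentHashMap<>();

    // Plain check-then-act: two threads may both miss and both run the search,
    // but re-putting an identical value is harmless. (The real code also deep
    // copies values in and out; omitted here for brevity.)
    List<Map<String, Object>> lookup(String key, Function<String, List<Map<String, Object>>> search) {
        List<Map<String, Object>> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        List<Map<String, Object>> value = search.apply(key);
        cache.put(key, value);
        return value;
    }

    public static void main(String[] args) {
        RacyReadThrough cache = new RacyReadThrough();
        cache.lookup("a", k -> List.of(Map.of("match", k))); // miss: runs the search and caches
        System.out.println(cache.lookup("a", k -> {          // hit: search is not run again
            throw new AssertionError("hit expected, search should not run");
        }));
    }
}
```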

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

Lines changed: 33 additions & 0 deletions

@@ -371,6 +371,39 @@ public void testTemplating() throws Exception {
         assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
     }
 
+    public void testFailureAfterEnrich() throws Exception {
+        List<String> keys = createSourceMatchIndex(1, 1);
+        String policyName = "my-policy";
+        EnrichPolicy enrichPolicy = new EnrichPolicy(
+            EnrichPolicy.MATCH_TYPE,
+            null,
+            List.of(SOURCE_INDEX_NAME),
+            MATCH_FIELD,
+            Arrays.asList(DECORATE_FIELDS)
+        );
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+
+        // A pipeline with a foreach that uses a non existing field that is specified after enrich has run:
+        String pipelineName = "my-pipeline";
+        String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+            + policyName
+            + "\", \"field\": \"email\", \"target_field\": \"users\"}},"
+            + "{ \"foreach\": {\"field\":\"users\", \"processor\":{\"append\":{\"field\":\"matched2\",\"value\":\"{{_ingest._value}}\"}}}}"
+            + "]}";
+        PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
+        client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
+
+        for (int i = 0; i < 5; i++) {
+            IndexRequest indexRequest = new IndexRequest("my-index").id("1")
+                .setPipeline(pipelineName)
+                .source(Map.of(MATCH_FIELD, "non_existing"));
+            Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
+            assertThat(e.getMessage(), equalTo("field [users] not present as part of path [users]"));
+        }
+    }
+
     private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
         Set<String> keys = new HashSet<>();
         for (int id = 0; id < numKeys; id++) {
