Commit 9644135
Fix async enrich execution prematurely releases enrich policy lock (#94702) (#94841)
This PR raises the timeout on the EnrichPolicyExecutor's task API call and adds additional logic for the case where the task await call fails. Without this change, the task API call can time out and unlock the policy prematurely. Premature unlocking can lead to the enrich index being removed while the policy is still executing.

(cherry picked from commit f56fc01)

# Conflicts:
#	x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java
#	x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java
1 parent c40f7af commit 9644135
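At a glance, the fix (shown in full in the EnrichPolicyExecutor.java diff below) waits for the internal execution task with an effectively unbounded timeout and, if the Get Task call still fails with a timeout, re-issues the wait instead of releasing the policy lock. A condensed sketch of that pattern, using the same classes as the change below; the real method additionally takes the policy and index names for logging and distinguishes a missing task from other failures:

    private void awaitTaskCompletionAndThenRelease(TaskId taskId, Releasable policyLock) {
        GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId)
            .setWaitForCompletion(true)
            .setTimeout(TimeValue.MAX_VALUE);
        client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(response -> policyLock.close(), e -> {
            if (ExceptionsHelper.unwrap(e, ElasticsearchTimeoutException.class) != null) {
                // The wait itself timed out: the task may still be running, so keep the lock and wait again.
                awaitTaskCompletionAndThenRelease(taskId, policyLock);
            } else {
                // Task not found or an unexpected failure: release the lock rather than leaving the policy locked forever.
                policyLock.close();
            }
        }));
    }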

4 files changed (+272 / -9 lines)

docs/changelog/94702.yaml

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+pr: 94702
+summary: Fix async enrich execution prematurely releases enrich policy lock
+area: Ingest Node
+type: bug
+issues:
+ - 94690

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

Lines changed: 63 additions & 6 deletions

@@ -7,15 +7,21 @@
 
 package org.elasticsearch.xpack.enrich;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
+import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -30,6 +36,8 @@
 
 public class EnrichPolicyExecutor {
 
+    private static final Logger logger = LogManager.getLogger(EnrichPolicyExecutor.class);
+
     public static final String TASK_ACTION = "policy_execution";
 
     private final ClusterService clusterService;
@@ -69,6 +77,7 @@ public void coordinatePolicyExecution(
         ActionListener<ExecuteEnrichPolicyAction.Response> listener
     ) {
         long nowTimestamp = nowSupplier.getAsLong();
+        String policyName = request.getName();
         String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp);
         Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName);
         try {
@@ -77,14 +86,19 @@ public void coordinatePolicyExecution(
             internalRequest.setParentTask(request.getParentTask());
             client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> {
                 if (response.getStatus() != null) {
+                    logger.debug("Unlocking enrich policy [{}:{}] on complete with no task scheduled", policyName, enrichIndexName);
                     policyLock.close();
                     listener.onResponse(response);
                 } else {
                     assert response.getTaskId() != null : "If the execute response does not have a status it must return a task id";
-                    awaitTaskCompletionAndThenRelease(response.getTaskId(), policyLock);
+                    awaitTaskCompletionAndThenRelease(response.getTaskId(), () -> {
+                        logger.debug("Unlocking enrich policy [{}:{}] on completion of task status", policyName, enrichIndexName);
+                        policyLock.close();
+                    }, policyName, enrichIndexName);
                     listener.onResponse(response);
                 }
             }, e -> {
+                logger.debug("Unlocking enrich policy [{}:{}] on failure to execute internal action", policyName, enrichIndexName);
                 policyLock.close();
                 listener.onFailure(e);
             }));
@@ -138,11 +152,54 @@ private Releasable tryLockingPolicy(String policyName, String enrichIndexName) {
         };
     }
 
-    private void awaitTaskCompletionAndThenRelease(TaskId taskId, Releasable policyLock) {
-        GetTaskRequest getTaskRequest = new GetTaskRequest();
-        getTaskRequest.setTaskId(taskId);
-        getTaskRequest.setWaitForCompletion(true);
-        client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(policyLock::close));
+    private void awaitTaskCompletionAndThenRelease(
+        TaskId taskId,
+        Releasable policyLock,
+        final String policyName,
+        final String enrichIndexName
+    ) {
+        GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId).setWaitForCompletion(true).setTimeout(TimeValue.MAX_VALUE);
+        client.admin().cluster().getTask(getTaskRequest, new ActionListener<GetTaskResponse>() {
+            @Override
+            public void onResponse(GetTaskResponse getTaskResponse) {
+                policyLock.close();
+            }
+
+            @Override
+            public void onFailure(Exception exception) {
+                if (ExceptionsHelper.unwrap(exception, ResourceNotFoundException.class) != null) {
+                    // Could not find task, which means it completed, failed, or the node is gone. Clean up policy lock.
+                    logger.debug(
+                        "Assuming async policy [{}:{}] execution task [{}] has ended after not being able to retrieve it from remote host",
+                        policyName,
+                        enrichIndexName,
+                        taskId
+                    );
+                    policyLock.close();
+                } else if (ExceptionsHelper.unwrap(exception, ElasticsearchTimeoutException.class) != null) {
+                    // Timeout occurred while waiting for completion, launch the wait again
+                    logger.debug(
+                        "Retrying task wait after encountering timeout during async policy execution result [{}:{}]",
+                        policyName,
+                        enrichIndexName
+                    );
+                    awaitTaskCompletionAndThenRelease(taskId, policyLock, policyName, enrichIndexName);
+                } else {
+                    // We've encountered an unforeseen problem while waiting for the policy to complete. Could be a network error or
+                    // something else. Instead of keeping the policy locked forever and potentially jamming the enrich feature during
+                    // an unstable cluster event, we should unlock it and accept the possibility of an inconsistent execution.
+                    logger.error(
+                        "Emergency unlock for enrich policy ["
+                            + policyName
+                            + ":"
+                            + enrichIndexName
+                            + "] on failure to determine task status caused by unhandled exception",
+                        exception
+                    );
+                    policyLock.close();
+                }
+            }
+        });
     }
 
     private Runnable createPolicyRunner(

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyLocks.java

Lines changed: 5 additions & 3 deletions

@@ -9,7 +9,7 @@
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Releasable;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
@@ -108,11 +108,13 @@ public EnrichPolicyLock lockPolicy(String policyName, String enrichIndexName) {
     }
 
     public Set<String> lockedPolices() {
-        return new HashSet<>(policyLocks.keySet());
+        // Wrap as unmodifiable instead of copying
+        return Collections.unmodifiableSet(policyLocks.keySet());
    }
 
     public Set<String> inflightPolicyIndices() {
-        return new HashSet<>(workingIndices.keySet());
+        // Wrap as unmodifiable instead of copying
+        return Collections.unmodifiableSet(workingIndices.keySet());
     }
 
 }
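The change above replaces a defensive copy with Collections.unmodifiableSet, which wraps the live key set of the underlying concurrent map rather than snapshotting it: callers see later lock/unlock activity through the returned set but cannot mutate it. A standalone illustration of that difference (the demo class and map are hypothetical, not part of the patch):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class UnmodifiableViewDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();
            Set<String> copy = new HashSet<>(locks.keySet());               // snapshot: frozen at this point
            Set<String> view = Collections.unmodifiableSet(locks.keySet()); // live, read-only view

            locks.put("my_policy", Boolean.TRUE);
            System.out.println(copy.contains("my_policy")); // false - the copy does not see later changes
            System.out.println(view.contains("my_policy")); // true  - the view tracks the underlying map
            // view.add("other_policy") would throw UnsupportedOperationException
        }
    }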

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java

Lines changed: 198 additions & 0 deletions

@@ -7,12 +7,16 @@
 
 package org.elasticsearch.xpack.enrich;
 
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -22,22 +26,31 @@
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Map;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.empty;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -183,6 +196,191 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException {
         finalTaskComplete.await();
     }
 
+    public void testWaitForCompletionConditionRemainsLocked() throws Exception {
+        String testPolicyName = "test_policy";
+        String testTaskId = randomAlphaOfLength(10) + ":" + randomIntBetween(100, 300);
+        boolean completeWithResourceNotFound = randomBoolean();
+
+        // Client calls are forked to a different thread which will await on this latch before actually running anything
+        CountDownLatch clientBlockingLatch = new CountDownLatch(1);
+        // When the client is called with a GetTask call a second time, it should count down this latch, so we can check the lock status.
+        CountDownLatch secondGetTaskWasCalled = new CountDownLatch(1);
+        // A barrier to repeatedly control when the async client will respond with Get Task API results.
+        CyclicBarrier getTaskActionBlockingBarrier = new CyclicBarrier(2);
+        // State flag to ensure first Get Task API call will fail.
+        AtomicBoolean shouldGetTaskApiReturnTimeout = new AtomicBoolean(true);
+
+        // Create the async testing client
+        Client client = new NoOpClient(testThreadPool) {
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                // Validate the request on the submitting thread before forking its execution.
+                if (request instanceof InternalExecutePolicyAction.Request) {
+                    assertFalse(((InternalExecutePolicyAction.Request) request).isWaitForCompletion());
+                }
+                // Execute all client operations on another thread.
+                testThreadPool.generic().execute(() -> {
+                    try {
+                        // All client operations should wait until we're ready in the test.
+                        clientBlockingLatch.await();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+
+                    if (GetTaskAction.INSTANCE.equals(action)) {
+                        if (shouldGetTaskApiReturnTimeout.get() == false) {
+                            // This is the second call to the Get Task API, so count down the latch to let the main test logic know.
+                            secondGetTaskWasCalled.countDown();
+                        }
+                        // Enrich uses GetTaskAction to detect when the task completes during wait_for_completion. The first call will
+                        // throw a timeout, and all remaining calls will return normally.
+                        try {
+                            // Wait until the signal is given to respond to the get task action
+                            getTaskActionBlockingBarrier.await();
+                        } catch (InterruptedException | BrokenBarrierException e) {
+                            throw new RuntimeException(e);
+                        }
+                        // First call is a timeout to test the recovery logic. Remaining calls will no-op which should complete
+                        // the execution.
+                        if (shouldGetTaskApiReturnTimeout.getAndSet(false)) {
+                            listener.onFailure(new ElasticsearchTimeoutException("Test call has timed out"));
+                        } else if (completeWithResourceNotFound) {
+                            listener.onFailure(new ElasticsearchException("Test wrapping", new ResourceNotFoundException("test")));
+                        } else {
+                            listener.onResponse(null);
+                        }
+                    } else if (InternalExecutePolicyAction.INSTANCE.equals(action)) {
+                        // Return a fake task id for the run
+                        @SuppressWarnings("unchecked")
+                        Response response = (Response) new ExecuteEnrichPolicyAction.Response(new TaskId(testTaskId));
+                        listener.onResponse(response);
+                    } else {
+                        listener.onResponse(null);
+                    }
+                });
+            }
+        };
+
+        // Set up
+        final EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
+        final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor(
+            Settings.EMPTY,
+            null,
+            client,
+            testThreadPool,
+            TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()),
+            enrichPolicyLocks,
+            ESTestCase::randomNonNegativeLong
+        );
+
+        // Launch a fake policy run that will block until firstTaskBlock is counted down.
+        PlainActionFuture<ExecuteEnrichPolicyAction.Response> firstTaskResult = PlainActionFuture.newFuture();
+        testExecutor.coordinatePolicyExecution(
+            new ExecuteEnrichPolicyAction.Request(testPolicyName).setWaitForCompletion(false),
+            firstTaskResult
+        );
+
+        // Check to make sure the policy is locked. Do this instead of an assertTrue so that we can clean up if something breaks.
+        if (enrichPolicyLocks.lockedPolices().contains(testPolicyName) == false) {
+            // If this fails, be a good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions during cleanup
+            clientBlockingLatch.countDown();
+            try {
+                firstTaskResult.get(3, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                logger.error("Encountered ignorable exception during test cleanup");
+            }
+            try {
+                // Wait on the timing out request
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+                // Wait on the response request
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+                logger.error("Encountered ignorable barrier wait exception during test cleanup");
+            }
+            fail("Enrich policy was not locked during task submission when it should have been");
+        }
+
+        // Free the client to execute
+        clientBlockingLatch.countDown();
+
+        // Wait for task id to be returned
+        try {
+            ExecuteEnrichPolicyAction.Response response = firstTaskResult.actionGet();
+            assertThat(response.getStatus(), is(nullValue()));
+            assertThat(response.getTaskId(), is(notNullValue()));
+        } catch (AssertionError e) {
+            // conclude the fake runs
+            try {
+                // Wait on the timing out request
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+                // Wait on the response request
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+            } catch (InterruptedException | BrokenBarrierException | TimeoutException be) {
+                logger.error("Encountered ignorable barrier wait exception during test cleanup");
+            }
+            throw e;
+        }
+
+        // Check to make sure the policy is locked still
+        if (enrichPolicyLocks.lockedPolices().contains(testPolicyName) == false) {
+            // keep the logs clean
+            try {
+                // Wait on the timing out request
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+                // Wait on the response request
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+                logger.error("Encountered ignorable barrier wait exception during test cleanup");
+            }
+            fail("Enrich policy was not locked after task response when it should have been");
+        }
+
+        // Now lets return a timeout response on the getTaskAPI
+        try {
+            getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+        } catch (BrokenBarrierException e) {
+            throw new RuntimeException("Unexpected broken barrier exception", e);
+        }
+
+        // Wait for the executor to call back to the client with a new get task action
+        try {
+            // Don't need to clean up any barrier states here because the client was never called again
+            assertTrue(
+                "Expected task API to be called a second time by the executor after first call timed out",
+                secondGetTaskWasCalled.await(3, TimeUnit.SECONDS)
+            );
+        } catch (InterruptedException e) {
+            // We were interrupted, which means we shouldn't wait on any barriers.
+            Assert.fail("Thread interrupted while waiting for background executor to call task API");
+        }
+
+        // Ensure that the policy remained locked
+        if (enrichPolicyLocks.lockedPolices().contains(testPolicyName) == false) {
+            // Another thread is waiting to send a task API response, signal it before failing test to keep the logs clean.
+            try {
+                getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+                logger.error("Encountered ignorable barrier wait exception during test cleanup");
+            }
+            fail("Enrich policy was not locked after timeout when it should have been");
+        }
+
+        // If the lock has remained, then the client should have resubmitted the task wait operation. Signal a new response that will
+        // complete the task wait
+        try {
+            getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
+        } catch (BrokenBarrierException e) {
+            throw new RuntimeException("Unexpected broken barrier exception", e);
+        }
+
+        // At this point the task should complete and unlock the policy correctly
+        assertBusy(() -> assertFalse(enrichPolicyLocks.lockedPolices().contains(testPolicyName)), 3, TimeUnit.SECONDS);
+    }
+
     public void testRunPolicyLocallyMissingPolicy() {
         EnrichPolicy enrichPolicy = EnrichPolicyTests.randomEnrichPolicy(XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
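The test above drives the asynchronous fake client from the main test thread by using a two-party CyclicBarrier as a rendezvous: each getTaskActionBlockingBarrier.await() in the fake client pairs with one await() in the test body, so the test decides exactly when each Get Task response (the timeout first, then the completion) is delivered. A standalone illustration of that rendezvous pattern (class and thread names are illustrative, not from the patch):

    import java.util.concurrent.CyclicBarrier;

    public class RendezvousDemo {
        public static void main(String[] args) throws Exception {
            CyclicBarrier barrier = new CyclicBarrier(2); // one party per thread

            Thread worker = new Thread(() -> {
                try {
                    barrier.await(); // step 1: wait for the main thread's signal
                    System.out.println("worker: first response sent");
                    barrier.await(); // step 2: wait again for the next signal
                    System.out.println("worker: second response sent");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            worker.start();

            barrier.await(); // release step 1
            barrier.await(); // release step 2
            worker.join();
        }
    }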
