Skip to content

Commit c40f7af

Browse files
authored
[7.17] Refactor enrich maintenance coordination logic (#90931) (#94840)
This PR refactors the locking logic for enrich policies: enrich index names are now resolved early so that they can be explicitly protected from maintenance tasks on the master node. The maintenance service has been optimized to allow old enrich indices to be removed concurrently while policies are executing. Further concurrency changes were made to improve the thread safety of the system (such as removing the double-checked locking in maintenance and removing the ability to unlock policies from code that does not hold the lock). (cherry picked from commit 998520e)
1 parent 4ba72b4 commit c40f7af

13 files changed

+353
-264
lines changed

docs/changelog/90931.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 90931
2+
summary: Refactor enrich maintenance coordination logic
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.common.settings.Settings;
1717
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
18+
import org.elasticsearch.core.Releasable;
19+
import org.elasticsearch.tasks.TaskId;
1820
import org.elasticsearch.threadpool.ThreadPool;
1921
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2022
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
2123
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
24+
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks.EnrichPolicyLock;
2225
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;
26+
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction.Request;
2327

2428
import java.util.concurrent.Semaphore;
2529
import java.util.function.LongSupplier;
@@ -64,48 +68,59 @@ public void coordinatePolicyExecution(
6468
ExecuteEnrichPolicyAction.Request request,
6569
ActionListener<ExecuteEnrichPolicyAction.Response> listener
6670
) {
67-
tryLockingPolicy(request.getName());
71+
long nowTimestamp = nowSupplier.getAsLong();
72+
String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp);
73+
Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName);
6874
try {
69-
client.execute(InternalExecutePolicyAction.INSTANCE, request, ActionListener.wrap(response -> {
75+
Request internalRequest = new Request(request.getName(), enrichIndexName);
76+
internalRequest.setWaitForCompletion(request.isWaitForCompletion());
77+
internalRequest.setParentTask(request.getParentTask());
78+
client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> {
7079
if (response.getStatus() != null) {
71-
releasePolicy(request.getName());
80+
policyLock.close();
7281
listener.onResponse(response);
7382
} else {
74-
waitAndThenRelease(request.getName(), response);
83+
assert response.getTaskId() != null : "If the execute response does not have a status it must return a task id";
84+
awaitTaskCompletionAndThenRelease(response.getTaskId(), policyLock);
7585
listener.onResponse(response);
7686
}
7787
}, e -> {
78-
releasePolicy(request.getName());
88+
policyLock.close();
7989
listener.onFailure(e);
8090
}));
8191
} catch (Exception e) {
8292
// Be sure to unlock if submission failed.
83-
releasePolicy(request.getName());
93+
policyLock.close();
8494
throw e;
8595
}
8696
}
8797

88-
public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
98+
public void runPolicyLocally(
99+
ExecuteEnrichPolicyTask task,
100+
String policyName,
101+
String enrichIndexName,
102+
ActionListener<ExecuteEnrichPolicyStatus> listener
103+
) {
89104
try {
90105
EnrichPolicy policy = EnrichStore.getPolicy(policyName, clusterService.state());
91106
if (policy == null) {
92107
throw new ResourceNotFoundException("policy [{}] does not exist", policyName);
93108
}
94109

95110
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
96-
Runnable runnable = createPolicyRunner(policyName, policy, task, listener);
111+
Runnable runnable = createPolicyRunner(policyName, policy, enrichIndexName, task, listener);
97112
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
98113
} catch (Exception e) {
99114
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
100115
throw e;
101116
}
102117
}
103118

104-
private void tryLockingPolicy(String policyName) {
105-
policyLocks.lockPolicy(policyName);
119+
private Releasable tryLockingPolicy(String policyName, String enrichIndexName) {
120+
EnrichPolicyLock policyLock = policyLocks.lockPolicy(policyName, enrichIndexName);
106121
if (policyExecutionPermits.tryAcquire() == false) {
107122
// Release policy lock, and throw a different exception
108-
policyLocks.releasePolicy(policyName);
123+
policyLock.close();
109124
throw new EsRejectedExecutionException(
110125
"Policy execution failed. Policy execution for ["
111126
+ policyName
@@ -115,26 +130,25 @@ private void tryLockingPolicy(String policyName) {
115130
+ "]"
116131
);
117132
}
133+
// Wrap the result so that when releasing it we also release the held execution permit.
134+
return () -> {
135+
try (EnrichPolicyLock ignored = policyLock) {
136+
policyExecutionPermits.release();
137+
}
138+
};
118139
}
119140

120-
private void releasePolicy(String policyName) {
121-
try {
122-
policyExecutionPermits.release();
123-
} finally {
124-
policyLocks.releasePolicy(policyName);
125-
}
126-
}
127-
128-
private void waitAndThenRelease(String policyName, ExecuteEnrichPolicyAction.Response response) {
141+
private void awaitTaskCompletionAndThenRelease(TaskId taskId, Releasable policyLock) {
129142
GetTaskRequest getTaskRequest = new GetTaskRequest();
130-
getTaskRequest.setTaskId(response.getTaskId());
143+
getTaskRequest.setTaskId(taskId);
131144
getTaskRequest.setWaitForCompletion(true);
132-
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(() -> releasePolicy(policyName)));
145+
client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(policyLock::close));
133146
}
134147

135148
private Runnable createPolicyRunner(
136149
String policyName,
137150
EnrichPolicy policy,
151+
String enrichIndexName,
138152
ExecuteEnrichPolicyTask task,
139153
ActionListener<ExecuteEnrichPolicyStatus> listener
140154
) {
@@ -146,7 +160,7 @@ private Runnable createPolicyRunner(
146160
clusterService,
147161
client,
148162
indexNameExpressionResolver,
149-
nowSupplier,
163+
enrichIndexName,
150164
fetchSize,
151165
maxForceMergeAttempts
152166
);

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyLocks.java

Lines changed: 62 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
package org.elasticsearch.xpack.enrich;
88

99
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
10+
import org.elasticsearch.core.Releasable;
1011

12+
import java.util.HashSet;
13+
import java.util.Set;
1114
import java.util.concurrent.ConcurrentHashMap;
1215
import java.util.concurrent.Semaphore;
13-
import java.util.concurrent.atomic.AtomicLong;
14-
import java.util.concurrent.locks.ReadWriteLock;
15-
import java.util.concurrent.locks.ReentrantReadWriteLock;
1616

1717
/**
1818
* A coordination object that allows multiple distinct policies to be executed concurrently, but also makes sure that a single
@@ -23,103 +23,96 @@
2323
public class EnrichPolicyLocks {
2424

2525
/**
26-
* A snapshot in time detailing if any policy executions are in flight and total number of local executions that
27-
* have been kicked off since the node has started
26+
* An instance of a specific lock on a single policy object. Ensures that when unlocking a policy, the policy is only unlocked if this
27+
* object is the owner of the held lock. Additionally, this manages the lock lifecycle for any other resources tracked by the policy
28+
* coordination logic, such as a policy execution's target index.
2829
*/
29-
public static class EnrichPolicyExecutionState {
30-
final boolean anyPolicyInFlight;
31-
final long executions;
30+
public class EnrichPolicyLock implements Releasable {
31+
private final String policyName;
32+
private final String enrichIndexName;
33+
private final Semaphore executionLease;
3234

33-
EnrichPolicyExecutionState(boolean anyPolicyInFlight, long executions) {
34-
this.anyPolicyInFlight = anyPolicyInFlight;
35-
this.executions = executions;
35+
private EnrichPolicyLock(String policyName, String enrichIndexName, Semaphore executionLease) {
36+
this.policyName = policyName;
37+
this.enrichIndexName = enrichIndexName;
38+
this.executionLease = executionLease;
3639
}
3740

38-
public boolean isAnyPolicyInFlight() {
39-
return anyPolicyInFlight;
41+
/**
42+
* Unlocks this policy for execution and maintenance IFF this lock represents the currently held semaphore for a policy name. If
43+
* this lock was created for an execution, the target index for the policy execution is also cleared from the locked state.
44+
*/
45+
@Override
46+
public void close() {
47+
if (enrichIndexName != null) {
48+
boolean wasRemoved = workingIndices.remove(enrichIndexName, executionLease);
49+
assert wasRemoved
50+
: "Target index [" + enrichIndexName + "] for policy [" + policyName + "] was removed prior to policy unlock";
51+
}
52+
boolean wasRemoved = policyLocks.remove(policyName, executionLease);
53+
assert wasRemoved : "Second attempt was made to unlock policy [" + policyName + "]";
4054
}
4155
}
4256

43-
/**
44-
* A read-write lock that allows for policies to be executed concurrently with minimal overhead, but allows for blocking
45-
* policy locking operations while capturing the state of policy executions.
46-
*/
47-
private final ReadWriteLock currentStateLock = new ReentrantReadWriteLock(true);
48-
4957
/**
5058
* A mapping of policy name to a semaphore used for ensuring that a single policy can only have one execution in flight
5159
* at a time.
5260
*/
5361
private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>();
5462

5563
/**
56-
* A counter that is used as a sort of policy execution sequence id / dirty bit. This is incremented every time a policy
57-
* successfully acquires an execution lock.
64+
* When a policy is locked for execution, the name of the new index to be created is recorded in this map to keep it from being accidentally
65+
* cleaned up by the maintenance task.
5866
*/
59-
private final AtomicLong policyRunCounter = new AtomicLong(0L);
67+
private final ConcurrentHashMap<String, Semaphore> workingIndices = new ConcurrentHashMap<>();
6068

6169
/**
6270
* Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately
6371
* throw without waiting. This method does not wait for the lock: acquisition is a single non-blocking attempt.
72+
* <br/><br/>
73+
* If a policy is being executed, use {@link EnrichPolicyLocks#lockPolicy(String, String)} instead in order to properly track the
74+
* new enrich index that will be created.
6475
* @param policyName The policy name to lock for execution
6576
* @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions
6677
* has been reached
6778
*/
68-
public void lockPolicy(String policyName) {
69-
currentStateLock.readLock().lock();
70-
try {
71-
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
72-
boolean acquired = runLock.tryAcquire();
73-
if (acquired == false) {
74-
throw new EsRejectedExecutionException(
75-
"Could not obtain lock because policy execution for [" + policyName + "] is already in progress."
76-
);
77-
}
78-
policyRunCounter.incrementAndGet();
79-
} finally {
80-
currentStateLock.readLock().unlock();
81-
}
79+
public EnrichPolicyLock lockPolicy(String policyName) {
80+
return lockPolicy(policyName, null);
8281
}
8382

8483
/**
85-
* Captures a snapshot of the current policy execution state. This method never blocks, instead assuming that a policy is
86-
* currently starting its execution and returns an appropriate state.
87-
* @return The current state of in-flight policy executions
84+
* Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately
85+
* throw without waiting. This method does not wait for the lock: acquisition is a single non-blocking attempt.
86+
* <br/><br/>
87+
* If a policy needs to be locked just to ensure it is not executing, use {@link EnrichPolicyLocks#lockPolicy(String)} instead since
88+
* no new enrich indices need to be maintained.
89+
* @param policyName The policy name to lock for execution
90+
* @param enrichIndexName If the policy is being executed, this parameter denotes the index that should be protected from maintenance
91+
* operations.
92+
* @throws EsRejectedExecutionException if the policy is locked already. The limit on the number of concurrent policy executions
93+
* is not checked by this method; it is enforced separately by the policy executor.
8894
*/
89-
public EnrichPolicyExecutionState captureExecutionState() {
90-
if (currentStateLock.writeLock().tryLock()) {
91-
try {
92-
long revision = policyRunCounter.get();
93-
long currentPolicyExecutions = policyLocks.mappingCount();
94-
return new EnrichPolicyExecutionState(currentPolicyExecutions > 0L, revision);
95-
} finally {
96-
currentStateLock.writeLock().unlock();
97-
}
95+
public EnrichPolicyLock lockPolicy(String policyName, String enrichIndexName) {
96+
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
97+
boolean acquired = runLock.tryAcquire();
98+
if (acquired == false) {
99+
throw new EsRejectedExecutionException(
100+
"Could not obtain lock because policy execution for [" + policyName + "] is already in progress."
101+
);
102+
}
103+
if (enrichIndexName != null) {
104+
Semaphore previous = workingIndices.putIfAbsent(enrichIndexName, runLock);
105+
assert previous == null : "Target index [" + enrichIndexName + "] is already claimed by an execution, or was not cleaned up.";
98106
}
99-
return new EnrichPolicyExecutionState(true, policyRunCounter.get());
107+
return new EnrichPolicyLock(policyName, enrichIndexName, runLock);
100108
}
101109

102-
/**
103-
* Checks if the current execution state matches that of the given execution state. Used to ensure that over a period of time
104-
* no changes to the policy execution state have occurred.
105-
* @param previousState The previous state to check the current state against
106-
* @return true if the current state matches the given previous state, false if policy executions have changed over time.
107-
*/
108-
boolean isSameState(EnrichPolicyExecutionState previousState) {
109-
EnrichPolicyExecutionState currentState = captureExecutionState();
110-
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight && currentState.executions == previousState.executions;
110+
public Set<String> lockedPolices() {
111+
return new HashSet<>(policyLocks.keySet());
111112
}
112113

113-
/**
114-
* Releases the lock for a given policy name, allowing it to be executed.
115-
* @param policyName The policy to release.
116-
*/
117-
public void releasePolicy(String policyName) {
118-
currentStateLock.readLock().lock();
119-
try {
120-
policyLocks.remove(policyName);
121-
} finally {
122-
currentStateLock.readLock().unlock();
123-
}
114+
public Set<String> inflightPolicyIndices() {
115+
return new HashSet<>(workingIndices.keySet());
124116
}
117+
125118
}

0 commit comments

Comments
 (0)