Skip to content

Commit 71433e3

Browse files
Merge pull request #261 from wttech/abort-cluster-aware
Abort cluster aware
2 parents 613f74f + 3d2eebe commit 71433e3

File tree

11 files changed

+163
-103
lines changed

11 files changed

+163
-103
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,5 @@ aem/home/
148148
.env.*
149149

150150

151-
.aio/*
151+
.aio/*
152+
.aio

Taskfile.yml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,4 +331,26 @@ tasks:
331331
cmd: |
332332
set -e
333333
sh nw npm install
334-
sh nw npx playwright {{.CLI_ARGS | default "test"}}
334+
sh nw npx playwright {{.CLI_ARGS | default "test"}}
335+
336+
rde:setup:
337+
desc: setup access to RDE
338+
cmds:
339+
- command -v aio || npm install -g @adobe/aio-cli
340+
- aio plugins:install @adobe/aio-cli-plugin-aem-rde
341+
- aio login
342+
- echo -e "Connecting to RDE..."
343+
- aio aem:rde:setup
344+
345+
rde:deploy:all:
346+
desc: deploy AEM all package to RDE
347+
cmd: |
348+
package=$(find all/target/ -name "acm.all*.zip" -type f | head -n 1)
349+
if [ -z "$package" ]; then
350+
echo "Error: Could not find package starting with 'acm.all' in all/target"
351+
exit 1
352+
fi
353+
package_name=$(basename "$package")
354+
echo "Package name: $package_name"
355+
echo "Installing package on RDE..."
356+
aio aem:rde:install --type "content-package" "all/target/$package_name"

core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.vml.es.acm.core.repo.Repo;
55
import dev.vml.es.acm.core.util.ResolverUtils;
66
import groovy.lang.Binding;
7+
import java.util.function.Consumer;
78
import org.apache.commons.lang3.StringUtils;
89
import org.apache.sling.api.resource.ResourceResolverFactory;
910
import org.slf4j.Logger;
@@ -48,6 +49,8 @@ public static String varPath(String executionId) {
4849

4950
private final Conditions conditions;
5051

52+
private Consumer<ExecutionStatus> statusListener;
53+
5154
public ExecutionContext(
5255
String id,
5356
String userId,
@@ -155,7 +158,7 @@ public boolean isAborted() {
155158
return getCodeContext()
156159
.getOsgiContext()
157160
.getService(ExecutionQueue.class)
158-
.isAborted(getId());
161+
.isStoppingOrStopped(getId());
159162
}
160163

161164
public void abort() {
@@ -216,4 +219,14 @@ public void variable(String name, Object value) {
216219
public Object variable(String name) {
217220
return codeContext.getBinding().getVariable(name);
218221
}
222+
223+
void listenStatus(Consumer<ExecutionStatus> statusListener) {
224+
this.statusListener = statusListener;
225+
}
226+
227+
void notifyStatus(ExecutionStatus status) {
228+
if (statusListener != null) {
229+
statusListener.accept(status);
230+
}
231+
}
219232
}

core/src/main/java/dev/vml/es/acm/core/code/ExecutionJob.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public final class ExecutionJob {
1212

1313
public static final String EXECUTION_MODE_PROP = "executionMode";
1414

15+
public static final String ACTIVE_STATUS_PROP = "executionActiveStatus";
16+
1517
private ExecutionJob() {
1618
// intentionally empty
1719
}

core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java

Lines changed: 84 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55
import dev.vml.es.acm.core.event.EventListener;
66
import dev.vml.es.acm.core.event.EventType;
77
import dev.vml.es.acm.core.gui.SpaSettings;
8+
import dev.vml.es.acm.core.repo.Repo;
89
import dev.vml.es.acm.core.util.ExceptionUtils;
910
import dev.vml.es.acm.core.util.ResolverUtils;
1011
import dev.vml.es.acm.core.util.StreamUtils;
1112
import java.util.*;
1213
import java.util.concurrent.CancellationException;
13-
import java.util.concurrent.ConcurrentHashMap;
1414
import java.util.concurrent.ExecutorService;
1515
import java.util.concurrent.Executors;
1616
import java.util.concurrent.Future;
1717
import java.util.stream.Stream;
1818
import org.apache.commons.lang3.StringUtils;
19+
import org.apache.commons.lang3.reflect.FieldUtils;
1920
import org.apache.sling.api.resource.LoginException;
2021
import org.apache.sling.api.resource.ResourceResolver;
2122
import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -75,8 +76,6 @@ public class ExecutionQueue implements JobExecutor, EventListener {
7576

7677
private ExecutorService jobAsyncExecutor;
7778

78-
private final Map<String, Boolean> jobAborted = new ConcurrentHashMap<>();
79-
8079
private Config config;
8180

8281
@Activate
@@ -136,8 +135,13 @@ public Optional<Execution> findByExecutableId(String executableId) {
136135
.findFirst();
137136
}
138137

139-
public boolean isAborted(String executionId) {
140-
return Boolean.TRUE.equals(jobAborted.get(executionId));
138+
public boolean isStoppingOrStopped(String executionId) {
139+
Job job = readJob(executionId).orElse(null);
140+
if (job == null) {
141+
return false;
142+
}
143+
ExecutionStatus status = ExecutionStatus.of(job, executor);
144+
return status == ExecutionStatus.STOPPING || status == ExecutionStatus.STOPPED;
141145
}
142146

143147
public Stream<ExecutionSummary> findAllSummaries() {
@@ -198,7 +202,25 @@ private Optional<Job> readJob(String executionId) {
198202
}
199203

200204
public void stop(String executionId) {
201-
jobManager.stopJobById(executionId);
205+
Job job = readJob(executionId).orElse(null);
206+
if (job == null) {
207+
return;
208+
}
209+
setJobActiveStatus(job, ExecutionStatus.STOPPING);
210+
jobManager.stopJobById(job.getId());
211+
}
212+
213+
private void setJobActiveStatus(Job job, ExecutionStatus status) {
214+
try {
215+
String path = FieldUtils.readField(job, "path", true).toString();
216+
ResolverUtils.useContentResolver(resourceResolverFactory, null, resolver -> {
217+
Repo.quiet(resolver).get(path).save(ExecutionJob.ACTIVE_STATUS_PROP, status.name());
218+
});
219+
} catch (Exception e) {
220+
throw new AcmException(
221+
String.format("Cannot set execution '%s' job active status to '%s'!", job.getId(), status.name()),
222+
e);
223+
}
202224
}
203225

204226
private CodeOutput determineCodeOutput(String executionId) {
@@ -220,81 +242,71 @@ public JobExecutionResult process(Job job, JobExecutionContext context) {
220242
}
221243
});
222244

223-
try {
224-
Long abortStartTime = null;
225-
while (!future.isDone()) {
226-
if (context.isStopped()) {
227-
if (abortStartTime == null) {
228-
abortStartTime = System.currentTimeMillis();
229-
jobAborted.put(job.getId(), Boolean.TRUE);
230-
231-
if (config.abortTimeout() < 0) {
232-
LOG.debug("Execution is aborting gracefully '{}' (no timeout)", queuedExecution);
233-
} else {
234-
LOG.debug(
235-
"Execution is aborting '{}' (timeout: {}ms)",
236-
queuedExecution,
237-
config.abortTimeout());
238-
}
239-
} else if (config.abortTimeout() >= 0) {
240-
long abortDuration = System.currentTimeMillis() - abortStartTime;
241-
if (abortDuration >= config.abortTimeout()) {
242-
LOG.debug(
243-
"Execution abort timeout exceeded ({}ms), forcing abort '{}'",
244-
abortDuration,
245-
queuedExecution);
246-
future.cancel(true);
247-
break;
248-
}
245+
Long abortStartTime = null;
246+
while (!future.isDone()) {
247+
if (context.isStopped() || isStoppingOrStopped(job.getId())) {
248+
if (abortStartTime == null) {
249+
abortStartTime = System.currentTimeMillis();
250+
if (config.abortTimeout() < 0) {
251+
LOG.debug("Execution is aborting gracefully '{}' (no timeout)", queuedExecution);
252+
} else {
253+
LOG.debug("Execution is aborting '{}' (timeout: {}ms)", queuedExecution, config.abortTimeout());
254+
}
255+
} else if (config.abortTimeout() >= 0) {
256+
long abortDuration = System.currentTimeMillis() - abortStartTime;
257+
if (abortDuration >= config.abortTimeout()) {
258+
LOG.debug(
259+
"Execution abort timeout exceeded ({}ms), forcing abort '{}'",
260+
abortDuration,
261+
queuedExecution);
262+
future.cancel(true);
263+
break;
249264
}
250-
}
251-
252-
try {
253-
Thread.sleep(config.asyncPollInterval());
254-
} catch (InterruptedException e) {
255-
Thread.currentThread().interrupt();
256-
LOG.debug("Execution is interrupted '{}'", queuedExecution);
257-
return context.result().cancelled();
258265
}
259266
}
260267

261268
try {
262-
Execution immediateExecution = future.get();
263-
264-
if (immediateExecution.getStatus() == ExecutionStatus.SKIPPED) {
265-
LOG.debug("Execution skipped '{}'", immediateExecution);
266-
return context.result()
267-
.message(QueuedMessage.of(ExecutionStatus.SKIPPED, null)
268-
.toJson())
269-
.cancelled();
270-
} else {
271-
LOG.debug("Execution succeeded '{}'", immediateExecution);
272-
return context.result().succeeded();
273-
}
274-
} catch (CancellationException e) {
275-
LOG.debug("Execution aborted forcefully '{}'", queuedExecution);
269+
Thread.sleep(config.asyncPollInterval());
270+
} catch (InterruptedException e) {
271+
Thread.currentThread().interrupt();
272+
LOG.debug("Execution is interrupted '{}'", queuedExecution);
273+
return context.result().cancelled();
274+
}
275+
}
276+
277+
try {
278+
Execution immediateExecution = future.get();
279+
280+
if (immediateExecution.getStatus() == ExecutionStatus.SKIPPED) {
281+
LOG.debug("Execution skipped '{}'", immediateExecution);
276282
return context.result()
277-
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(e))
278-
.toJson())
283+
.message(QueuedMessage.of(ExecutionStatus.SKIPPED, null).toJson())
279284
.cancelled();
280-
} catch (Exception e) {
281-
AbortException abortException = findAbortException(e);
282-
if (abortException != null) {
283-
LOG.debug("Execution aborted gracefully '{}'", queuedExecution);
284-
return context.result()
285-
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(abortException))
286-
.toJson())
287-
.cancelled();
288-
}
289-
290-
LOG.debug("Execution failed '{}'", queuedExecution, e);
285+
} else {
286+
LOG.debug("Execution succeeded '{}'", immediateExecution);
287+
return context.result().succeeded();
288+
}
289+
} catch (CancellationException e) {
290+
LOG.debug("Execution aborted forcefully '{}'", queuedExecution);
291+
return context.result()
292+
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(e))
293+
.toJson())
294+
.cancelled();
295+
} catch (Exception e) {
296+
AbortException abortException = findAbortException(e);
297+
if (abortException != null) {
298+
LOG.debug("Execution aborted gracefully '{}'", queuedExecution);
291299
return context.result()
292-
.message(QueuedMessage.of(ExecutionStatus.FAILED, ExceptionUtils.toString(e))
300+
.message(QueuedMessage.of(ExecutionStatus.ABORTED, ExceptionUtils.toString(abortException))
293301
.toJson())
294-
.failed();
302+
.cancelled();
295303
}
296-
} finally {
297-
jobAborted.remove(job.getId());
304+
305+
LOG.debug("Execution failed '{}'", queuedExecution, e);
306+
return context.result()
307+
.message(QueuedMessage.of(ExecutionStatus.FAILED, ExceptionUtils.toString(e))
308+
.toJson())
309+
.failed();
298310
}
299311
}
300312

@@ -323,6 +335,7 @@ private Execution executeAsync(ExecutionContextOptions contextOptions, QueuedExe
323335
determineOutput(
324336
contextOptions.getExecutionMode(),
325337
execution.getJob().getId()))) {
338+
context.listenStatus(status -> setJobActiveStatus(execution.getJob(), status));
326339
return executor.execute(context);
327340
} catch (LoginException e) {
328341
throw new AcmException(String.format("Cannot access repository for execution '%s'", execution.getId()), e);

core/src/main/java/dev/vml/es/acm/core/code/ExecutionStatus.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,14 @@ public enum ExecutionStatus {
1414
PARSING,
1515
CHECKING,
1616
RUNNING,
17+
STOPPING,
1718
// queued & immediate execution statuses
1819
SKIPPED,
1920
STOPPED,
2021
ABORTED,
2122
FAILED,
2223
SUCCEEDED;
2324

24-
public boolean isPending() {
25-
return this == QUEUED || isActive();
26-
}
27-
28-
public boolean isActive() {
29-
return this == ACTIVE || this == PARSING || this == CHECKING || this == RUNNING;
30-
}
31-
3225
public static List<ExecutionStatus> manyOf(List<String> names) {
3326
return (names != null ? names.stream() : Stream.<String>empty())
3427
.map(ExecutionStatus::of)
@@ -44,19 +37,20 @@ public static Optional<ExecutionStatus> of(String status) {
4437
}
4538

4639
public static ExecutionStatus of(Job job, Executor executor) {
47-
ExecutionStatus jobStatus = Optional.ofNullable(job.getResultMessage())
40+
ExecutionStatus resultStatus = Optional.ofNullable(job.getResultMessage())
4841
.map(QueuedMessage::fromJson)
4942
.map(QueuedMessage::getStatus)
5043
.orElse(null);
51-
if (jobStatus != null) {
52-
return jobStatus;
44+
if (resultStatus != null) {
45+
return resultStatus;
5346
}
5447

5548
switch (job.getJobState()) {
5649
case QUEUED:
5750
return ExecutionStatus.QUEUED;
5851
case ACTIVE:
59-
return executor.checkStatus(job.getId()).orElse(ExecutionStatus.ACTIVE);
52+
return of(job.getProperty(ExecutionJob.ACTIVE_STATUS_PROP, String.class))
53+
.orElse(ExecutionStatus.ACTIVE);
6054
case STOPPED:
6155
return ExecutionStatus.STOPPED;
6256
case SUCCEEDED:

0 commit comments

Comments
 (0)