Skip to content

Commit ed2ed8a

Browse files
committed
Rely on Sling Jobs Persisted Logs for asynchronous installation logging
Implement a polling mechanism for logs connected to the Sling Job ID. That should work reliably even if the Job is processed on another server. Leverage ids to deduplicate server-side events. This closes #815.
1 parent c9b45a8 commit ed2ed8a

File tree

6 files changed

+212
-92
lines changed

6 files changed

+212
-92
lines changed

accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package biz.netcentric.cq.tools.actool.api;
22

3+
import java.time.Duration;
4+
import java.util.Optional;
35
import java.util.function.BiConsumer;
46
import java.util.function.Consumer;
57

@@ -30,21 +32,54 @@ public interface AcInstallationService {
3032
* @throws IllegalStateException if another asynchronous installation is currently running
3133
* @return the job id
3234
* @since 3.6.0
33-
* @see #attachLogListener(String, BiConsumer, Consumer)
35+
* @see #pollLog(String, int, BiConsumer, Duration, Duration)
36+
* @deprecated use {@link #applyAsynchronously(InstallationOptions, boolean)} instead, this method is just a shortcut
37+
* to {@link #applyAsynchronously(InstallationOptions, boolean)} with last argument set to {@code false}.
3438
*/
35-
public String applyAsynchronously(InstallationOptions options);
39+
@Deprecated(since = "4.0.0", forRemoval = true)
40+
public default String applyAsynchronously(InstallationOptions options) {
41+
return applyAsynchronously(options, false);
42+
}
43+
44+
/**
45+
* Applies the configuration asynchronously.
46+
* Almost immediately returns a string with the ID of the started job.
47+
* Only one execution at a time is allowed.
48+
* @param options the installation options which further specify the installation
49+
* @param isVerboseLoggingEnabled if {@code true}, the log will contain more details, otherwise it will be more concise
50+
* @throws IllegalStateException if another asynchronous installation is currently running
51+
* @return the job id
52+
* @since 4.0.0
53+
* @see #pollLog(String, int, BiConsumer, Duration, Duration)
54+
*/
55+
public String applyAsynchronously(InstallationOptions options, boolean isVerboseLoggingEnabled);
3656

3757
/** Attaches the log listener callback to an installation triggered previously via {@link #applyAsynchronously(InstallationOptions)}.
3858
*
3959
* @param jobId the job id returned by {@link #applyAsynchronously(InstallationOptions)}
40-
* @param listener the listener to attach, receives the level and the message per each log line
60+
* @param listener the listener to attach, receives the level and the message per each log message
4161
* @param finishListener the listener to attach, receives a boolean status indicating success or failure once the installation was finished
4262
* @return {@code true} if the listeners were attached successfully (i.e. an installation with the given executionId was triggered before and is still ongoing), {@code false} otherwise
4363
* @since 3.6.0
4464
* @see #applyAsynchronously(InstallationOptions)
65+
* @deprecated use {@link #pollLog(String, int, BiConsumer, Duration, Duration)} instead, this one is no longer functional.
4566
*/
67+
@Deprecated(since = "4.0.0", forRemoval = true)
4668
public boolean attachLogListener(String jobId, BiConsumer<InstallationLogLevel, String> listener, Consumer<Boolean> finishListener);
4769

70+
/**
71+
* Polls the log of an asynchronous installation job with the given ID until the job finishes or the timeout is reached.
72+
* This method is blocking. It may be called multiple times in case a previous execution returned {@code false} due to a timeout or temporary error.
73+
* In that case make sure to set the {@code offset} parameter to the last known offset of the log messages.
74+
* @param jobId the job id returned by {@link #applyAsynchronously(InstallationOptions, boolean)}
75+
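* @param offset the offset from which to resume, typically the offset of the last log message received (that message is delivered again, use the offset passed to {@code logConsumer} to deduplicate), or {@code 0} to start from the beginning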
76+
* @param logConsumer is called for each log message (may contain new lines), the first parameter is the optional offset of the log message (for deduplication), the second parameter is the log message itself
77+
* @param timeOut the maximum time to wait for the job to finish, if the job does not finish within this time, the method returns {@code false}
78+
* @param pollInterval the interval between polls of the log, if the job is still running, the method will wait for this duration before polling again
79+
* @return {@code true} if the logs were either polled successfully or the given job id is no longer running or invalid, {@code false} if the job ran into a timeout or a temporary error
80+
*/
81+
public boolean pollLog(String jobId, int offset, BiConsumer<Optional<Integer>, String> logConsumer, Duration timeOut, Duration pollInterval);
82+
4883
/**
4984
* Checks if the asynchronous installation job with the given ID is running.
5085
* @param jobId

accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
@Version("3.2.0")
1+
@Version("3.3.0")
22
package biz.netcentric.cq.tools.actool.api;
33

44
/*-

accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java

Lines changed: 117 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,19 @@
1919

2020
import java.io.IOException;
2121
import java.io.UncheckedIOException;
22+
import java.time.Duration;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.Dictionary;
28+
import java.util.HashMap;
2729
import java.util.HashSet;
2830
import java.util.Iterator;
2931
import java.util.LinkedHashSet;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.Optional;
3235
import java.util.Set;
3336
import java.util.TreeMap;
3437
import java.util.function.BiConsumer;
@@ -56,6 +59,9 @@
5659
import org.apache.sling.event.jobs.Job;
5760
import org.apache.sling.event.jobs.JobManager;
5861
import org.apache.sling.event.jobs.consumer.JobConsumer;
62+
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
63+
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
64+
import org.apache.sling.event.jobs.consumer.JobExecutor;
5965
import org.apache.sling.jcr.api.SlingRepository;
6066
import org.osgi.framework.BundleContext;
6167
import org.osgi.framework.FrameworkUtil;
@@ -72,6 +78,7 @@
7278

7379
import biz.netcentric.cq.tools.actool.aceinstaller.AceBeanInstaller;
7480
import biz.netcentric.cq.tools.actool.api.AcInstallationService;
81+
import biz.netcentric.cq.tools.actool.api.HistoryEntry;
7582
import biz.netcentric.cq.tools.actool.api.InstallationLog;
7683
import biz.netcentric.cq.tools.actool.api.InstallationLogLevel;
7784
import biz.netcentric.cq.tools.actool.api.InstallationOptions;
@@ -101,15 +108,23 @@
101108

102109
@Component(property = JobConsumer.PROPERTY_TOPICS + "=" + AcInstallationServiceImpl.JOB_TOPIC)
103110
@Designate(ocd=Configuration.class)
104-
public class AcInstallationServiceImpl implements AcInstallationService, AcInstallationServiceInternal, JobConsumer {
111+
public class AcInstallationServiceImpl implements AcInstallationService, AcInstallationServiceInternal, JobExecutor {
105112
private static final Logger LOG = LoggerFactory.getLogger(AcInstallationServiceImpl.class);
106113

107114
private static final String CONFIG_PID = "biz.netcentric.cq.tools.actool.impl.AcInstallationServiceImpl";
108115
private static final String LEGACY_CONFIG_PID = "biz.netcentric.cq.tools.actool.aceservice.impl.AceServiceImpl";
109116
private static final String LEGACY_PROPERTY_CONFIGURATION_PATH = "AceService.configurationPath";
110117
protected static final String JOB_TOPIC = "biz/netcentric/cq/tools/actool/installation";
118+
protected static final String JOB_PROPERTY_IS_LOG_VERBOSE = "isLogVerbose";
111119

112-
private static final String JOB_PROPERTY_INSTALLATION_OPTIONS = "installationOptions";
120+
/**
121+
* Retrieving the job relies on a query under the hood but the query index is updated asynchronously
122+
* so we need to poll for the job until it is found or the timeout is reached.
123+
* AEM uses an <a href="https://jackrabbit.apache.org/oak/docs/query/indexing.html#nrt-indexing">{@code nrt} (near real-time) index</a> which should be updated within a few seconds.
124+
* Polling with an interval of 400ms for a maximum of 3 seconds should be sufficient.
125+
*/
126+
private static final long GET_JOB_TIMEOUT_MS = 3000;
127+
private static final long GET_JOB_POLLING_INTERVAL_MS = 400;
113128

114129
@Reference(policyOption = ReferencePolicyOption.GREEDY)
115130
AuthorizableInstallerService authorizableCreatorService;
@@ -150,8 +165,6 @@ public class AcInstallationServiceImpl implements AcInstallationService, AcInsta
150165
@Reference(policyOption = ReferencePolicyOption.GREEDY)
151166
JobManager jobManager;
152167

153-
private PersistableInstallationLogger asyncInstallLog;
154-
155168
private List<String> configurationRootPaths;
156169

157170
@ObjectClassDefinition(name = "AC Tool Installation Service",
@@ -192,14 +205,16 @@ public void activate(Configuration configuration, BundleContext bundleContext) t
192205
}
193206

194207
@Override
195-
public String applyAsynchronously(InstallationOptions options) {
208+
public String applyAsynchronously(InstallationOptions options, boolean isLogVerbose) {
196209
Job oldJob = jobManager.getJob(AcInstallationServiceImpl.JOB_TOPIC, null);
197210
if (oldJob != null) {
198211
throw new IllegalStateException("Another asynchronous installation is currently running");
199212
}
200-
asyncInstallLog = new PersistableInstallationLogger();
213+
Map<String, Object> jobProperties = new HashMap<>();
214+
jobProperties.put(JOB_PROPERTY_IS_LOG_VERBOSE, isLogVerbose);
215+
jobProperties.putAll(options.getPersistableProperties());
201216
Job job = jobManager.createJob(JOB_TOPIC)
202-
.properties(options.getPersistableProperties())
217+
.properties(jobProperties)
203218
.add();
204219
if (job == null) {
205220
throw new IllegalStateException("Could not schedule asynchronous installation, check the log for details!");
@@ -208,13 +223,20 @@ public String applyAsynchronously(InstallationOptions options) {
208223
}
209224

210225
@Override
211-
public JobResult process(Job job) {
226+
public JobExecutionResult process(Job job, JobExecutionContext context) {
212227
// cannot use deserialization due to https://issues.apache.org/jira/browse/SLING-12745
213228
Map<String, Object> jobProperties = job.getPropertyNames().stream().collect(Collectors.toMap(Function.identity(), job::getProperty));
214-
if (asyncInstallLog == null) {
215-
LOG.warn("Job {} was scheduled on another instance, no async install log available. Creating a new one.", job.getId());
216-
asyncInstallLog = new PersistableInstallationLogger();
217-
}
229+
PersistableInstallationLogger asyncInstallLog = new PersistableInstallationLogger();
230+
boolean isLogVerbose = Boolean.parseBoolean(jobProperties.getOrDefault(JOB_PROPERTY_IS_LOG_VERBOSE, "false").toString());
231+
asyncInstallLog.attachMessageListener((logLevel, message) -> {
232+
if (isLogVerbose || logLevel != InstallationLogLevel.TRACE) {
233+
context.log(logLevel.toString() + ": " + message);
234+
}
235+
});
236+
asyncInstallLog.attachFinishListener(isSuccess ->
237+
context.log("Installation finished with status: " + (isSuccess.equals(Boolean.TRUE) ? "SUCCESS" : "FAILURE"))
238+
);
239+
218240
InstallationOptionsBuilder optionsBuilder = new InstallationOptionsBuilder(jobProperties);
219241
InstallationOptions options = optionsBuilder.build();
220242
apply(options, asyncInstallLog);
@@ -223,34 +245,99 @@ public JobResult process(Job job) {
223245
} catch (IOException e) {
224246
throw new UncheckedIOException("Could not close installation log", e);
225247
}
226-
return asyncInstallLog.getErrors().isEmpty() ? JobResult.OK : JobResult.CANCEL;
248+
final JobExecutionResult result;
249+
if (asyncInstallLog.getErrors().isEmpty()) {
250+
result = context.result().succeeded();
251+
} else {
252+
result = context.result().message("Asynchronous installation completed with errors: " + asyncInstallLog.getErrors().stream()
253+
.map(HistoryEntry::getMessage).collect(Collectors.joining(","))).cancelled();
254+
}
255+
return result;
227256
}
228257

229-
230258
@Override
231259
public boolean attachLogListener(String jobId, BiConsumer<InstallationLogLevel, String> messageListener, Consumer<Boolean> finishListener) {
232-
Job job = jobManager.getJobById(jobId);
233-
if (!isRunning(jobId)) {
234-
// finished job or unknown job not to distinguish, as only failed jobs are kept in the history
235-
LOG.debug("No job found with id {}", jobId);
236-
return false;
260+
return false;
261+
}
262+
263+
@Override
264+
public boolean pollLog(String jobId, int offset, BiConsumer<Optional<Integer>, String> logConsumer, Duration timeOut, Duration pollInterval) {
265+
if (offset < 0) {
266+
offset = 0;
237267
}
238-
if (asyncInstallLog == null) {
239-
// no async install log, most probably job is running on another instance
240-
messageListener.accept(InstallationLogLevel.INFO, "No async install log available, but job found with id " + jobId + ". Job running on another instance with id \"" + job.getTargetInstance() + "\".");
241-
messageListener.accept(InstallationLogLevel.INFO, "Please check the logs of the other instance for details.");
242-
} else {
243-
asyncInstallLog.attachMessageListener(messageListener);
244-
asyncInstallLog.attachFinishListener(finishListener);
268+
long startTime = System.currentTimeMillis();
269+
while (System.currentTimeMillis() < startTime + timeOut.toMillis()) {
270+
Job job = getJob(jobId).orElse(null);
271+
if (job == null) {
272+
if (offset == 0) {
273+
logConsumer.accept(Optional.empty(), "Job " + jobId + " not found anymore, it is completed already and history is no longer available, check the persisted log");
274+
}
275+
LOG.debug("Job {} not found, it is probably completed already and history is no longer available, check the persisted log", jobId);
276+
return true;
277+
}
278+
offset = pollLog(job, offset, logConsumer);
279+
if (job.getJobState() != Job.JobState.ACTIVE && job.getJobState() != Job.JobState.QUEUED) {
280+
// consume remaining log lines
281+
offset = pollLog(job, offset, logConsumer);
282+
if (offset == 0) {
283+
logConsumer.accept(Optional.empty(), "Job " + jobId + " is not active, current state: " + job.getJobState());
284+
}
285+
LOG.debug("Job {} is not active or queued, current state: {}", jobId, job.getJobState());
286+
return true;
287+
}
288+
try {
289+
Thread.sleep(pollInterval.toMillis());
290+
} catch (InterruptedException e) {
291+
Thread.currentThread().interrupt();
292+
logConsumer.accept(Optional.empty(), "Polling for job's " + jobId + " log was interrupted");
293+
return false;
294+
}
245295
}
246-
return true;
296+
LOG.debug("Polling for job's {} log timed out after {} seconds", jobId, timeOut.getSeconds());
297+
return false;
298+
}
299+
300+
/** Retrieving the job relies on a query under the hood, but the query index is updated asynchronously,
301+
* so we need to poll for the job until it is found or the timeout is reached.
302+
* @param jobId
303+
* @return the job if it is found within the timeout, otherwise an empty Optional
304+
*/
305+
private Optional<Job> getJob(String jobId) {
306+
// now poll for the job, as it might be that the search index has not been updated yet to contain the job in its new location
307+
long startTime = System.currentTimeMillis();
308+
while (System.currentTimeMillis() < startTime + GET_JOB_TIMEOUT_MS) {
309+
Job job = jobManager.getJobById(jobId);
310+
if (job != null) {
311+
return Optional.of(job);
312+
} else {
313+
try {
314+
Thread.sleep(GET_JOB_POLLING_INTERVAL_MS);
315+
} catch (InterruptedException e) {
316+
Thread.currentThread().interrupt();
317+
LOG.warn("Polling for job's {} log was interrupted", jobId, e);
318+
return Optional.empty();
319+
}
320+
}
321+
}
322+
LOG.warn("Job with id {} not found", jobId);
323+
return Optional.empty();
324+
}
325+
326+
private int pollLog(Job job, int offset, BiConsumer<Optional<Integer>,String> logLineConsumer) {
327+
String[] logs = job.getProgressLog();
328+
if (logs != null && logs.length > offset) {
329+
for (;offset < logs.length; offset++) {
330+
logLineConsumer.accept(Optional.of(offset), logs[offset]);
331+
}
332+
}
333+
return offset;
247334
}
248335

249-
250336
@Override
251337
public boolean isRunning(String jobId) {
252-
Job job = jobManager.getJobById(jobId);
253-
return job != null;
338+
// checks if an active or queued job with the given id exists
339+
Job job = getJob(jobId).orElse(null);
340+
return job != null && (job.getJobState() == Job.JobState.ACTIVE || job.getJobState() == Job.JobState.QUEUED);
254341
}
255342

256343
@Override

0 commit comments

Comments
 (0)