Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package biz.netcentric.cq.tools.actool.api;

import java.time.Duration;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand Down Expand Up @@ -30,21 +32,54 @@ public interface AcInstallationService {
* @throws IllegalStateException if another asynchronous installation is currently running
* @return the job id
* @since 3.6.0
* @see #attachLogListener(String, BiConsumer, Consumer)
* @see #pollLog(String, int, BiConsumer, Duration, Duration)
* @deprecated use {@link #applyAsynchronously(InstallationOptions, boolean)} instead, this method is just a shortcut
* to {@link #applyAsynchronously(InstallationOptions, boolean)} with last argument set to {@code false}.
*/
public String applyAsynchronously(InstallationOptions options);
@Deprecated(since = "4.0.0", forRemoval = true)
public default String applyAsynchronously(InstallationOptions options) {
return applyAsynchronously(options, false);
}

/**
* Applies the configuration asynchronously.
* Almost immediately returns a string with the ID of the started job.
* Only one execution at a time is allowed.
* @param options the installation options which further specify the installation
* @param isVerboseLoggingEnabled if {@code true}, the log will contain more details, otherwise it will be more concise
* @throws IllegalStateException if another asynchronous installation is currently running
* @return the job id
* @since 4.0.0
* @see #pollLog(String, int, BiConsumer, Duration, Duration)
*/
public String applyAsynchronously(InstallationOptions options, boolean isVerboseLoggingEnabled);

/** Attaches the log listener callback to an installation triggered previously via {@link #applyAsynchronously(InstallationOptions)}.
*
* @param jobId the job id returned by {@link #applyAsynchronously(InstallationOptions)}
* @param listener the listener to attach, receives the level and the message per each log line
* @param listener the listener to attach, receives the level and the message per each log message
* @param finishListener the listener to attach, receives a boolean status indicating success or failure once the installation was finished
* @return {@code true} if the listeners were attached successfully (i.e. an installation with the given executionId was triggered before and is still ongoing), {@code false} otherwise
* @since 3.6.0
* @see #applyAsynchronously(InstallationOptions)
* @deprecated use {@link #pollLog(String, int, BiConsumer, Duration, Duration)} instead, this one is no longer functional.
*/
@Deprecated(since = "4.0.0", forRemoval = true)
public boolean attachLogListener(String jobId, BiConsumer<InstallationLogLevel, String> listener, Consumer<Boolean> finishListener);

/**
* Polls the log of an asynchronous installation job with the given ID until the job finishes or the timeout is reached.
* This method is blocking. It may be called multiple times in case a previous execution returned {@code false} due to a timeout or temporary error.
* In that case make sure to set the {@code offset} parameter to the last known offset of the log messages.
* @param jobId the job id returned by {@link #applyAsynchronously(InstallationOptions)}
* @param offset the offset of the last log message received, or {@code 0} to start from the beginning
* @param logConsumer is called for each log message (may contain new lines), the first parameter is the optional offset of the log message (for deduplication), the second parameter is the log message itself
* @param timeOut the maximum time to wait for the job to finish, if the job does not finish within this time, the method returns {@code false}
* @param pollInterval the interval between polls of the log, if the job is still running, the method will wait for this duration before polling again
* @return {@code true} if the logs were either polled successfully or the given job id is no longer running or invalid, {@code false} if the job ran into a timeout or a temporary error
*/
public boolean pollLog(String jobId, int offset, BiConsumer<Optional<Integer>, String> logConsumer, Duration timeOut, Duration pollInterval);

/**
* Checks if the asynchronous installation job with the given ID is running.
* @param jobId
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@Version("3.2.0")
@Version("3.3.0")
package biz.netcentric.cq.tools.actool.api;

/*-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -56,6 +59,9 @@
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.apache.sling.jcr.api.SlingRepository;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
Expand All @@ -72,6 +78,7 @@

import biz.netcentric.cq.tools.actool.aceinstaller.AceBeanInstaller;
import biz.netcentric.cq.tools.actool.api.AcInstallationService;
import biz.netcentric.cq.tools.actool.api.HistoryEntry;
import biz.netcentric.cq.tools.actool.api.InstallationLog;
import biz.netcentric.cq.tools.actool.api.InstallationLogLevel;
import biz.netcentric.cq.tools.actool.api.InstallationOptions;
Expand Down Expand Up @@ -101,15 +108,23 @@

@Component(property = JobConsumer.PROPERTY_TOPICS + "=" + AcInstallationServiceImpl.JOB_TOPIC)
@Designate(ocd=Configuration.class)
public class AcInstallationServiceImpl implements AcInstallationService, AcInstallationServiceInternal, JobConsumer {
public class AcInstallationServiceImpl implements AcInstallationService, AcInstallationServiceInternal, JobExecutor {
private static final Logger LOG = LoggerFactory.getLogger(AcInstallationServiceImpl.class);

private static final String CONFIG_PID = "biz.netcentric.cq.tools.actool.impl.AcInstallationServiceImpl";
private static final String LEGACY_CONFIG_PID = "biz.netcentric.cq.tools.actool.aceservice.impl.AceServiceImpl";
private static final String LEGACY_PROPERTY_CONFIGURATION_PATH = "AceService.configurationPath";
protected static final String JOB_TOPIC = "biz/netcentric/cq/tools/actool/installation";
protected static final String JOB_PROPERTY_IS_LOG_VERBOSE = "isLogVerbose";

private static final String JOB_PROPERTY_INSTALLATION_OPTIONS = "installationOptions";
/**
* Retrieving the job relies on a query under the hood but the query index is updated asynchronously
* so we need to poll for the job until it is found or the timeout is reached.
* AEM uses an <a href="https://jackrabbit.apache.org/oak/docs/query/indexing.html#nrt-indexing">{@code npr} index</a> which should be updated after some seconds.
* Polling with an interval of 400ms for a maximum of 3 seconds should be sufficient.
*/
private static final long GET_JOB_TIMEOUT_MS = 3000;
private static final long GET_JOB_POLLING_INTERVAL_MS = 400;

@Reference(policyOption = ReferencePolicyOption.GREEDY)
AuthorizableInstallerService authorizableCreatorService;
Expand Down Expand Up @@ -150,8 +165,6 @@
@Reference(policyOption = ReferencePolicyOption.GREEDY)
JobManager jobManager;

private PersistableInstallationLogger asyncInstallLog;

private List<String> configurationRootPaths;

@ObjectClassDefinition(name = "AC Tool Installation Service",
Expand Down Expand Up @@ -192,14 +205,16 @@
}

@Override
public String applyAsynchronously(InstallationOptions options) {
public String applyAsynchronously(InstallationOptions options, boolean isLogVerbose) {
Job oldJob = jobManager.getJob(AcInstallationServiceImpl.JOB_TOPIC, null);
if (oldJob != null) {
throw new IllegalStateException("Another asynchronous installation is currently running");
}
asyncInstallLog = new PersistableInstallationLogger();
Map<String, Object> jobProperties = new HashMap<>();
jobProperties.put(JOB_PROPERTY_IS_LOG_VERBOSE, isLogVerbose);
jobProperties.putAll(options.getPersistableProperties());
Job job = jobManager.createJob(JOB_TOPIC)
.properties(options.getPersistableProperties())
.properties(jobProperties)
.add();
if (job == null) {
throw new IllegalStateException("Could not schedule asynchronous installation, check the log for details!");
Expand All @@ -208,13 +223,20 @@
}

@Override
public JobResult process(Job job) {
public JobExecutionResult process(Job job, JobExecutionContext context) {
// cannot use deserialization due to https://issues.apache.org/jira/browse/SLING-12745
Map<String, Object> jobProperties = job.getPropertyNames().stream().collect(Collectors.toMap(Function.identity(), job::getProperty));
if (asyncInstallLog == null) {
LOG.warn("Job {} was scheduled on another instance, no async install log available. Creating a new one.", job.getId());
asyncInstallLog = new PersistableInstallationLogger();
}
PersistableInstallationLogger asyncInstallLog = new PersistableInstallationLogger();
boolean isLogVerbose = Boolean.parseBoolean(jobProperties.getOrDefault(JOB_PROPERTY_IS_LOG_VERBOSE, "false").toString());
asyncInstallLog.attachMessageListener((logLevel, message) -> {
if (isLogVerbose || logLevel != InstallationLogLevel.TRACE) {
context.log(logLevel.toString() + ": " + message);
}
});
asyncInstallLog.attachFinishListener(isSuccess ->
context.log("Installation finished with status: " + (isSuccess.equals(Boolean.TRUE) ? "SUCCESS" : "FAILURE"))
);

InstallationOptionsBuilder optionsBuilder = new InstallationOptionsBuilder(jobProperties);
InstallationOptions options = optionsBuilder.build();
apply(options, asyncInstallLog);
Expand All @@ -223,34 +245,99 @@
} catch (IOException e) {
throw new UncheckedIOException("Could not close installation log", e);
}
return asyncInstallLog.getErrors().isEmpty() ? JobResult.OK : JobResult.CANCEL;
final JobExecutionResult result;
if (asyncInstallLog.getErrors().isEmpty()) {
result = context.result().succeeded();
} else {
result = context.result().message("Asynchronous installation completed with errors: " + asyncInstallLog.getErrors().stream()
.map(HistoryEntry::getMessage).collect(Collectors.joining(","))).cancelled();
}
return result;
}


@Override
public boolean attachLogListener(String jobId, BiConsumer<InstallationLogLevel, String> messageListener, Consumer<Boolean> finishListener) {
Job job = jobManager.getJobById(jobId);
if (!isRunning(jobId)) {
// finished job or unknown job not to distinguish, as only failed jobs are kept in the history
LOG.debug("No job found with id {}", jobId);
return false;
return false;
}

@Override
public boolean pollLog(String jobId, int offset, BiConsumer<Optional<Integer>, String> logConsumer, Duration timeOut, Duration pollInterval) {
if (offset < 0) {
offset = 0;
}
if (asyncInstallLog == null) {
// no async install log, most probably job is running on another instance
messageListener.accept(InstallationLogLevel.INFO, "No async install log available, but job found with id " + jobId + ". Job running on another instance with id \"" + job.getTargetInstance() + "\".");
messageListener.accept(InstallationLogLevel.INFO, "Please check the logs of the other instance for details.");
} else {
asyncInstallLog.attachMessageListener(messageListener);
asyncInstallLog.attachFinishListener(finishListener);
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() < startTime + timeOut.toMillis()) {
Job job = getJob(jobId).orElse(null);
if (job == null) {
if (offset == 0) {
logConsumer.accept(Optional.empty(), "Job " + jobId + " not found anymore, it is completed already and history is no longer available, check the persisted log");
}
LOG.debug("Job {} not found, it is probably completed already and history is no longer available, check the persisted log", jobId);
return true;
}
offset = pollLog(job, offset, logConsumer);
if (job.getJobState() != Job.JobState.ACTIVE && job.getJobState() != Job.JobState.QUEUED) {
// consume remaining log lines
offset = pollLog(job, offset, logConsumer);
if (offset == 0) {
logConsumer.accept(Optional.empty(), "Job " + jobId + " is not active, current state: " + job.getJobState());
}
LOG.debug("Job {} is not active or queued, current state: {}", jobId, job.getJobState());
return true;
}
try {
Thread.sleep(pollInterval.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logConsumer.accept(Optional.empty(), "Polling for job's " + jobId + " log was interrupted");
return false;
}
}
return true;
LOG.debug("Polling for job's {} log timed out after {} seconds", jobId, timeOut.getSeconds());
return false;
}

/** As retrieving the job relies on a query under the hood but the query index is updated asynchronously
* we need to poll for the job until it is found or the timeout is reached.
* @param jobId
* @return the job if it is found within the timeout, otherwise an empty Optional
*/
private Optional<Job> getJob(String jobId) {
// now poll for the job, as it might be that the search index has not been updated yet to contain the job in its new location
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() < startTime + GET_JOB_TIMEOUT_MS) {
Job job = jobManager.getJobById(jobId);
if (job != null) {
return Optional.of(job);
} else {
try {
Thread.sleep(GET_JOB_POLLING_INTERVAL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Polling for job's {} log was interrupted", jobId, e);
return Optional.empty();
}
}
}
LOG.warn("Job with id {} not found", jobId);
return Optional.empty();
}

private int pollLog(Job job, int offset, BiConsumer<Optional<Integer>,String> logLineConsumer) {
String[] logs = job.getProgressLog();
if (logs != null && logs.length > offset) {
for (;offset < logs.length; offset++) {
logLineConsumer.accept(Optional.of(offset), logs[offset]);
}
}
return offset;
}


@Override
public boolean isRunning(String jobId) {
Job job = jobManager.getJobById(jobId);
return job != null;
// checks if an active or queued job with the given id exists
Job job = getJob(jobId).orElse(null);
return job != null && (job.getJobState() == Job.JobState.ACTIVE || job.getJobState() == Job.JobState.QUEUED);
}

@Override
Expand Down
Loading
Loading