diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java index 4bb793e0..07ad7a19 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java @@ -1,5 +1,7 @@ package biz.netcentric.cq.tools.actool.api; +import java.time.Duration; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -30,21 +32,54 @@ public interface AcInstallationService { * @throws IllegalStateException if another asynchronous installation is currently running * @return the job id * @since 3.6.0 - * @see #attachLogListener(String, BiConsumer, Consumer) + * @see #pollLog(String, int, BiConsumer, Duration, Duration) + * @deprecated use {@link #applyAsynchronously(InstallationOptions, boolean)} instead, this method is just a shortcut + * to {@link #applyAsynchronously(InstallationOptions, boolean)} with last argument set to {@code false}. */ - public String applyAsynchronously(InstallationOptions options); + @Deprecated(since = "4.0.0", forRemoval = true) + public default String applyAsynchronously(InstallationOptions options) { + return applyAsynchronously(options, false); + } + + /** + * Applies the configuration asynchronously. + * Almost immediately returns a string with the ID of the started job. + * Only one execution at a time is allowed. + * @param options the installation options which further specify the installation + * @param isVerboseLoggingEnabled if {@code true}, the log will contain more details, otherwise it will be more concise + * @throws IllegalStateException if another asynchronous installation is currently running + * @return the job id + * @since 4.0.0 + * @see #pollLog(String, int, BiConsumer, Duration, Duration) + */ + public String applyAsynchronously(InstallationOptions options, boolean isVerboseLoggingEnabled); /** Attaches the log listener callback to an installation triggered previously via {@link #applyAsynchronously(InstallationOptions)}. * * @param jobId the job id returned by {@link #applyAsynchronously(InstallationOptions)} - * @param listener the listener to attach, receives the level and the message per each log line + * @param listener the listener to attach, receives the level and the message per each log message * @param finishListener the listener to attach, receives a boolean status indicating success or failure once the installation was finished * @return {@code true} if the listeners were attached successfully (i.e. an installation with the given executionId was triggered before and is still ongoing), {@code false} otherwise * @since 3.6.0 * @see #applyAsynchronously(InstallationOptions) + * @deprecated use {@link #pollLog(String, int, BiConsumer, Duration, Duration)} instead, this one is no longer functional. */ + @Deprecated(since = "4.0.0", forRemoval = true) public boolean attachLogListener(String jobId, BiConsumer listener, Consumer finishListener); + /** + * Polls the log of an asynchronous installation job with the given ID until the job finishes or the timeout is reached. + * This method is blocking. It may be called multiple times in case a previous execution returned {@code false} due to a timeout or temporary error. + * In that case make sure to set the {@code offset} parameter to the last known offset of the log messages. + * @param jobId the job id returned by {@link #applyAsynchronously(InstallationOptions)} + * @param offset the offset of the last log message received, or {@code 0} to start from the beginning + * @param logConsumer is called for each log message (may contain new lines), the first parameter is the optional offset of the log message (for deduplication), the second parameter is the log message itself + * @param timeOut the maximum time to wait for the job to finish, if the job does not finish within this time, the method returns {@code false} + * @param pollInterval the interval between polls of the log, if the job is still running, the method will wait for this duration before polling again + * @return {@code true} if the logs were either polled successfully or the given job id is no longer running or invalid, {@code false} if the job ran into a timeout or a temporary error + */ + public boolean pollLog(String jobId, int offset, BiConsumer, String> logConsumer, Duration timeOut, Duration pollInterval); + /** * Checks if the asynchronous installation job with the given ID is running. * @param jobId diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java index 73ed870d..b8cc3f91 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java @@ -1,4 +1,4 @@ -@Version("3.2.0") +@Version("3.3.0") package biz.netcentric.cq.tools.actool.api; /*- diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java index 546b32f2..1448187e 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java @@ -19,16 +19,19 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Dictionary; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.function.BiConsumer; @@ -56,6 +59,9 @@ import org.apache.sling.event.jobs.Job; import org.apache.sling.event.jobs.JobManager; import org.apache.sling.event.jobs.consumer.JobConsumer; +import org.apache.sling.event.jobs.consumer.JobExecutionContext; +import org.apache.sling.event.jobs.consumer.JobExecutionResult; +import org.apache.sling.event.jobs.consumer.JobExecutor; import org.apache.sling.jcr.api.SlingRepository; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; @@ -72,6 +78,7 @@ import biz.netcentric.cq.tools.actool.aceinstaller.AceBeanInstaller; import biz.netcentric.cq.tools.actool.api.AcInstallationService; +import biz.netcentric.cq.tools.actool.api.HistoryEntry; import biz.netcentric.cq.tools.actool.api.InstallationLog; import biz.netcentric.cq.tools.actool.api.InstallationLogLevel; import biz.netcentric.cq.tools.actool.api.InstallationOptions; @@ -101,15 +108,23 @@ @Component(property = JobConsumer.PROPERTY_TOPICS + "=" + AcInstallationServiceImpl.JOB_TOPIC) @Designate(ocd=Configuration.class) -public class AcInstallationServiceImpl implements AcInstallationService, AcInstallationServiceInternal, JobConsumer { +public class AcInstallationServiceImpl implements AcInstallationService, AcInstallationServiceInternal, JobExecutor { private static final Logger LOG = LoggerFactory.getLogger(AcInstallationServiceImpl.class); private static final String CONFIG_PID = "biz.netcentric.cq.tools.actool.impl.AcInstallationServiceImpl"; private static final String LEGACY_CONFIG_PID = "biz.netcentric.cq.tools.actool.aceservice.impl.AceServiceImpl"; private static final String LEGACY_PROPERTY_CONFIGURATION_PATH = "AceService.configurationPath"; protected static final String JOB_TOPIC = "biz/netcentric/cq/tools/actool/installation"; + protected static final String JOB_PROPERTY_IS_LOG_VERBOSE = "isLogVerbose"; - private static final String JOB_PROPERTY_INSTALLATION_OPTIONS = "installationOptions"; + /** + * Retrieving the job relies on a query under the hood but the query index is updated asynchronously + * so we need to poll for the job until it is found or the timeout is reached. + * AEM uses an {@code npr} index which should be updated after some seconds. + * Polling with an interval of 400ms for a maximum of 3 seconds should be sufficient. + */ + private static final long GET_JOB_TIMEOUT_MS = 3000; + private static final long GET_JOB_POLLING_INTERVAL_MS = 400; @Reference(policyOption = ReferencePolicyOption.GREEDY) AuthorizableInstallerService authorizableCreatorService; @@ -150,8 +165,6 @@ public class AcInstallationServiceImpl implements AcInstallationService, AcInsta @Reference(policyOption = ReferencePolicyOption.GREEDY) JobManager jobManager; - private PersistableInstallationLogger asyncInstallLog; - private List configurationRootPaths; @ObjectClassDefinition(name = "AC Tool Installation Service", @@ -192,14 +205,16 @@ public void activate(Configuration configuration, BundleContext bundleContext) t } @Override - public String applyAsynchronously(InstallationOptions options) { + public String applyAsynchronously(InstallationOptions options, boolean isLogVerbose) { Job oldJob = jobManager.getJob(AcInstallationServiceImpl.JOB_TOPIC, null); if (oldJob != null) { throw new IllegalStateException("Another asynchronous installation is currently running"); } - asyncInstallLog = new PersistableInstallationLogger(); + Map jobProperties = new HashMap<>(); + jobProperties.put(JOB_PROPERTY_IS_LOG_VERBOSE, isLogVerbose); + jobProperties.putAll(options.getPersistableProperties()); Job job = jobManager.createJob(JOB_TOPIC) - .properties(options.getPersistableProperties()) + .properties(jobProperties) .add(); if (job == null) { throw new IllegalStateException("Could not schedule asynchronous installation, check the log for details!"); @@ -208,13 +223,20 @@ public String applyAsynchronously(InstallationOptions options) { } @Override - public JobResult process(Job job) { + public JobExecutionResult process(Job job, JobExecutionContext context) { // cannot use deserialization due to https://issues.apache.org/jira/browse/SLING-12745 Map jobProperties = job.getPropertyNames().stream().collect(Collectors.toMap(Function.identity(), job::getProperty)); - if (asyncInstallLog == null) { - LOG.warn("Job {} was scheduled on another instance, no async install log available. Creating a new one.", job.getId()); - asyncInstallLog = new PersistableInstallationLogger(); - } + PersistableInstallationLogger asyncInstallLog = new PersistableInstallationLogger(); + boolean isLogVerbose = Boolean.parseBoolean(jobProperties.getOrDefault(JOB_PROPERTY_IS_LOG_VERBOSE, "false").toString()); + asyncInstallLog.attachMessageListener((logLevel, message) -> { + if (isLogVerbose || logLevel != InstallationLogLevel.TRACE) { + context.log(logLevel.toString() + ": " + message); + } + }); + asyncInstallLog.attachFinishListener(isSuccess -> + context.log("Installation finished with status: " + (isSuccess.equals(Boolean.TRUE) ? "SUCCESS" : "FAILURE")) + ); + InstallationOptionsBuilder optionsBuilder = new InstallationOptionsBuilder(jobProperties); InstallationOptions options = optionsBuilder.build(); apply(options, asyncInstallLog); @@ -223,34 +245,99 @@ public JobResult process(Job job) { } catch (IOException e) { throw new UncheckedIOException("Could not close installation log", e); } - return asyncInstallLog.getErrors().isEmpty() ? JobResult.OK : JobResult.CANCEL; + final JobExecutionResult result; + if (asyncInstallLog.getErrors().isEmpty()) { + result = context.result().succeeded(); + } else { + result = context.result().message("Asynchronous installation completed with errors: " + asyncInstallLog.getErrors().stream() + .map(HistoryEntry::getMessage).collect(Collectors.joining(","))).cancelled(); + } + return result; } - @Override public boolean attachLogListener(String jobId, BiConsumer messageListener, Consumer finishListener) { - Job job = jobManager.getJobById(jobId); - if (!isRunning(jobId)) { - // finished job or unknown job not to distinguish, as only failed jobs are kept in the history - LOG.debug("No job found with id {}", jobId); - return false; + return false; + } + + @Override + public boolean pollLog(String jobId, int offset, BiConsumer, String> logConsumer, Duration timeOut, Duration pollInterval) { + if (offset < 0) { + offset = 0; } - if (asyncInstallLog == null) { - // no async install log, most probably job is running on another instance - messageListener.accept(InstallationLogLevel.INFO, "No async install log available, but job found with id " + jobId + ". Job running on another instance with id \"" + job.getTargetInstance() + "\"."); - messageListener.accept(InstallationLogLevel.INFO, "Please check the logs of the other instance for details."); - } else { - asyncInstallLog.attachMessageListener(messageListener); - asyncInstallLog.attachFinishListener(finishListener); + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() < startTime + timeOut.toMillis()) { + Job job = getJob(jobId).orElse(null); + if (job == null) { + if (offset == 0) { + logConsumer.accept(Optional.empty(), "Job " + jobId + " not found anymore, it is completed already and history is no longer available, check the persisted log"); + } + LOG.debug("Job {} not found, it is probably completed already and history is no longer available, check the persisted log", jobId); + return true; + } + offset = pollLog(job, offset, logConsumer); + if (job.getJobState() != Job.JobState.ACTIVE && job.getJobState() != Job.JobState.QUEUED) { + // consume remaining log lines + offset = pollLog(job, offset, logConsumer); + if (offset == 0) { + logConsumer.accept(Optional.empty(), "Job " + jobId + " is not active, current state: " + job.getJobState()); + } + LOG.debug("Job {} is not active or queued, current state: {}", jobId, job.getJobState()); + return true; + } + try { + Thread.sleep(pollInterval.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logConsumer.accept(Optional.empty(), "Polling for job's " + jobId + " log was interrupted"); + return false; + } } - return true; + LOG.debug("Polling for job's {} log timed out after {} seconds", jobId, timeOut.getSeconds()); + return false; + } + + /** As retrieving the job relies on a query under the hood but the query index is updated asynchronously + * we need to poll for the job until it is found or the timeout is reached. + * @param jobId + * @return the job if it is found within the timeout, otherwise an empty Optional + */ + private Optional getJob(String jobId) { + // now poll for the job, as it might be that the search index has not been updated yet to contain the job in its new location + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() < startTime + GET_JOB_TIMEOUT_MS) { + Job job = jobManager.getJobById(jobId); + if (job != null) { + return Optional.of(job); + } else { + try { + Thread.sleep(GET_JOB_POLLING_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Polling for job's {} log was interrupted", jobId, e); + return Optional.empty(); + } + } + } + LOG.warn("Job with id {} not found", jobId); + return Optional.empty(); + } + + private int pollLog(Job job, int offset, BiConsumer,String> logLineConsumer) { + String[] logs = job.getProgressLog(); + if (logs != null && logs.length > offset) { + for (;offset < logs.length; offset++) { + logLineConsumer.accept(Optional.of(offset), logs[offset]); + } + } + return offset; } - @Override public boolean isRunning(String jobId) { - Job job = jobManager.getJobById(jobId); - return job != null; + // checks if an active or queued job with the given id exists + Job job = getJob(jobId).orElse(null); + return job != null && (job.getJobState() == Job.JobState.ACTIVE || job.getJobState() == Job.JobState.QUEUED); } @Override diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/ui/AcToolUiService.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/ui/AcToolUiService.java index 0e210a49..1a5f11a4 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/ui/AcToolUiService.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/ui/AcToolUiService.java @@ -16,10 +16,12 @@ import static org.apache.commons.lang3.StringEscapeUtils.escapeHtml4; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; +import java.io.StringReader; import java.io.UncheckedIOException; import java.net.URL; import java.net.URLConnection; @@ -35,8 +37,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -64,7 +65,6 @@ import org.slf4j.LoggerFactory; import biz.netcentric.cq.tools.actool.api.AcInstallationService; -import biz.netcentric.cq.tools.actool.api.InstallationLogLevel; import biz.netcentric.cq.tools.actool.api.InstallationOptionsBuilder; import biz.netcentric.cq.tools.actool.dumpservice.ConfigDumpService; import biz.netcentric.cq.tools.actool.helper.UncheckedRepositoryException; @@ -133,11 +133,6 @@ public class AcToolUiService { private final Configuration config; - /** - * Singleton message queue for the currently running asynchronous installation. - */ - private BlockingQueue messages; - @Activate public AcToolUiService(Configuration config) { this.config = config; @@ -297,35 +292,15 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse re builder.updateExistingExternalGroups(); } try { - String jobId = acInstallationService.applyAsynchronously(builder.build()); - messages = new LinkedBlockingQueue<>(); - acInstallationService.attachLogListener(jobId, (level, message) -> { - try { - if (!reqParams.showLogVerbose && level == InstallationLogLevel.TRACE) { - return; - } - messages.put(level + ": " + message); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }, success -> { - try { - messages.put("FINISHED: " + success); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); + String jobId = acInstallationService.applyAsynchronously(builder.build(), reqParams.showLogVerbose); // return full URL to the server-sent event stream for this installation resp.setContentType("text/plain"); - String streamLogUrl = req.getRequestURI() + "/" + SUFFIX_STREAM_LOG + "?jobId=" + URLEncoder.encode(jobId, StandardCharsets.UTF_8.toString()) + "&" + PARAM_SHOW_LOG_VERBOSE + "=" + reqParams.showLogVerbose; + String streamLogUrl = req.getRequestURI() + "/" + SUFFIX_STREAM_LOG + "?jobId=" + URLEncoder.encode(jobId, StandardCharsets.UTF_8.toString()); resp.getWriter().print(streamLogUrl); } catch (IllegalStateException e) { LOG.warn("Could not apply configuration: {}", e.getMessage()); resp.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); - return; } - - } /** @@ -431,36 +406,48 @@ private void streamLog(HttpServletRequest req, HttpServletResponse resp) throws resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); return; } - // is it fully consumed and no longer running? - if (!acInstallationService.isRunning(jobId) && (messages == null || messages.isEmpty())) { - LOG.debug("Asynchronous installation with id {} is not running anymore and no messages are available", jobId); - resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + if (jobId.chars().anyMatch(Character::isISOControl)) { + LOG.warn("Invalid jobId containing control characters provided"); + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); return; } - long startTime = System.currentTimeMillis(); - try { - boolean isActive = true; - while (isActive) { - String message = messages.poll(30, java.util.concurrent.TimeUnit.SECONDS); - if (message == null) { - message = "..."; - isActive = false; - } - if (message.startsWith("FINISHED: ")) { - isActive = false; - } else { - resp.getWriter().println("data: " + message); - resp.getWriter().println(); - resp.getWriter().flush(); + int offset = 0; + String lastEventId = req.getHeader("Last-Event-ID"); + if (lastEventId != null) { + // this is the last event id sent by the client, we can use it to skip already sent log messages + try { + offset = Integer.parseInt(lastEventId) + 1; // +1 because the last event id is the last sent message, we want to start with the next one + LOG.debug("Resuming log stream for job {} at offset {}", jobId, offset); + } catch (NumberFormatException e) { + LOG.warn("Invalid Last-Event-ID header: {}", lastEventId, e); + } + } + AtomicBoolean isEmpty = new AtomicBoolean(true); + if (acInstallationService.pollLog(jobId, offset, (optionalOffset, message) -> { + isEmpty.set(false); + // this is called for each log message, which may contain multiple lines + try (BufferedReader reader = new BufferedReader(new StringReader(message))) { + String line; + while ((line = reader.readLine()) != null) { + // write each line as a separate event + resp.getWriter().println("data: " + line); } - // overall runtime above threshold - if (System.currentTimeMillis() - startTime > config.maxLogStreamRequestRuntimeInMs()) { - LOG.debug("Reached maximum runtime of {} ms for log stream request, finishing response", config.maxLogStreamRequestRuntimeInMs()); - isActive = false; + if (optionalOffset.isPresent()) { + // if the offset is present, we can use it as the event id + resp.getWriter().println("id: " + optionalOffset.get()); } + resp.getWriter().println(); + resp.getWriter().flush(); + } catch (IOException e) { + LOG.error("Error writing log message to response", e); + } + }, java.time.Duration.ofSeconds(30), java.time.Duration.ofMillis(500))) { + // make sure to also emit a special message to indicate the end of the stream + resp.getWriter().println("data: END"); + resp.getWriter().println(); + if (isEmpty.get()) { + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); } - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); } } diff --git a/accesscontroltool-bundle/src/main/resources/res/actooluiservice.js b/accesscontroltool-bundle/src/main/resources/res/actooluiservice.js index cfb80aa3..103e95c8 100644 --- a/accesscontroltool-bundle/src/main/resources/res/actooluiservice.js +++ b/accesscontroltool-bundle/src/main/resources/res/actooluiservice.js @@ -38,7 +38,13 @@ function populateViaEventSource(url) { withCredentials: true, }); evtSource.onmessage = (event) => { - printMessage(event.data); + if (event.data === 'END') { + document.dispatchEvent(new Event(EVENT_NAME_INSTALLATION_DONE)); + evtSource.close(); + return; + } else { + printMessage(event.data); + } }; evtSource.onerror = () => { // no reasonable status exposed here, just assume this was a 404 -> regular status code when installation finished diff --git a/accesscontroltool-bundle/src/test/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImplTest.java b/accesscontroltool-bundle/src/test/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImplTest.java index baf2aa2a..b4ccc136 100644 --- a/accesscontroltool-bundle/src/test/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImplTest.java +++ b/accesscontroltool-bundle/src/test/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImplTest.java @@ -30,9 +30,11 @@ import org.apache.jackrabbit.api.security.user.Authorizable; import org.apache.jackrabbit.api.security.user.Group; import org.apache.jackrabbit.api.security.user.User; +import org.apache.sling.event.impl.jobs.queues.ResultBuilderImpl; import org.apache.sling.event.jobs.Job; import org.apache.sling.event.jobs.JobBuilder; import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.consumer.JobExecutionContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -140,7 +142,7 @@ public Iterator answer(InvocationOnMock invocation) throws Throwable { return groups.iterator(); } } - + @Test void testAsynchronousInstallationInDistributedSetups() { JobManager jobManager = Mockito.mock(JobManager.class); @@ -169,10 +171,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable { }); acInstallationServiceImpl.jobManager = jobManager; + JobExecutionContext jobContext = Mockito.mock(JobExecutionContext.class); + when(jobContext.result()).thenReturn(new ResultBuilderImpl()); + // schedule in one server assertEquals("id", acInstallationServiceImpl.applyAsynchronously(new InstallationOptionsBuilder().withConfigurationRootPath("/apps").build())); // and process in another server (with another instance of the service) AcInstallationServiceImpl acInstallationServiceImpl2 = new AcInstallationServiceImpl(); - acInstallationServiceImpl2.process(job); + acInstallationServiceImpl2.process(job, jobContext); } }