diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java index a5b1c4b4..50327a6b 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/AcInstallationService.java @@ -44,6 +44,15 @@ public interface AcInstallationService { */ public boolean attachLogListener(String jobId, BiConsumer listener, Consumer finishListener); + /** + * Checks if the asynchronous installation job with the given ID is running. + * @param jobId + * @return {@code true} if the job with the given ID is running, {@code false} otherwise + * @since 3.6.1 + */ + public boolean isRunning(String jobId); + + /** Applies the full configuration as stored at the path configured at PID biz.netcentric.cq.tools.actool.impl.AcInstallationServiceImpl * to the repository. * diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java index 8fa50ca4..baf36856 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/api/package-info.java @@ -1,4 +1,4 @@ -@Version("3.1.0") +@Version("3.2.0") package biz.netcentric.cq.tools.actool.api; /*- diff --git a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java index 46b1fa76..f45ead72 100644 --- a/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java +++ b/accesscontroltool-bundle/src/main/java/biz/netcentric/cq/tools/actool/impl/AcInstallationServiceImpl.java @@ -225,7 +225,7 @@ public JobResult process(Job job) { @Override public boolean attachLogListener(String jobId, BiConsumer messageListener, Consumer finishListener) { Job job = jobManager.getJobById(jobId); - if (job == null) { + if (!isRunning(jobId)) { // finished job or unknown job not to distinguish, as only failed jobs are kept in the history LOG.debug("No job found with id {}", jobId); return false; @@ -241,6 +241,13 @@ public boolean attachLogListener(String jobId, BiConsumer countryCodePerName; private final Configuration config; + /** + * Singleton message queue for the currently running asynchronous installation. + */ + private BlockingQueue messages; + @Activate public AcToolUiService(Configuration config) { this.config = config; @@ -289,6 +297,23 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse re } try { String jobId = acInstallationService.applyAsynchronously(builder.build()); + messages = new LinkedBlockingQueue<>(); + acInstallationService.attachLogListener(jobId, (level, message) -> { + try { + if (!reqParams.showLogVerbose && level == InstallationLogLevel.TRACE) { + return; + } + messages.put(level + ": " + message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, success -> { + try { + messages.put("FINISHED: " + success); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); // return full URL to the server-sent event stream for this installation resp.setContentType("text/plain"); String streamLogUrl = req.getRequestURI() + "/" + SUFFIX_STREAM_LOG + "?jobId=" + URLEncoder.encode(jobId, StandardCharsets.UTF_8.toString()) + "&" + PARAM_SHOW_LOG_VERBOSE + "=" + reqParams.showLogVerbose; @@ -299,6 +324,7 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse re return; } + } /** @@ -387,7 +413,9 @@ void downloadLog(HttpServletRequest req, final HttpServletResponse resp) throws } } - /** Server-sent events emitted for an asynchronous execution log + /** + * Server-sent events emitted for an asynchronous execution log. As the Fastly CDN used with AEM as a Cloud Service does not have streaming + * enabled, this is delivering the events in chunks. * @throws IOException * * @see Server-sent events @@ -401,35 +429,20 @@ private void streamLog(HttpServletRequest req, HttpServletResponse resp) throws resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); return; } - final boolean isVerbose = Boolean.parseBoolean(req.getParameter(AcToolUiService.PARAM_SHOW_LOG_VERBOSE)); - BlockingQueue messages = new LinkedBlockingQueue<>(); - boolean isActive = acInstallationService.attachLogListener(jobId, (level, message) -> { - try { - if (!isVerbose && level == InstallationLogLevel.TRACE) { - return; - } - messages.put(level + ": " + message); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }, success -> { - try { - messages.put("FINISHED: " + success); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - if (!isActive) { - LOG.debug("Haven't found job with id {}, probably already finished processing it", jobId); + // is it fully consumed and no longer running? + if (!acInstallationService.isRunning(jobId) && (messages == null || messages.isEmpty())) { + LOG.debug("Asynchronous installation with id {} is not running anymore and no messages are available", jobId); resp.setStatus(HttpServletResponse.SC_NO_CONTENT); return; } + long startTime = System.currentTimeMillis(); try { + boolean isActive = true; while (isActive) { String message = messages.poll(30, java.util.concurrent.TimeUnit.SECONDS); if (message == null) { - // just sent keep-alives - message = "."; + message = "..."; + isActive = false; } if (message.startsWith("FINISHED: ")) { isActive = false; @@ -438,6 +451,11 @@ private void streamLog(HttpServletRequest req, HttpServletResponse resp) throws resp.getWriter().println(); resp.getWriter().flush(); } + // overall runtime above threshold + if (System.currentTimeMillis() - startTime > config.maxLogStreamRequestRuntimeInMs()) { + LOG.debug("Reached maximum runtime of {} ms for log stream request, finishing response", config.maxLogStreamRequestRuntimeInMs()); + isActive = false; + } } } catch(InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/pom.xml b/pom.xml index 3efd1c37..8e4e410a 100644 --- a/pom.xml +++ b/pom.xml @@ -244,6 +244,12 @@ biz.aQute.bnd bnd-baseline-maven-plugin ${bnd.version} + + + + Bundle-Version + + biz.aQute.bnd