diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java index d498ef27..ff254462 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import java.util.logging.Level; import java.util.function.Function; import java.util.stream.Collectors; @@ -58,6 +59,8 @@ public class SecurityAnalysisWorkerService { private static final String CATEGORY_BROKER_OUTPUT = SecurityAnalysisWorkerService.class.getName() + ".output-broker-messages"; + private static final String CATEGORY_BROKER_INPUT = SecurityAnalysisWorkerService.class.getName() + ".input-broker-messages"; + private static final Logger OUTPUT_MESSAGE_LOGGER = LoggerFactory.getLogger(CATEGORY_BROKER_OUTPUT); private NetworkStoreService networkStoreService; @@ -117,18 +120,18 @@ private Mono getNetwork(UUID networkUuid, List otherNetworkUuids) return network; } else { Mono> otherNetworks = Flux.fromIterable(otherNetworkUuids) - .flatMap(this::getNetwork) - .collectList(); + .flatMap(this::getNetwork) + .collectList(); return Mono.zip(network, otherNetworks) - .map(t -> { - // creation of the merging view - List networks = new ArrayList<>(); - networks.add(t.getT1()); - networks.addAll(t.getT2()); - MergingView mergingView = MergingView.create("merge", "iidm"); - mergingView.merge(networks.toArray(new Network[0])); - return mergingView; - }); + .map(t -> { + // creation of the merging view + List networks = new ArrayList<>(); + networks.add(t.getT1()); + networks.addAll(t.getT2()); + MergingView mergingView = MergingView.create("merge", "iidm"); + mergingView.merge(networks.toArray(new Network[0])); + return mergingView; + }); } } @@ -139,12 +142,20 @@ public Mono run(SecurityAnalysisRunContext context) { private Mono run(SecurityAnalysisRunContext context, UUID resultUuid) { Objects.requireNonNull(context); - LOGGER.info("Run security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList())); + if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { + return Mono.empty(); + } - Mono network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids()); + Mono network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids()).map(uuid -> { + LOGGER.info("Loading network"); + return uuid; + }); Mono> contingencies = Flux.fromIterable(context.getContingencyListNames()) - .flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid())) + .flatMap(contingencyListName -> { + LOGGER.info("Loading contingencies"); + return actionsService.getContingencyList(contingencyListName, context.getNetworkUuid()); + }) .collectList(); return Mono.zip(network, contingencies) @@ -158,76 +169,81 @@ private Mono run(SecurityAnalysisRunContext context, UUI if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { return Mono.empty(); } else { + LOGGER.info("Starting security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList())); return Mono.fromCompletionStage(future); } }); } @Bean - public Consumer> consumeRun() { - return message -> { - SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); - runRequests.add(resultContext.getResultUuid()); - - run(resultContext.getRunContext(), resultContext.getResultUuid()) - .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) - .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) - .then(Mono.just(result))) - .doOnSuccess(result -> { - if (result != null) { // result available - Message sendMessage = MessageBuilder - .withPayload("") - .setHeader("resultUuid", resultContext.getResultUuid().toString()) - .setHeader("receiver", resultContext.getRunContext().getReceiver()) - .build(); - sendResultMessage(sendMessage); - LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); - } else { // result not available : stop computation request - if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { - stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); - } - } - }) - .onErrorResume(throwable -> { - if (!(throwable instanceof CancellationException)) { - LOGGER.error(FAIL_MESSAGE, throwable); - stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage()); - return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty()); - } - return Mono.empty(); - }) - .doFinally(s -> { - futures.remove(resultContext.getResultUuid()); - cancelComputationRequests.remove(resultContext.getResultUuid()); - runRequests.remove(resultContext.getResultUuid()); - }) - .subscribe(); - }; + public Consumer>> consumeRun() { + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) + .flatMap(message -> { + SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); + runRequests.add(resultContext.getResultUuid()); + + return run(resultContext.getRunContext(), resultContext.getResultUuid()) + .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) + .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) + .then(Mono.just(result))) + .doOnSuccess(result -> { + if (result != null) { // result available + Message sendMessage = MessageBuilder + .withPayload("") + .setHeader("resultUuid", resultContext.getResultUuid().toString()) + .setHeader("receiver", resultContext.getRunContext().getReceiver()) + .build(); + sendResultMessage(sendMessage); + LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); + } else { // result not available : stop computation request + if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { + stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); + LOGGER.info("Security analysis stopped (resultUuid='{}')", resultContext.getResultUuid()); + } + } + }) + .onErrorResume(throwable -> { + if (!(throwable instanceof CancellationException)) { + LOGGER.error(FAIL_MESSAGE, throwable); + stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage()); + return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty()); + } + return Mono.empty(); + }) + .doFinally(s -> { + futures.remove(resultContext.getResultUuid()); + cancelComputationRequests.remove(resultContext.getResultUuid()); + runRequests.remove(resultContext.getResultUuid()); + }); + }) + .subscribe(); } @Bean - public Consumer> consumeCancel() { - return message -> { - SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); + public Consumer>> consumeCancel() { + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) + .flatMap(message -> { + SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); - if (runRequests.contains(cancelContext.getResultUuid())) { - cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); - } + if (runRequests.contains(cancelContext.getResultUuid())) { + cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); + } - // find the completableFuture associated with result uuid - CompletableFuture future = futures.get(cancelContext.getResultUuid()); - if (future != null) { - future.cancel(true); // cancel computation in progress - - resultRepository.delete(cancelContext.getResultUuid()) - .doOnSuccess(unused -> { - stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver()); - LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid()); - }) - .doOnError(throwable -> LOGGER.error(throwable.toString(), throwable)) - .subscribe(); - } - }; + // find the completableFuture associated with result uuid + CompletableFuture future = futures.get(cancelContext.getResultUuid()); + if (future != null) { + future.cancel(true); // cancel computation in progress + + return resultRepository.delete(cancelContext.getResultUuid()) + .doOnSuccess(unused -> { + stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver()); + LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid()); + }); + } + return Mono.empty(); + }) + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeCancel", t)) + .subscribe(); } private void sendResultMessage(Message message) {