Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -58,6 +59,8 @@ public class SecurityAnalysisWorkerService {

private static final String CATEGORY_BROKER_OUTPUT = SecurityAnalysisWorkerService.class.getName() + ".output-broker-messages";

private static final String CATEGORY_BROKER_INPUT = SecurityAnalysisWorkerService.class.getName() + ".input-broker-messages";

private static final Logger OUTPUT_MESSAGE_LOGGER = LoggerFactory.getLogger(CATEGORY_BROKER_OUTPUT);

private NetworkStoreService networkStoreService;
Expand Down Expand Up @@ -117,18 +120,18 @@ private Mono<Network> getNetwork(UUID networkUuid, List<UUID> otherNetworkUuids)
return network;
} else {
Mono<List<Network>> otherNetworks = Flux.fromIterable(otherNetworkUuids)
.flatMap(this::getNetwork)
.collectList();
.flatMap(this::getNetwork)
.collectList();
return Mono.zip(network, otherNetworks)
.map(t -> {
// creation of the merging view
List<Network> networks = new ArrayList<>();
networks.add(t.getT1());
networks.addAll(t.getT2());
MergingView mergingView = MergingView.create("merge", "iidm");
mergingView.merge(networks.toArray(new Network[0]));
return mergingView;
});
.map(t -> {
// creation of the merging view
List<Network> networks = new ArrayList<>();
networks.add(t.getT1());
networks.addAll(t.getT2());
MergingView mergingView = MergingView.create("merge", "iidm");
mergingView.merge(networks.toArray(new Network[0]));
return mergingView;
});
}
}

Expand All @@ -139,12 +142,20 @@ public Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context) {
private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUID resultUuid) {
Objects.requireNonNull(context);

LOGGER.info("Run security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList()));
if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
return Mono.empty();
}

Mono<Network> network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids());
Mono<Network> network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids()).map(uuid -> {
LOGGER.info("Loading network");
return uuid;
});

Mono<List<Contingency>> contingencies = Flux.fromIterable(context.getContingencyListNames())
.flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid()))
.flatMap(contingencyListName -> {
LOGGER.info("Loading contingencies");
return actionsService.getContingencyList(contingencyListName, context.getNetworkUuid());
})
.collectList();

return Mono.zip(network, contingencies)
Expand All @@ -158,76 +169,81 @@ private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUI
if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
return Mono.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a log to all the short circuit returns because without it, the log will look like this:

Run security analysis on contingency lists: ...
loading resource network
loading resource ...
cancelling 
... tons of network-store-client logs because the operation continues...
...and nothing here to remind us that we didn't run the security analysis

} else {
LOGGER.info("Starting security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList()));
return Mono.fromCompletionStage(future);
}
});
}

@Bean
public Consumer<Message<String>> consumeRun() {
return message -> {
SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper);
runRequests.add(resultContext.getResultUuid());

run(resultContext.getRunContext(), resultContext.getResultUuid())
.flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result)
.then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name()))
.then(Mono.just(result)))
.doOnSuccess(result -> {
if (result != null) { // result available
Message<String> sendMessage = MessageBuilder
.withPayload("")
.setHeader("resultUuid", resultContext.getResultUuid().toString())
.setHeader("receiver", resultContext.getRunContext().getReceiver())
.build();
sendResultMessage(sendMessage);
LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid());
} else { // result not available : stop computation request
if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) {
stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver());
}
}
})
.onErrorResume(throwable -> {
if (!(throwable instanceof CancellationException)) {
LOGGER.error(FAIL_MESSAGE, throwable);
stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage());
return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty());
}
return Mono.empty();
})
.doFinally(s -> {
futures.remove(resultContext.getResultUuid());
cancelComputationRequests.remove(resultContext.getResultUuid());
runRequests.remove(resultContext.getResultUuid());
})
.subscribe();
};
public Consumer<Flux<Message<String>>> consumeRun() {
return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE)
.flatMap(message -> {
SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper);
runRequests.add(resultContext.getResultUuid());

return run(resultContext.getRunContext(), resultContext.getResultUuid())
.flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result)
.then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name()))
.then(Mono.just(result)))
.doOnSuccess(result -> {
if (result != null) { // result available
Message<String> sendMessage = MessageBuilder
.withPayload("")
.setHeader("resultUuid", resultContext.getResultUuid().toString())
.setHeader("receiver", resultContext.getRunContext().getReceiver())
.build();
sendResultMessage(sendMessage);
LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid());
} else { // result not available : stop computation request
if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) {
stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver());
LOGGER.info("Security analysis stopped (resultUuid='{}')", resultContext.getResultUuid());
}
}
})
.onErrorResume(throwable -> {
if (!(throwable instanceof CancellationException)) {
LOGGER.error(FAIL_MESSAGE, throwable);
stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage());
return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty());
}
return Mono.empty();
})
.doFinally(s -> {
futures.remove(resultContext.getResultUuid());
cancelComputationRequests.remove(resultContext.getResultUuid());
runRequests.remove(resultContext.getResultUuid());
});
})
.subscribe();
}

@Bean
public Consumer<Message<String>> consumeCancel() {
return message -> {
SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message);
public Consumer<Flux<Message<String>>> consumeCancel() {
return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE)
.flatMap(message -> {
SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message);

if (runRequests.contains(cancelContext.getResultUuid())) {
cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext);
}
if (runRequests.contains(cancelContext.getResultUuid())) {
cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext);
}

// find the completableFuture associated with result uuid
CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid());
if (future != null) {
future.cancel(true); // cancel computation in progress

resultRepository.delete(cancelContext.getResultUuid())
.doOnSuccess(unused -> {
stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver());
LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid());
})
.doOnError(throwable -> LOGGER.error(throwable.toString(), throwable))
.subscribe();
}
};
// find the completableFuture associated with result uuid
CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid());
if (future != null) {
future.cancel(true); // cancel computation in progress

return resultRepository.delete(cancelContext.getResultUuid())
.doOnSuccess(unused -> {
stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver());
LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid());
});
}
return Mono.empty();
})
.onErrorContinue((t, r) -> LOGGER.error("Exception in consumeCancel", t))
.subscribe();
}

private void sendResultMessage(Message<String> message) {
Expand Down