Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public Mono<Void> setStatus(UUID resultUuid, String status) {

public Mono<Void> stop(UUID resultUuid, String receiver) {
return Mono.fromRunnable(() ->
cancelPublisherService.publish(new SecurityAnalysisCancelContext(resultUuid, receiver))).then();
cancelPublisherService.publish(new SecurityAnalysisCancelContext(resultUuid, receiver)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.stream.Collectors;

import static org.gridsuite.securityanalysis.server.service.SecurityAnalysisStoppedPublisherService.CANCEL_MESSAGE;
Expand Down Expand Up @@ -103,13 +104,19 @@ private Mono<Network> getNetwork(UUID networkUuid) {
.subscribeOn(Schedulers.boundedElastic());
}

private Mono<Network> getNetwork(UUID networkUuid, List<UUID> otherNetworkUuids) {
private Mono<Network> getNetwork(UUID networkUuid, List<UUID> otherNetworkUuids, UUID resultUuid) {
Mono<Network> network = getNetwork(networkUuid);
if (otherNetworkUuids.isEmpty()) {
return network;
} else {
Mono<List<Network>> otherNetworks = Flux.fromIterable(otherNetworkUuids)
.flatMap(this::getNetwork)
.flatMap(uuid -> {
if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually all these are done in parallel so can you remove this check which is not useful ? sorry for asking to add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return Mono.empty();
} else {
return getNetwork(uuid);
}
})
.collectList();
return Mono.zip(network, otherNetworks)
.map(t -> {
Expand All @@ -133,87 +140,103 @@ private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUI

LOGGER.info("Run security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList()));

Mono<Network> network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids());
if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
return Mono.empty();
}

LOGGER.info("Loading networks");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logs should happen when the operation is performed, not when the mono is created. There is the same problem for other log statements in this code, we should fix them all

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


Mono<Network> network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids(), resultUuid);

if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this "if" happens almost at the same time as the previous one, it's not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return Mono.empty();
}

LOGGER.info("Loading contingencies");

Mono<List<Contingency>> contingencies = Flux.fromIterable(context.getContingencyListNames())
.flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid()))
.collectList();

return Mono.zip(network, contingencies)
.flatMap(tuple -> {
SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0);
CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2());
if (resultUuid != null) {
futures.put(resultUuid, future);
}
if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
return Mono.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a log to all the short circuit returns because without it, the log will look like this:

Run security analysis on contingency lists: ...
loading resource network
loading resource ...
cancelling 
... tons of network-store-client logs because the operation continues...
...and nothing here to remind us that we didn't run the security analysis

} else {
SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0);
CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2());
if (resultUuid != null) {
futures.put(resultUuid, future);
}
LOGGER.info("Starting security analysis computation");
return Mono.fromCompletionStage(future);
}
});
}

@Bean
public Consumer<Message<String>> consumeRun() {
return message -> {

SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper);
runRequests.add(resultContext.getResultUuid());

run(resultContext.getRunContext(), resultContext.getResultUuid())
.flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result)
.then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name()))
.then(Mono.just(result)))
.doOnSuccess(result -> {
if (result != null) { // result available
resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver());
LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid());
} else { // result not available : stop computation request
if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) {
stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver());
}
}
})
.onErrorResume(throwable -> {
if (!(throwable instanceof CancellationException)) {
LOGGER.error(FAIL_MESSAGE, throwable);
stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage());
return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty());
}
return Mono.empty();
})
.doFinally(s -> {
futures.remove(resultContext.getResultUuid());
cancelComputationRequests.remove(resultContext.getResultUuid());
runRequests.remove(resultContext.getResultUuid());
})
.subscribe();
};
public Consumer<Flux<Message<String>>> consumeRun() {
return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE)
.flatMap(message -> {
SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper);
runRequests.add(resultContext.getResultUuid());

return run(resultContext.getRunContext(), resultContext.getResultUuid())
.flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result)
.then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name()))
.then(Mono.just(result)))
.doOnSuccess(result -> {
if (result != null) { // result available
resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver());
LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid());
} else { // result not available : stop computation request
if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) {
stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver());
LOGGER.info("Security analysis stopped (resultUuid='{}')", resultContext.getResultUuid());
}
}
})
.onErrorResume(throwable -> {
if (!(throwable instanceof CancellationException)) {
LOGGER.error(FAIL_MESSAGE, throwable);
stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage());
return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty());
}
return Mono.empty();
})
.doFinally(s -> {
futures.remove(resultContext.getResultUuid());
cancelComputationRequests.remove(resultContext.getResultUuid());
runRequests.remove(resultContext.getResultUuid());
});
})
.subscribe();
}

@Bean
public Consumer<Message<String>> consumeCancel() {
return message -> {
SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message);
public Consumer<Flux<Message<String>>> consumeCancel() {
return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE)
.flatMap(message -> {
SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message);

if (runRequests.contains(cancelContext.getResultUuid())) {
cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext);
}
if (runRequests.contains(cancelContext.getResultUuid())) {
cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext);
}

// find the completableFuture associated with result uuid
CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid());
if (future != null) {
future.cancel(true); // cancel computation in progress

resultRepository.delete(cancelContext.getResultUuid())
.doOnSuccess(unused -> {
stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver());
LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid());
})
.doOnError(throwable -> LOGGER.error(throwable.toString(), throwable))
.subscribe();
}
};
// find the completableFuture associated with result uuid
CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid());
if (future != null) {
future.cancel(true); // cancel computation in progress

return resultRepository.delete(cancelContext.getResultUuid())
.doOnSuccess(unused -> {
stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver());
LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid());
});
}
return Mono.empty();
})
.onErrorContinue((t, r) -> LOGGER.error("Exception in consumeCancel", t))
.subscribe();
}
}