Skip to content

Commit e6944e2

Browse files
Stop a security analysis computation in progress (#8)
Stop a security analysis computation in progress
1 parent f1b452e commit e6944e2

File tree

8 files changed

+280
-20
lines changed

8 files changed

+280
-20
lines changed

src/main/java/org/gridsuite/securityanalysis/server/SecurityAnalysisController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
/**
3737
* @author Geoffroy Jamgotchian <geoffroy.jamgotchian at rte-france.com>
38+
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
3839
*/
3940
@RestController
4041
@RequestMapping(value = "/" + SecurityAnalysisApi.API_VERSION)
@@ -134,4 +135,13 @@ public ResponseEntity<Mono<Void>> invalidateStatus(@Parameter(description = "Res
134135
Mono<Void> result = service.setStatus(resultUuid, SecurityAnalysisStatus.NOT_DONE.name());
135136
return ResponseEntity.ok().body(result);
136137
}
138+
139+
@PutMapping(value = "/results/{resultUuid}/stop", produces = APPLICATION_JSON_VALUE)
140+
@Operation(summary = "Stop a security analysis computation")
141+
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "The security analysis has been stopped")})
142+
public ResponseEntity<Mono<Void>> stop(@Parameter(description = "Result UUID") @PathVariable("resultUuid") UUID resultUuid,
143+
@Parameter(description = "Result receiver") @RequestParam(name = "receiver", required = false) String receiver) {
144+
Mono<Void> result = service.stop(resultUuid, receiver);
145+
return ResponseEntity.ok().body(result);
146+
}
137147
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright (c) 2021, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.securityanalysis.server.service;
8+
9+
import com.powsybl.commons.PowsyblException;
10+
import org.springframework.messaging.Message;
11+
import org.springframework.messaging.MessageHeaders;
12+
import org.springframework.messaging.support.MessageBuilder;
13+
14+
import java.util.Objects;
15+
import java.util.UUID;
16+
17+
/**
18+
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
19+
*/
20+
public class SecurityAnalysisCancelContext {
21+
22+
private final UUID resultUuid;
23+
24+
private final String receiver;
25+
26+
public SecurityAnalysisCancelContext(UUID resultUuid, String receiver) {
27+
this.resultUuid = Objects.requireNonNull(resultUuid);
28+
this.receiver = Objects.requireNonNull(receiver);
29+
}
30+
31+
public UUID getResultUuid() {
32+
return resultUuid;
33+
}
34+
35+
public String getReceiver() {
36+
return receiver;
37+
}
38+
39+
private static String getNonNullHeader(MessageHeaders headers, String name) {
40+
String header = (String) headers.get(name);
41+
if (header == null) {
42+
throw new PowsyblException("Header '" + name + "' not found");
43+
}
44+
return header;
45+
}
46+
47+
public static SecurityAnalysisCancelContext fromMessage(Message<String> message) {
48+
Objects.requireNonNull(message);
49+
MessageHeaders headers = message.getHeaders();
50+
UUID resultUuid = UUID.fromString(getNonNullHeader(headers, "resultUuid"));
51+
String receiver = (String) headers.get("receiver");
52+
return new SecurityAnalysisCancelContext(resultUuid, receiver);
53+
}
54+
55+
public Message<String> toMessage() {
56+
return MessageBuilder.withPayload("")
57+
.setHeader("resultUuid", resultUuid.toString())
58+
.setHeader("receiver", receiver)
59+
.build();
60+
}
61+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright (c) 2021, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.securityanalysis.server.service;
8+
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.messaging.Message;
11+
import org.springframework.stereotype.Service;
12+
import reactor.core.publisher.EmitterProcessor;
13+
import reactor.core.publisher.Flux;
14+
15+
import java.util.Objects;
16+
import java.util.function.Supplier;
17+
import java.util.logging.Level;
18+
19+
/**
20+
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
21+
*/
22+
@Service
23+
public class SecurityAnalysisCancelPublisherService {
24+
25+
private static final String CATEGORY_BROKER_OUTPUT = SecurityAnalysisCancelPublisherService.class.getName()
26+
+ ".output-broker-messages";
27+
28+
private final EmitterProcessor<Message<String>> cancelMessagePublisher = EmitterProcessor.create();
29+
30+
@Bean
31+
public Supplier<Flux<Message<String>>> publishCancel() {
32+
return () -> cancelMessagePublisher.log(CATEGORY_BROKER_OUTPUT, Level.FINE);
33+
}
34+
35+
public void publish(SecurityAnalysisCancelContext cancelContext) {
36+
Objects.requireNonNull(cancelContext);
37+
cancelMessagePublisher.onNext(cancelContext.toMessage());
38+
}
39+
}

src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,25 @@
1919

2020
/**
2121
* @author Geoffroy Jamgotchian <geoffroy.jamgotchian at rte-france.com>
22+
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
2223
*/
2324
@Service
2425
public class SecurityAnalysisService {
2526
private SecurityAnalysisResultRepository resultRepository;
2627

2728
private SecurityAnalysisRunPublisherService runPublisherService;
2829

30+
private SecurityAnalysisCancelPublisherService cancelPublisherService;
31+
2932
private UuidGeneratorService uuidGeneratorService;
3033

3134
public SecurityAnalysisService(SecurityAnalysisResultRepository resultRepository,
3235
SecurityAnalysisRunPublisherService runPublisherService,
36+
SecurityAnalysisCancelPublisherService cancelPublisherService,
3337
UuidGeneratorService uuidGeneratorService) {
3438
this.resultRepository = Objects.requireNonNull(resultRepository);
3539
this.runPublisherService = Objects.requireNonNull(runPublisherService);
40+
this.cancelPublisherService = Objects.requireNonNull(cancelPublisherService);
3641
this.uuidGeneratorService = Objects.requireNonNull(uuidGeneratorService);
3742
}
3843

@@ -66,4 +71,9 @@ public Mono<String> getStatus(UUID resultUuid) {
6671
public Mono<Void> setStatus(UUID resultUuid, String status) {
6772
return resultRepository.insertStatus(resultUuid, status);
6873
}
74+
75+
public Mono<Void> stop(UUID resultUuid, String receiver) {
76+
return Mono.fromRunnable(() ->
77+
cancelPublisherService.publish(new SecurityAnalysisCancelContext(resultUuid, receiver))).then();
78+
}
6979
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright (c) 2021, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
package org.gridsuite.securityanalysis.server.service;
8+
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.messaging.Message;
11+
import org.springframework.messaging.support.MessageBuilder;
12+
import org.springframework.stereotype.Service;
13+
import reactor.core.publisher.EmitterProcessor;
14+
import reactor.core.publisher.Flux;
15+
16+
import java.util.UUID;
17+
import java.util.function.Supplier;
18+
import java.util.logging.Level;
19+
20+
/**
21+
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
22+
*/
23+
@Service
24+
public class SecurityAnalysisStoppedPublisherService {
25+
26+
private static final String CATEGORY_BROKER_OUTPUT = SecurityAnalysisStoppedPublisherService.class.getName()
27+
+ ".output-broker-messages";
28+
29+
private final EmitterProcessor<Message<String>> stoppedMessagePublisher = EmitterProcessor.create();
30+
31+
@Bean
32+
public Supplier<Flux<Message<String>>> publishStopped() {
33+
return () -> stoppedMessagePublisher.log(CATEGORY_BROKER_OUTPUT, Level.FINE);
34+
}
35+
36+
public void publish(UUID resultUuid, String receiver) {
37+
stoppedMessagePublisher.onNext(MessageBuilder
38+
.withPayload("")
39+
.setHeader("resultUuid", resultUuid.toString())
40+
.setHeader("receiver", receiver)
41+
.build());
42+
}
43+
}

src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.gridsuite.securityanalysis.server.service;
88

99
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import com.google.common.collect.Sets;
1011
import com.powsybl.commons.PowsyblException;
1112
import com.powsybl.computation.local.LocalComputationManager;
1213
import com.powsybl.contingency.Contingency;
@@ -32,15 +33,19 @@
3233

3334
import java.util.ArrayList;
3435
import java.util.List;
36+
import java.util.Map;
3537
import java.util.Objects;
38+
import java.util.Set;
3639
import java.util.UUID;
40+
import java.util.concurrent.CancellationException;
3741
import java.util.concurrent.CompletableFuture;
42+
import java.util.concurrent.ConcurrentHashMap;
3843
import java.util.function.Consumer;
39-
import java.util.logging.Level;
4044
import java.util.stream.Collectors;
4145

4246
/**
4347
* @author Geoffroy Jamgotchian <geoffroy.jamgotchian at rte-france.com>
48+
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
4449
*/
4550
@Service
4651
public class SecurityAnalysisWorkerService {
@@ -62,15 +67,26 @@ public class SecurityAnalysisWorkerService {
6267

6368
private SecurityAnalysisResultPublisherService resultPublisherService;
6469

70+
private SecurityAnalysisStoppedPublisherService stoppedPublisherService;
71+
72+
private Map<UUID, CompletableFuture<SecurityAnalysisResult>> futures = new ConcurrentHashMap<>();
73+
74+
private Map<UUID, SecurityAnalysisCancelContext> cancelComputationRequests = new ConcurrentHashMap<>();
75+
76+
private Set<UUID> runRequests = Sets.newConcurrentHashSet();
77+
6578
public SecurityAnalysisWorkerService(NetworkStoreService networkStoreService, ActionsService actionsService,
6679
SecurityAnalysisResultRepository resultRepository, ObjectMapper objectMapper,
67-
SecurityAnalysisConfigService configService, SecurityAnalysisResultPublisherService resultPublisherService) {
80+
SecurityAnalysisConfigService configService,
81+
SecurityAnalysisResultPublisherService resultPublisherService,
82+
SecurityAnalysisStoppedPublisherService stoppedPublisherService) {
6883
this.networkStoreService = Objects.requireNonNull(networkStoreService);
6984
this.actionsService = Objects.requireNonNull(actionsService);
7085
this.resultRepository = Objects.requireNonNull(resultRepository);
7186
this.objectMapper = Objects.requireNonNull(objectMapper);
7287
this.configService = Objects.requireNonNull(configService);
7388
this.resultPublisherService = Objects.requireNonNull(resultPublisherService);
89+
this.stoppedPublisherService = Objects.requireNonNull(stoppedPublisherService);
7490
}
7591

7692
private static String sanitizeParam(String param) {
@@ -111,6 +127,10 @@ private Mono<Network> getNetwork(UUID networkUuid, List<UUID> otherNetworkUuids)
111127
}
112128

113129
public Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context) {
130+
return run(context, null);
131+
}
132+
133+
private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUID resultUuid) {
114134
Objects.requireNonNull(context);
115135

116136
LOGGER.info("Run security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList()));
@@ -125,26 +145,75 @@ public Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context) {
125145
return Mono.zip(network, contingencies)
126146
.flatMap(tuple -> {
127147
SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0);
128-
CompletableFuture<SecurityAnalysisResult> result = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2());
129-
return Mono.fromCompletionStage(result);
148+
CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2());
149+
if (resultUuid != null) {
150+
futures.put(resultUuid, future);
151+
}
152+
if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
153+
return Mono.empty();
154+
} else {
155+
return Mono.fromCompletionStage(future);
156+
}
130157
});
131158
}
132159

133160
@Bean
134-
public Consumer<Flux<Message<String>>> consumeRun() {
135-
return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE)
136-
.flatMap(message -> {
137-
SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper);
138-
139-
return run(resultContext.getRunContext())
140-
.flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result)
141-
.then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())))
142-
.doOnSuccess(unused -> {
143-
resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver());
144-
LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid());
145-
});
146-
})
147-
.doOnError(throwable -> LOGGER.error(throwable.toString(), throwable))
148-
.subscribe();
161+
public Consumer<Message<String>> consumeRun() {
162+
return message -> {
163+
164+
SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper);
165+
runRequests.add(resultContext.getResultUuid());
166+
167+
run(resultContext.getRunContext(), resultContext.getResultUuid())
168+
.flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result)
169+
.then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name()))
170+
.then(Mono.just(result)))
171+
.doOnSuccess(result -> {
172+
if (result != null) { // result available
173+
resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver());
174+
LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid());
175+
} else { // result not available : stop computation request
176+
if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) {
177+
stoppedPublisherService.publish(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver());
178+
}
179+
}
180+
})
181+
.doOnError(throwable -> {
182+
if (!(throwable instanceof CancellationException)) {
183+
LOGGER.error(throwable.toString(), throwable);
184+
}
185+
})
186+
.doFinally(s -> {
187+
futures.remove(resultContext.getResultUuid());
188+
cancelComputationRequests.remove(resultContext.getResultUuid());
189+
runRequests.remove(resultContext.getResultUuid());
190+
})
191+
.subscribe();
192+
};
193+
}
194+
195+
@Bean
196+
public Consumer<Message<String>> consumeCancel() {
197+
return message -> {
198+
SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message);
199+
200+
if (runRequests.contains(cancelContext.getResultUuid())) {
201+
cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext);
202+
}
203+
204+
// find the completableFuture associated with result uuid
205+
CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid());
206+
if (future != null) {
207+
future.cancel(true); // cancel computation in progress
208+
209+
resultRepository.delete(cancelContext.getResultUuid())
210+
.doOnSuccess(unused -> {
211+
stoppedPublisherService.publish(cancelContext.getResultUuid(), cancelContext.getReceiver());
212+
LOGGER.info("Security analysis stopped (resultUuid='{}')", cancelContext.getResultUuid());
213+
})
214+
.doOnError(throwable -> LOGGER.error(throwable.toString(), throwable))
215+
.subscribe();
216+
}
217+
};
149218
}
150219
}

src/main/resources/application.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ spring:
55

66
cloud:
77
function:
8-
definition: consumeRun;publishRun;publishResult
8+
definition: consumeRun;publishRun;publishResult;consumeCancel;publishCancel;publishStopped
99
stream:
1010
bindings:
1111
consumeRun-in-0:
@@ -15,6 +15,12 @@ spring:
1515
destination: sa.run
1616
publishResult-out-0:
1717
destination: sa.result
18+
consumeCancel-in-0:
19+
destination: sa.cancel
20+
publishCancel-out-0:
21+
destination: sa.cancel
22+
publishStopped-out-0:
23+
destination: sa.stopped
1824

1925
springdoc:
2026
swagger-ui:

0 commit comments

Comments
 (0)